
Structured Reasoning with Workflows

This example demonstrates how to implement a Retrieval-Augmented Generation (RAG) system using Kastrax, OpenAI embeddings, and PGVector for vector storage, with an emphasis on structured reasoning through a defined workflow.

Overview

The system implements RAG using Kastrax and OpenAI with chain-of-thought prompting through a defined workflow. Here’s what it does:

  1. Sets up a Kastrax agent with gpt-4o-mini for response generation
  2. Creates a vector query tool to manage vector store interactions
  3. Defines a workflow with multiple steps for chain-of-thought reasoning
  4. Processes and chunks text documents
  5. Creates and stores embeddings in PostgreSQL
  6. Generates responses through the workflow steps

Setup

Environment Setup

Make sure to set up your environment variables:

.env
```
OPENAI_API_KEY=your_openai_api_key_here
POSTGRES_CONNECTION_STRING=your_connection_string_here
```

Dependencies

Import the necessary dependencies:

index.ts
```typescript
import { openai } from "@ai-sdk/openai";
import { Kastrax } from "@kastrax/core";
import { Agent } from "@kastrax/core/agent";
import { Step, Workflow } from "@kastrax/core/workflows";
import { PgVector } from "@kastrax/pg";
import { createVectorQueryTool, MDocument } from "@kastrax/rag";
import { embedMany } from "ai";
import { z } from "zod";
```

Workflow Definition

First, define the workflow with its trigger schema:

index.ts
```typescript
export const ragWorkflow = new Workflow({
  name: "rag-workflow",
  triggerSchema: z.object({
    query: z.string(),
  }),
});
```

Vector Query Tool Creation

Create a tool for querying the vector database:

index.ts
```typescript
const vectorQueryTool = createVectorQueryTool({
  vectorStoreName: "pgVector",
  indexName: "embeddings",
  model: openai.embedding("text-embedding-3-small"),
});
```

Agent Configuration

Set up the Kastrax agent:

index.ts
```typescript
export const ragAgent = new Agent({
  name: "RAG Agent",
  instructions: `You are a helpful assistant that answers questions based on the provided context.`,
  model: openai("gpt-4o-mini"),
  tools: {
    vectorQueryTool,
  },
});
```

Workflow Steps

The workflow is divided into multiple steps for chain-of-thought reasoning:

1. Context Analysis Step

index.ts
```typescript
const analyzeContext = new Step({
  id: "analyzeContext",
  outputSchema: z.object({
    initialAnalysis: z.string(),
  }),
  execute: async ({ context, kastrax }) => {
    console.log("---------------------------");
    const ragAgent = kastrax?.getAgent("ragAgent");
    const query = context?.getStepResult<{ query: string }>("trigger")?.query;

    const analysisPrompt = `${query} 1. First, carefully analyze the retrieved context chunks and identify key information.`;

    const analysis = await ragAgent?.generate(analysisPrompt);
    console.log(analysis?.text);

    return {
      initialAnalysis: analysis?.text ?? "",
    };
  },
});
```

2. Thought Breakdown Step

index.ts
```typescript
const breakdownThoughts = new Step({
  id: "breakdownThoughts",
  outputSchema: z.object({
    breakdown: z.string(),
  }),
  execute: async ({ context, kastrax }) => {
    console.log("---------------------------");
    const ragAgent = kastrax?.getAgent("ragAgent");
    const analysis = context?.getStepResult<{
      initialAnalysis: string;
    }>("analyzeContext")?.initialAnalysis;

    const connectionPrompt = `
      Based on the initial analysis: ${analysis}

      2. Break down your thinking process about how the retrieved information relates to the query.
    `;

    const connectionAnalysis = await ragAgent?.generate(connectionPrompt);
    console.log(connectionAnalysis?.text);

    return {
      breakdown: connectionAnalysis?.text ?? "",
    };
  },
});
```

3. Connection Step

index.ts
```typescript
const connectPieces = new Step({
  id: "connectPieces",
  outputSchema: z.object({
    connections: z.string(),
  }),
  execute: async ({ context, kastrax }) => {
    console.log("---------------------------");
    const ragAgent = kastrax?.getAgent("ragAgent");
    const process = context?.getStepResult<{
      breakdown: string;
    }>("breakdownThoughts")?.breakdown;

    const connectionPrompt = `
      Based on the breakdown: ${process}

      3. Explain how you're connecting different pieces from the retrieved chunks.
    `;

    const connections = await ragAgent?.generate(connectionPrompt);
    console.log(connections?.text);

    return {
      connections: connections?.text ?? "",
    };
  },
});
```

4. Conclusion Step

index.ts
```typescript
const drawConclusions = new Step({
  id: "drawConclusions",
  outputSchema: z.object({
    conclusions: z.string(),
  }),
  execute: async ({ context, kastrax }) => {
    console.log("---------------------------");
    const ragAgent = kastrax?.getAgent("ragAgent");
    const evidence = context?.getStepResult<{
      connections: string;
    }>("connectPieces")?.connections;

    const conclusionPrompt = `
      Based on the connections: ${evidence}

      4. Draw conclusions based only on the evidence in the retrieved context.
    `;

    const conclusions = await ragAgent?.generate(conclusionPrompt);
    console.log(conclusions?.text);

    return {
      conclusions: conclusions?.text ?? "",
    };
  },
});
```

5. Final Answer Step

index.ts
```typescript
const finalAnswer = new Step({
  id: "finalAnswer",
  outputSchema: z.object({
    finalAnswer: z.string(),
  }),
  execute: async ({ context, kastrax }) => {
    console.log("---------------------------");
    const ragAgent = kastrax?.getAgent("ragAgent");
    const conclusions = context?.getStepResult<{
      conclusions: string;
    }>("drawConclusions")?.conclusions;

    const answerPrompt = `
      Based on the conclusions: ${conclusions}

      Format your response as:
      THOUGHT PROCESS:
      - Step 1: [Initial analysis of retrieved chunks]
      - Step 2: [Connections between chunks]
      - Step 3: [Reasoning based on chunks]

      FINAL ANSWER:
      [Your concise answer based on the retrieved context]`;

    const finalAnswer = await ragAgent?.generate(answerPrompt);
    console.log(finalAnswer?.text);

    return {
      finalAnswer: finalAnswer?.text ?? "",
    };
  },
});
```

Workflow Configuration

Connect all the steps in the workflow:

index.ts
```typescript
ragWorkflow
  .step(analyzeContext)
  .then(breakdownThoughts)
  .then(connectPieces)
  .then(drawConclusions)
  .then(finalAnswer);

ragWorkflow.commit();
```

Instantiate PgVector and Kastrax

Instantiate PgVector and Kastrax with all components:

index.ts
```typescript
const pgVector = new PgVector(process.env.POSTGRES_CONNECTION_STRING!);

export const kastrax = new Kastrax({
  agents: { ragAgent },
  vectors: { pgVector },
  workflows: { ragWorkflow },
});
```

Document Processing

Process and chunk the document:

index.ts
```typescript
const doc = MDocument.fromText(
  `The Impact of Climate Change on Global Agriculture...`,
);

const chunks = await doc.chunk({
  strategy: "recursive",
  size: 512,
  overlap: 50,
  separator: "\n",
});
```
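To make the `size` and `overlap` parameters concrete, here is a simplified, self-contained sketch of fixed-size chunking with overlap. It is not the `MDocument` implementation (the `recursive` strategy also splits on the separator and respects boundaries); it only illustrates what the two numbers control:

```typescript
// Simplified illustration of chunk `size` and `overlap` (not MDocument's
// recursive strategy): each chunk is at most `size` characters long, and
// consecutive chunks share `overlap` characters of context.
function chunkText(text: string, size: number, overlap: number): string[] {
  const chunks: string[] = [];
  const step = size - overlap; // how far the window advances each iteration
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + size));
    if (start + size >= text.length) break; // last window reached the end
  }
  return chunks;
}

console.log(chunkText("abcdefghij", 4, 2)); // ["abcd", "cdef", "efgh", "ghij"]
```

The overlap keeps a sentence that straddles a chunk boundary retrievable from either side, at the cost of some duplicated storage.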

Embedding Creation and Storage

Generate and store embeddings:

index.ts
```typescript
const { embeddings } = await embedMany({
  model: openai.embedding("text-embedding-3-small"),
  values: chunks.map((chunk) => chunk.text),
});

const vectorStore = kastrax.getVector("pgVector");
await vectorStore.createIndex({
  indexName: "embeddings",
  dimension: 1536,
});
await vectorStore.upsert({
  indexName: "embeddings",
  vectors: embeddings,
  metadata: chunks?.map((chunk: any) => ({ text: chunk.text })),
});
```
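At query time, the vector store ranks these stored embeddings by similarity to the query embedding, typically cosine similarity. As a self-contained sketch of that metric (an illustration, not the PGVector implementation):

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|). A score of 1 means the
// vectors point in the same direction; 0 means they are orthogonal.
function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

console.log(cosineSimilarity([1, 0], [1, 0])); // 1 (same direction)
console.log(cosineSimilarity([1, 0], [0, 1])); // 0 (orthogonal)
```

The `vectorQueryTool` defined earlier embeds the query with the same `text-embedding-3-small` model, which is why the index `dimension` must match that model's 1536-dimensional output.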

Workflow Execution

Here’s how to execute the workflow with a query:

index.ts
```typescript
const query = "What are the main adaptation strategies for farmers?";

console.log("\nQuery:", query);
const prompt = `
Please answer the following question:
${query}

Please base your answer only on the context provided in the tool. If the context doesn't contain enough information to fully answer the question, please state that explicitly.
`;

const { runId, start } = ragWorkflow.createRun();

console.log("Run:", runId);

const workflowResult = await start({
  triggerData: {
    query: prompt,
  },
});

console.log("\nThought Process:");
console.log(workflowResult.results);
```
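`workflowResult.results` is a record keyed by step id, one entry per step in the chain. As a hedged sketch (the `StepResult` type below is an assumed shape for illustration, not the official Kastrax type), you could pull each step's text out for display like this:

```typescript
// Assumed result shape: a record keyed by step id, where a successful step
// carries its outputSchema payload under `output`. This mirrors the five
// steps above but is a sketch, not the Kastrax API.
type StepResult = { status: string; output?: Record<string, string> };

function collectStepTexts(
  results: Record<string, StepResult>,
  ids: string[],
): string[] {
  return ids
    .map((id) => results[id])
    .filter(
      (r): r is Required<StepResult> => r?.status === "success" && !!r.output,
    )
    .map((r) => Object.values(r.output)[0]);
}

// Mock data mirroring two of the five workflow steps:
const mockResults: Record<string, StepResult> = {
  analyzeContext: { status: "success", output: { initialAnalysis: "Key points..." } },
  finalAnswer: { status: "success", output: { finalAnswer: "Concise answer." } },
};
console.log(collectStepTexts(mockResults, ["analyzeContext", "finalAnswer"]));
```

Printing the steps in chain order reproduces the full chain-of-thought trace, ending with the formatted `FINAL ANSWER` section from the last step.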




