
Control Flow in Kastrax AI Workflows ✅

Kastrax’s workflow system provides sophisticated control flow capabilities that allow you to orchestrate complex AI agent operations. Whether you need to execute steps sequentially, run operations in parallel, implement conditional branching, or create loops, Kastrax offers a type-safe, declarative API for defining exactly how your workflow should behave.

This guide explains the various control flow patterns available in Kastrax workflows and how to implement them effectively.

Sequential Execution ✅

Sequential execution is the most basic control flow pattern: steps run one after another in a defined order. Because each step finishes before the next one starts, a later step can use the outputs of the steps before it as inputs.

SequentialWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with sequential steps
val orderProcessingWorkflow = workflow {
    name = "order-processing"
    description = "Process customer orders sequentially"

    // First step: Fetch order data
    step(fetchOrderAgent) {
        id = "fetch-order"
        name = "Fetch Order Data"
        description = "Retrieve order information from the database"
        variables = mutableMapOf(
            "orderId" to variable("$.input.orderId")
        )
    }

    // Second step: Validate order data (runs after fetch-order)
    step(validationAgent) {
        id = "validate-order"
        name = "Validate Order"
        description = "Validate order data for completeness and correctness"
        after("fetch-order") // Explicit dependency
        variables = mutableMapOf(
            "orderData" to variable("$.steps.fetch-order.output.data")
        )
    }

    // Third step: Process payment (runs after validate-order)
    step(paymentAgent) {
        id = "process-payment"
        name = "Process Payment"
        description = "Process payment for the validated order"
        after("validate-order") // Explicit dependency
        variables = mutableMapOf(
            "orderData" to variable("$.steps.fetch-order.output.data"),
            "validationResult" to variable("$.steps.validate-order.output.result")
        )
    }
}

In this example, each step explicitly depends on the previous step using the after() function, creating a clear sequential flow.
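
The `variable("$.steps.fetch-order.output.data")` expressions read like path lookups into the workflow's accumulated state. How Kastrax resolves them internally is not shown in this guide, so the TypeScript sketch below is only a language-agnostic illustration of the idea. The `resolvePath` function and the shape of `exampleState` are assumptions made for this example, not part of the Kastrax API.

```typescript
// Hypothetical helper, not a Kastrax API: walks a "$.a.b.c" path through
// a nested state object and returns a default when any segment is missing.
function resolvePath(state: unknown, path: string, defaultValue?: unknown): unknown {
  if (path.indexOf("$.") !== 0) {
    throw new Error("Path must start with '$.': " + path);
  }
  const segments = path.slice(2).split(".");
  let current: unknown = state;
  for (const segment of segments) {
    if (typeof current !== "object" || current === null) {
      return defaultValue;
    }
    current = (current as Record<string, unknown>)[segment];
  }
  return current === undefined ? defaultValue : current;
}

// Assumed state shape: workflow input plus one entry per completed step.
const exampleState = {
  input: { orderId: "A-100" },
  steps: {
    "fetch-order": { output: { data: { total: 42 } } },
  },
};

const orderId = resolvePath(exampleState, "$.input.orderId");
const orderTotal = resolvePath(exampleState, "$.steps.fetch-order.output.data.total");
const missing = resolvePath(exampleState, "$.steps.validate-order.output.result", "not-run-yet");
```

In this sketch a step that has not run yet simply yields the default value, which is one reason to pair variable references with explicit `after()` dependencies.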

Parallel Execution ✅

Parallel execution allows multiple steps to run simultaneously when they don’t depend on each other. This can significantly improve workflow performance for independent operations.

ParallelWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with parallel steps
val dataProcessingWorkflow = workflow {
    name = "data-processing"
    description = "Process data from multiple sources in parallel"

    // These steps will run in parallel since they don't have dependencies on each other
    step(userDataAgent) {
        id = "fetch-user-data"
        name = "Fetch User Data"
        description = "Retrieve user information"
        variables = mutableMapOf(
            "userId" to variable("$.input.userId")
        )
    }

    step(productDataAgent) {
        id = "fetch-product-data"
        name = "Fetch Product Data"
        description = "Retrieve product information"
        variables = mutableMapOf(
            "productId" to variable("$.input.productId")
        )
    }

    step(inventoryDataAgent) {
        id = "fetch-inventory-data"
        name = "Fetch Inventory Data"
        description = "Retrieve inventory information"
        variables = mutableMapOf(
            "productId" to variable("$.input.productId")
        )
    }

    // This step will only run after all parallel steps have completed
    step(analysisAgent) {
        id = "analyze-data"
        name = "Analyze Combined Data"
        description = "Analyze data from all sources"
        after("fetch-user-data", "fetch-product-data", "fetch-inventory-data") // Multiple dependencies
        variables = mutableMapOf(
            "userData" to variable("$.steps.fetch-user-data.output.data"),
            "productData" to variable("$.steps.fetch-product-data.output.data"),
            "inventoryData" to variable("$.steps.fetch-inventory-data.output.data")
        )
    }
}

In this example, the first three steps run in parallel because they don’t have dependencies on each other. The final analysis step only runs after all three parallel steps have completed.
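
One way to picture how `after()` declarations translate into parallelism is to group steps into "waves", where every step in a wave has all of its dependencies satisfied by earlier waves. The TypeScript sketch below does exactly that for the step IDs used above. It is a simplified model under stated assumptions: `StepSpec` and `planWaves` are hypothetical names, and whether the Kastrax engine schedules steps this way is not confirmed by this guide.

```typescript
// Hypothetical model of a step: only the fields that affect ordering.
interface StepSpec {
  id: string;
  after: string[];
}

// Groups steps into waves. All steps in one wave could run concurrently,
// because each of them depends only on steps from earlier waves.
function planWaves(steps: StepSpec[]): string[][] {
  const done: string[] = [];
  let pending = steps.slice();
  const waves: string[][] = [];

  while (pending.length > 0) {
    const ready = pending.filter((s) => s.after.every((dep) => done.indexOf(dep) !== -1));
    if (ready.length === 0) {
      // Nothing can make progress: a cycle or a reference to an unknown step.
      throw new Error("Cycle or unknown dependency among: " + pending.map((s) => s.id).join(", "));
    }
    const readyIds = ready.map((s) => s.id);
    waves.push(readyIds);
    for (const id of readyIds) {
      done.push(id);
    }
    pending = pending.filter((s) => readyIds.indexOf(s.id) === -1);
  }
  return waves;
}

const exampleWaves = planWaves([
  { id: "fetch-user-data", after: [] },
  { id: "fetch-product-data", after: [] },
  { id: "fetch-inventory-data", after: [] },
  { id: "analyze-data", after: ["fetch-user-data", "fetch-product-data", "fetch-inventory-data"] },
]);
// The first wave holds the three fetch steps; the second holds "analyze-data".
```

The same model also shows why a circular `after()` chain cannot be executed: no step ever becomes ready, so the planner stops with an error instead of looping forever.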

Branching and Conditional Paths ✅

Branching allows your workflow to take different paths based on conditions or results from previous steps. This is essential for implementing decision logic in your workflows.

BranchingWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with branching paths
val contentWorkflow = workflow {
    name = "content-workflow"
    description = "Create and process content with branching logic"

    // Initial content analysis step
    step(analysisAgent) {
        id = "analyze-content"
        name = "Content Analysis"
        description = "Analyze content quality and type"
        variables = mutableMapOf(
            "content" to variable("$.input.content")
        )
    }

    // Conditional branching based on content quality
    conditionalStep {
        id = "quality-branch"
        name = "Quality Branching"
        description = "Branch based on content quality"
        after("analyze-content")

        // Define the condition to evaluate
        condition { context ->
            val quality = context.getVariable("$.steps.analyze-content.output.qualityScore") as? Double ?: 0.0
            quality >= 8.0 // High quality if score is 8.0 or higher
        }

        // Steps to execute if condition is true (high quality)
        onTrue {
            step(publishAgent) {
                id = "publish-content"
                name = "Publish Content"
                description = "Publish the high-quality content"
                variables = mutableMapOf(
                    "content" to variable("$.input.content")
                )
            }
        }

        // Steps to execute if condition is false (low quality)
        onFalse {
            step(revisionAgent) {
                id = "revise-content"
                name = "Revise Content"
                description = "Improve the low-quality content"
                variables = mutableMapOf(
                    "content" to variable("$.input.content"),
                    "feedback" to variable("$.steps.analyze-content.output.feedback")
                )

                // Continue to publishing after revision
                onComplete {
                    step(publishAgent) {
                        id = "publish-revised-content"
                        name = "Publish Revised Content"
                        description = "Publish the revised content"
                        variables = mutableMapOf(
                            "content" to variable("$.steps.revise-content.output.revisedContent")
                        )
                    }
                }
            }
        }
    }
}

In this example, the workflow branches based on the content quality score. High-quality content goes directly to publishing, while low-quality content goes through a revision step before publishing.
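
The condition above falls back to `0.0` when the score is missing or is not a `Double`, which silently routes such content to the revision branch. The TypeScript sketch below mirrors that decision so the fallback behaviour is easy to see. `chooseBranch` and the `Branch` type are hypothetical names used only for this illustration.

```typescript
// Hypothetical illustration of the branch decision, not a Kastrax API.
type Branch = "publish" | "revise";

function chooseBranch(rawScore: unknown, threshold: number = 8.0): Branch {
  // Mirror the safe cast with a default: anything that is not a usable
  // number is treated as a score of 0.
  const score = typeof rawScore === "number" && !isNaN(rawScore) ? rawScore : 0;
  return score >= threshold ? "publish" : "revise";
}

const highQuality = chooseBranch(8.0);       // meets the threshold
const lowQuality = chooseBranch(7.9);        // just below the threshold
const missingScore = chooseBranch(undefined); // falls back to 0
```

Note that in Kotlin `as? Double` yields `null` for a value that is actually an `Int`, so an agent that returns an integer score such as `9` would also end up on the low-quality path. If the analysis output can vary in numeric type, it may be safer to cast to `Number` and convert.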

Merging Execution Paths ✅

Merging allows multiple execution paths to converge at a specific step. This is useful when you need to synchronize different branches of your workflow.

MergingWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with merging paths
val researchWorkflow = workflow {
    name = "research-workflow"
    description = "Research a topic from multiple sources and synthesize results"

    // These steps run in parallel
    step(academicResearchAgent) {
        id = "academic-research"
        name = "Academic Research"
        description = "Research from academic sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "depth" to variable("$.input.academicDepth")
        )
    }

    step(newsResearchAgent) {
        id = "news-research"
        name = "News Research"
        description = "Research from news sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "timeframe" to variable("$.input.newsTimeframe")
        )
    }

    step(socialMediaResearchAgent) {
        id = "social-research"
        name = "Social Media Research"
        description = "Research from social media"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "platforms" to variable("$.input.socialPlatforms")
        )
    }

    // This step merges all research paths
    step(synthesisAgent) {
        id = "research-synthesis"
        name = "Research Synthesis"
        description = "Synthesize research from all sources"
        // This step depends on all three research steps
        after("academic-research", "news-research", "social-research")
        variables = mutableMapOf(
            "academicData" to variable("$.steps.academic-research.output.findings"),
            "newsData" to variable("$.steps.news-research.output.findings"),
            "socialData" to variable("$.steps.social-research.output.findings")
        )
    }

    // Final report generation
    step(reportAgent) {
        id = "generate-report"
        name = "Generate Report"
        description = "Create a comprehensive report"
        after("research-synthesis")
        variables = mutableMapOf(
            "synthesizedData" to variable("$.steps.research-synthesis.output.synthesizedFindings"),
            "format" to variable("$.input.reportFormat")
        )
    }
}

In this example, three parallel research paths merge at the synthesis step, which only executes after all three research steps have completed. This ensures that all research data is available before synthesis begins.

Loops and Iterative Processing ✅

Kastrax supports iterative processing through loop constructs that allow steps to repeat until certain conditions are met. This is essential for tasks that require multiple iterations or refinement cycles.

Using Loop Steps

LoopWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with a loop
val contentRefinementWorkflow = workflow {
    name = "content-refinement"
    description = "Iteratively refine content until it meets quality standards"

    // Initial content generation
    step(contentGenerationAgent) {
        id = "generate-content"
        name = "Generate Initial Content"
        description = "Create the first draft of content"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "style" to variable("$.input.style")
        )
    }

    // Initialize iteration counter
    step(initializationAgent) {
        id = "initialize-loop"
        name = "Initialize Loop Variables"
        description = "Set up variables for the refinement loop"
        after("generate-content")
        execute { context ->
            mapOf(
                "currentContent" to context.getVariable("$.steps.generate-content.output.content"),
                "iterationCount" to 0,
                "qualityScore" to 0.0
            )
        }
    }

    // Refinement loop
    loopStep {
        id = "refinement-loop"
        name = "Content Refinement Loop"
        description = "Iteratively improve content until quality threshold is reached"
        after("initialize-loop")

        // Continue looping while quality is below threshold and iterations are under limit
        condition { context ->
            val qualityScore = context.getVariable("$.steps.evaluate-content.output.qualityScore") as? Double ?: 0.0
            val iterationCount = context.getVariable("$.steps.refine-content.output.iterationCount") as? Int ?: 0
            qualityScore < 8.5 && iterationCount < 5 // Continue if quality < 8.5 and fewer than 5 iterations
        }

        // Loop body
        body {
            // Evaluate current content
            step(evaluationAgent) {
                id = "evaluate-content"
                name = "Evaluate Content Quality"
                description = "Assess the quality of the current content"
                variables = mutableMapOf(
                    "content" to variable(
                        "$.steps.refine-content.output.currentContent",
                        defaultValue = variable("$.steps.initialize-loop.output.currentContent")
                    )
                )
            }

            // Refine content based on evaluation
            step(refinementAgent) {
                id = "refine-content"
                name = "Refine Content"
                description = "Improve content based on evaluation feedback"
                after("evaluate-content")
                variables = mutableMapOf(
                    "content" to variable(
                        "$.steps.refine-content.output.currentContent",
                        defaultValue = variable("$.steps.initialize-loop.output.currentContent")
                    ),
                    "feedback" to variable("$.steps.evaluate-content.output.feedback"),
                    "qualityScore" to variable("$.steps.evaluate-content.output.qualityScore"),
                    "iterationCount" to variable("$.steps.refine-content.output.iterationCount", defaultValue = 0)
                )
                execute { context ->
                    val content = context.getVariable("content") as String
                    val feedback = context.getVariable("feedback") as String
                    val iterationCount = (context.getVariable("iterationCount") as Int) + 1
                    val qualityScore = context.getVariable("qualityScore") as Double

                    // In a real implementation, this would use an agent to refine the content
                    // For this example, we're just simulating refinement
                    val refinedContent = "$content\n\nRefined in iteration $iterationCount based on: $feedback"

                    mapOf(
                        "currentContent" to refinedContent,
                        "iterationCount" to iterationCount,
                        "qualityScore" to qualityScore
                    )
                }
            }
        }
    }

    // Final processing after loop completion
    step(finalizationAgent) {
        id = "finalize-content"
        name = "Finalize Content"
        description = "Prepare the refined content for publication"
        after("refinement-loop")
        variables = mutableMapOf(
            "content" to variable("$.steps.refine-content.output.currentContent"),
            "iterationCount" to variable("$.steps.refine-content.output.iterationCount"),
            "finalQuality" to variable("$.steps.evaluate-content.output.qualityScore")
        )
    }
}

In this example, the workflow uses a loop to iteratively refine content until it reaches a quality threshold or hits a maximum number of iterations. The loop body contains steps for evaluating and refining the content, with each iteration building on the results of the previous one.
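
The essential shape of this loop is a continue-condition with two guards: a quality threshold and an iteration cap. The TypeScript sketch below isolates that shape in plain code. It is a simplification under stated assumptions: `refineUntil`, `LoopState`, and the toy `addMark` and `countMarks` helpers are hypothetical, and each pass here refines and then re-evaluates, whereas the workflow above evaluates first and refines second.

```typescript
// Hypothetical loop state carried from one iteration to the next.
interface LoopState {
  content: string;
  qualityScore: number;
  iterationCount: number;
}

// Repeats refine + evaluate while quality is below the threshold AND the
// iteration cap has not been reached. The cap guarantees termination even
// if the quality score never improves.
function refineUntil(
  initial: LoopState,
  evaluate: (content: string) => number,
  refine: (content: string, iteration: number) => string,
  threshold: number,
  maxIterations: number
): LoopState {
  let state = initial;
  while (state.qualityScore < threshold && state.iterationCount < maxIterations) {
    const iteration = state.iterationCount + 1;
    const content = refine(state.content, iteration);
    state = { content, qualityScore: evaluate(content), iterationCount: iteration };
  }
  return state;
}

// Toy stand-ins for the refinement and evaluation agents.
function addMark(content: string): string {
  return content + "+";
}
function countMarks(content: string): number {
  return content.split("+").length - 1;
}

const startState: LoopState = { content: "draft", qualityScore: 0, iterationCount: 0 };
// Stops because the threshold is reached after three passes.
const reachedThreshold = refineUntil(startState, countMarks, addMark, 3, 5);
// Stops because the iteration cap is hit before the threshold.
const hitIterationCap = refineUntil(startState, countMarks, addMark, 10, 5);
```

Keeping the iteration cap in the condition, as the workflow above does with `iterationCount < 5`, is what prevents an unbounded loop when an agent cannot raise the score.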

Using Recursive Patterns

For more complex iterative processes, Kastrax also supports recursive patterns where steps can trigger themselves under certain conditions:

RecursiveWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with recursive processing
val recursiveWorkflow = workflow {
    name = "recursive-processing"
    description = "Process data recursively until completion"

    // Initial data processing
    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process a batch of data"
        variables = mutableMapOf(
            "data" to variable("$.input.data"),
            "batchNumber" to variable("$.input.batchNumber", defaultValue = 1),
            "totalBatches" to variable("$.input.totalBatches")
        )

        // Define what happens after this step completes
        onComplete { result ->
            val batchNumber = result["batchNumber"] as Int
            val totalBatches = result["totalBatches"] as Int

            // If there are more batches to process, trigger the next batch
            if (batchNumber < totalBatches) {
                step(processingAgent) {
                    id = "process-data-${batchNumber + 1}"
                    name = "Process Data Batch ${batchNumber + 1}"
                    description = "Process the next batch of data"
                    variables = mutableMapOf(
                        "data" to variable("$.input.nextBatchData"),
                        "batchNumber" to (batchNumber + 1),
                        "totalBatches" to totalBatches
                    )
                }
            } else {
                // If all batches are processed, move to finalization
                step(finalizationAgent) {
                    id = "finalize-processing"
                    name = "Finalize Processing"
                    description = "Combine and finalize all processed batches"
                    variables = mutableMapOf(
                        "processedBatches" to variable("$.steps", transform = { steps ->
                            // Collect results from all processing steps
                            (steps as Map<String, Any>)
                                .filter { it.key.startsWith("process-data") }
                                .map { it.value }
                        })
                    )
                }
            }
        }
    }
}

This recursive pattern allows for dynamic workflow generation based on the results of previous steps, enabling complex iterative processes that can’t be fully defined in advance.

Conditional Execution ✅

Kastrax provides several ways to conditionally execute steps based on the results of previous steps or external conditions. This allows for dynamic, adaptive workflows that can respond to changing circumstances.

Using Conditional Steps

The most direct way to implement conditional logic is using conditional steps:

ConditionalExecution.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with conditional execution
val approvalWorkflow = workflow {
    name = "approval-workflow"
    description = "Process content with approval conditions"

    // Initial content creation
    step(contentAgent) {
        id = "create-content"
        name = "Create Content"
        description = "Generate initial content"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic")
        )
    }

    // Conditional step for content approval
    conditionalStep {
        id = "approval-check"
        name = "Approval Check"
        description = "Check if content requires approval"
        after("create-content")

        // Define the condition to evaluate
        condition { context ->
            val contentLength = context.getVariable("$.steps.create-content.output.content")?.toString()?.length ?: 0
            val sensitiveTopics = context.getVariable("$.steps.create-content.output.sensitiveTopicsDetected") as? Boolean ?: false

            // Content requires approval if it's long or contains sensitive topics
            contentLength > 1000 || sensitiveTopics
        }

        // Steps to execute if approval is required
        onTrue {
            step(approvalAgent) {
                id = "request-approval"
                name = "Request Approval"
                description = "Send content for human approval"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.content"),
                    "approver" to variable("$.input.approverEmail")
                )
            }
        }

        // Steps to execute if approval is not required
        onFalse {
            step(publishAgent) {
                id = "auto-publish"
                name = "Auto-Publish"
                description = "Automatically publish content without approval"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.content")
                )
            }
        }
    }
}

Using Step Conditions

You can also add conditions directly to individual steps:

step(publishAgent) {
    id = "publish-content"
    name = "Publish Content"
    description = "Publish content if approved"
    after("request-approval")

    // Only execute this step if the approval was granted
    condition { context ->
        context.getVariable("$.steps.request-approval.output.approved") as? Boolean ?: false
    }

    variables = mutableMapOf(
        "content" to variable("$.steps.create-content.output.content"),
        "channel" to variable("$.input.publishChannel")
    )
}

Using Dynamic Step Generation

For more complex conditional logic, you can dynamically generate steps based on runtime conditions:

step(analysisAgent) {
    id = "analyze-data"
    name = "Analyze Data"
    description = "Analyze input data and determine next steps"
    variables = mutableMapOf(
        "data" to variable("$.input.data")
    )

    // Dynamically determine next steps based on analysis results
    onComplete { result ->
        val dataType = result["dataType"] as? String

        when (dataType) {
            "text" -> {
                step(textProcessingAgent) {
                    id = "process-text"
                    name = "Process Text Data"
                    description = "Process textual data"
                    variables = mutableMapOf(
                        "text" to variable("$.steps.analyze-data.output.extractedText")
                    )
                }
            }
            "image" -> {
                step(imageProcessingAgent) {
                    id = "process-image"
                    name = "Process Image Data"
                    description = "Process image data"
                    variables = mutableMapOf(
                        "imageUrl" to variable("$.steps.analyze-data.output.imageUrl")
                    )
                }
            }
            else -> {
                step(fallbackAgent) {
                    id = "process-unknown"
                    name = "Process Unknown Data"
                    description = "Handle unknown data type"
                    variables = mutableMapOf(
                        "rawData" to variable("$.steps.analyze-data.output.rawData")
                    )
                }
            }
        }
    }
}

Best Practices for Control Flow ✅

When designing workflow control flow, consider these best practices:

1. Keep Workflows Readable

Design your workflows to be as readable and maintainable as possible:

// Good: Clear, descriptive step IDs and logical flow
workflow {
    step(dataFetchAgent) { id = "fetch-data" }
    step(validationAgent) { id = "validate-data"; after("fetch-data") }
    step(processingAgent) { id = "process-data"; after("validate-data") }
}

// Avoid: Confusing dependencies and unclear flow
// workflow {
//     step(processingAgent) { id = "process" }
//     step(dataFetchAgent) { id = "fetch" }
//     step(validationAgent) { id = "validate"; after("fetch") }
//     step(finalAgent) { id = "final"; after("process", "validate") }
// }

2. Use Explicit Dependencies

Always make dependencies between steps explicit:

// Good: Explicit dependencies
step(analysisAgent) {
    id = "analyze-data"
    after("fetch-data", "preprocess-data") // Explicit dependencies
}

// Avoid: Implicit dependencies through variable references without explicit step dependencies
// step(analysisAgent) {
//     id = "analyze-data"
//     variables = mutableMapOf(
//         "data" to variable("$.steps.fetch-data.output.data") // Implicit dependency
//     )
// }

3. Handle Error Cases

Implement proper error handling in your control flow:

conditionalStep {
    id = "data-validation"

    condition { context ->
        val validationErrors = context.getVariable("$.steps.validate-data.output.errors") as? List<String> ?: emptyList()
        validationErrors.isEmpty() // Check if there are no validation errors
    }

    onTrue {
        step(processingAgent) { id = "process-data" }
    }

    onFalse {
        step(errorHandlingAgent) {
            id = "handle-validation-errors"
            variables = mutableMapOf(
                "errors" to variable("$.steps.validate-data.output.errors")
            )
        }
    }
}

4. Avoid Overly Complex Workflows

Break down complex workflows into smaller, more manageable sub-workflows:

// Main workflow that orchestrates sub-workflows
val mainWorkflow = workflow {
    name = "main-process"

    // Execute data preparation sub-workflow
    subWorkflowStep {
        id = "data-preparation"
        workflowId = "data-prep-workflow"
        input = mapOf(
            "rawData" to variable("$.input.data")
        )
    }

    // Execute analysis sub-workflow after data preparation
    subWorkflowStep {
        id = "data-analysis"
        after("data-preparation")
        workflowId = "analysis-workflow"
        input = mapOf(
            "preparedData" to variable("$.steps.data-preparation.output.processedData")
        )
    }
}

5. Test Complex Control Flows

Thoroughly test workflows with complex control flows to ensure they behave as expected:

import kotlinx.coroutines.runBlocking

// Test workflow with different input conditions.
// kastraxSystem, validTestData, and invalidTestData are assumed to be provided by the test setup.
fun testWorkflow() = runBlocking {
    val workflowEngine = kastraxSystem.workflowEngine

    // Test with valid data
    val validResult = workflowEngine.executeWorkflow(
        workflowId = "data-processing",
        input = mapOf("data" to validTestData)
    )
    assert(validResult.success)

    // Test with invalid data to verify error handling
    val invalidResult = workflowEngine.executeWorkflow(
        workflowId = "data-processing",
        input = mapOf("data" to invalidTestData)
    )
    assert(invalidResult.steps["handle-validation-errors"] != null)
}

By following these best practices, you can create workflows with clear, maintainable control flow that effectively orchestrates your AI agents and tools.

Data Access Patterns ✅

Kastrax provides several ways to pass data between steps:

  1. Context Object - Access step results directly through the context object
  2. Variable Mapping - Explicitly map outputs from one step to inputs of another
  3. getStepResult Method - Type-safe method to retrieve step outputs

Each approach has its advantages depending on your use case and requirements for type safety.

Using getStepResult Method

The getStepResult method provides a type-safe way to access step results. This is the recommended approach when working with TypeScript as it preserves type information.

Basic Usage

For better type safety, you can provide a type parameter to getStepResult:

src/kastrax/workflows/get-step-result.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: 'fetchUser',
  outputSchema: z.object({
    name: z.string(),
    userId: z.string(),
  }),
  execute: async ({ context }) => {
    return { name: 'John Doe', userId: '123' };
  },
});

const analyzeDataStep = new Step({
  id: "analyzeData",
  execute: async ({ context }) => {
    // Type-safe access to previous step result
    const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");

    if (!userData) {
      return { status: "error", message: "User data not found" };
    }

    return {
      analysis: `Analyzed data for user ${userData.name}`,
      userId: userData.userId
    };
  },
});

Using Step References

The most type-safe approach is to reference the step directly in the getStepResult call:

src/kastrax/workflows/step-reference.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// Define step with output schema
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processUserStep = new Step({
  id: "processUser",
  execute: async ({ context }) => {
    // TypeScript will infer the correct type from fetchUserStep's outputSchema
    const userData = context.getStepResult(fetchUserStep);

    return {
      processed: true,
      userName: userData?.name
    };
  },
});

const workflow = new Workflow({
  name: "user-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processUserStep)
  .commit();

Using Variable Mapping

Variable mapping is an explicit way to define data flow between steps. This approach makes dependencies clear and provides good type safety. The data injected into the step is available in the context.inputData object, and typed based on the inputSchema of the step.

src/kastrax/workflows/variable-mapping.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const sendEmailStep = new Step({
  id: "sendEmail",
  inputSchema: z.object({
    recipientEmail: z.string(),
    recipientName: z.string(),
  }),
  execute: async ({ context }) => {
    const { recipientEmail, recipientName } = context.inputData;

    // Send email logic here
    return {
      status: "sent",
      to: recipientEmail
    };
  },
});

const workflow = new Workflow({
  name: "email-workflow",
});

workflow
  .step(fetchUserStep)
  .then(sendEmailStep, {
    variables: {
      // Map specific fields from fetchUser to sendEmail inputs
      recipientEmail: { step: fetchUserStep, path: 'email' },
      recipientName: { step: fetchUserStep, path: 'name' }
    }
  })
  .commit();

For more details on variable mapping, see the Data Mapping with Workflow Variables documentation.

Using the Context Object

The context object provides direct access to all step results and their outputs. This approach is more flexible but requires careful handling to maintain type safety. You can access step results directly through the context.steps object:

src/kastrax/workflows/context-access.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// fetchUserStep is assumed to be defined as in the earlier examples.
const processOrderStep = new Step({
  id: 'processOrder',
  execute: async ({ context }) => {
    // Access data from a previous step
    let userData: { name: string, userId: string };
    if (context.steps['fetchUser']?.status === 'success') {
      userData = context.steps.fetchUser.output;
    } else {
      throw new Error('User data not found');
    }

    return {
      orderId: 'order123',
      userId: userData.userId,
      status: 'processing',
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();

Workflow-Level Type Safety

For comprehensive type safety across your entire workflow, you can define types for all steps and pass them to the Workflow as type parameters. This gives you type safety for the context object in conditions and for step results in the final workflow output.

src/kastrax/workflows/workflow-typing.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// Create steps with typed outputs
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // TypeScript knows the shape of userData
    const userData = context.getStepResult(fetchUserStep);

    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .until(async ({ context }) => {
    // TypeScript knows the shape of userData here
    const res = context.getStepResult('fetchUser');
    return res?.userId === '123';
  }, processOrderStep)
  .commit();

Accessing Trigger Data

In addition to step results, you can access the original trigger data that started the workflow:

src/kastrax/workflows/trigger-data.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// Define trigger schema
const triggerSchema = z.object({
  customerId: z.string(),
  orderItems: z.array(z.string()),
});

type TriggerType = z.infer<typeof triggerSchema>;

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // Access trigger data with type safety
    const triggerData = context.getStepResult<TriggerType>('trigger');

    return {
      customerId: triggerData?.customerId,
      itemCount: triggerData?.orderItems.length || 0,
      status: "processing"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
  triggerSchema,
});

workflow
  .step(processOrderStep)
  .commit();

Accessing Resume Data

When a suspended step is resumed, the data passed to resume is injected into the step. It is available in the context.inputData object and is typed based on the inputSchema of the step.

src/kastrax/workflows/resume-data.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const processOrderStep = new Step({
  id: "processOrder",
  inputSchema: z.object({
    orderId: z.string(),
  }),
  execute: async ({ context, suspend }) => {
    const { orderId } = context.inputData;

    if (!orderId) {
      await suspend();
      return;
    }

    return {
      orderId,
      status: "processed"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

const resumedResult = await workflow.resume({
  runId: result.runId,
  stepId: 'processOrder',
  inputData: {
    orderId: '123',
  },
});

console.log({resumedResult});

Accessing Workflow Results

You can get typed access to the results of a workflow by injecting the step types into the Workflow type params:

src/kastrax/workflows/get-results.ts
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  outputSchema: z.object({
    orderId: z.string(),
    status: z.string(),
  }),
  execute: async ({ context }) => {
    const userData = context.getStepResult(fetchUserStep);
    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

// The result is a discriminated union of the step results
// So it needs to be narrowed down via status checks
if (result.results.processOrder.status === 'success') {
  // TypeScript will know the shape of the results
  const orderId = result.results.processOrder.output.orderId;
  console.log({orderId});
}

if (result.results.fetchUser.status === 'success') {
  const userId = result.results.fetchUser.output.userId;
  console.log({userId});
}
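
The status checks are needed because a step result is a discriminated union: only the success variant carries an `output`. The standalone TypeScript sketch below shows how that narrowing works without depending on the Kastrax package. The `StepResult` type is a hypothetical stand-in, and the `"failed"` and `"suspended"` variants are assumptions made for illustration; only the `'success'` status appears in this guide.

```typescript
// Hypothetical stand-in for a step result; only "success" is taken from
// the examples in this guide, the other variants are assumed.
type StepResult<T> =
  | { status: "success"; output: T }
  | { status: "failed"; error: string }
  | { status: "suspended" };

// Returns the step output when the step succeeded, otherwise a fallback.
// Inside the if-branch TypeScript narrows the union, so `output` is typed.
function outputOr<T>(result: StepResult<T> | undefined, fallback: T): T {
  if (result !== undefined && result.status === "success") {
    return result.output;
  }
  return fallback;
}

const succeeded: StepResult<{ orderId: string }> = {
  status: "success",
  output: { orderId: "order123" },
};
const failed: StepResult<{ orderId: string }> = {
  status: "failed",
  error: "payment declined",
};

const fromSuccess = outputOr(succeeded, { orderId: "none" });
const fromFailure = outputOr(failed, { orderId: "none" });
const fromMissing = outputOr<{ orderId: string }>(undefined, { orderId: "none" });
```

A helper like this keeps the narrowing in one place, which can be convenient when several steps need the same "use the output or fall back" handling.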

Best Practices for Data Flow

  1. Use getStepResult with Step References for Type Safety

    • Ensures TypeScript can infer the correct types
    • Catches type errors at compile time
  2. Use Variable Mapping for Explicit Dependencies

    • Makes data flow clear and maintainable
    • Provides good documentation of step dependencies
  3. Define Output Schemas for Steps

    • Validates data at runtime
      • Validates return type of the execute function
    • Improves type inference in TypeScript
  4. Handle Missing Data Gracefully

    • Always check if step results exist before accessing properties
    • Provide fallback values for optional data
  5. Keep Data Transformations Simple

    • Transform data in dedicated steps rather than in variable mappings
    • Makes workflows easier to test and debug

Comparison of Data Flow Methods

| Method | Type Safety | Explicitness | Use Case |
| --- | --- | --- | --- |
| getStepResult | Highest | High | Complex workflows with strict typing requirements |
| Variable Mapping | High | High | When dependencies need to be clear and explicit |
| context.steps | Medium | Low | Quick access to step data in simple workflows |

By choosing the right data flow method for your use case, you can create workflows that are both type-safe and maintainable.
