Control Flow in Kastrax AI Workflows ✅
Kastrax’s workflow system provides sophisticated control flow capabilities that allow you to orchestrate complex AI agent operations. Whether you need to execute steps sequentially, run operations in parallel, implement conditional branching, or create loops, Kastrax offers a type-safe, declarative API for defining exactly how your workflow should behave.
This guide explains the various control flow patterns available in Kastrax workflows and how to implement them effectively.
Sequential Execution ✅
Sequential execution is the most basic control flow pattern, where steps are executed one after another in a defined order. This ensures that outputs from one step become inputs for the next step.
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with sequential steps
val orderProcessingWorkflow = workflow {
name = "order-processing"
description = "Process customer orders sequentially"
// First step: Fetch order data
step(fetchOrderAgent) {
id = "fetch-order"
name = "Fetch Order Data"
description = "Retrieve order information from the database"
variables = mutableMapOf(
"orderId" to variable("$.input.orderId")
)
}
// Second step: Validate order data (runs after fetch-order)
step(validationAgent) {
id = "validate-order"
name = "Validate Order"
description = "Validate order data for completeness and correctness"
after("fetch-order") // Explicit dependency
variables = mutableMapOf(
"orderData" to variable("$.steps.fetch-order.output.data")
)
}
// Third step: Process payment (runs after validate-order)
step(paymentAgent) {
id = "process-payment"
name = "Process Payment"
description = "Process payment for the validated order"
after("validate-order") // Explicit dependency
variables = mutableMapOf(
"orderData" to variable("$.steps.fetch-order.output.data"),
"validationResult" to variable("$.steps.validate-order.output.result")
)
}
}
In this example, each step explicitly depends on the previous step using the after()
function, creating a clear sequential flow.
Parallel Execution ✅
Parallel execution allows multiple steps to run simultaneously when they don’t depend on each other. This can significantly improve workflow performance for independent operations.
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with parallel steps
val dataProcessingWorkflow = workflow {
name = "data-processing"
description = "Process data from multiple sources in parallel"
// These steps will run in parallel since they don't have dependencies on each other
step(userDataAgent) {
id = "fetch-user-data"
name = "Fetch User Data"
description = "Retrieve user information"
variables = mutableMapOf(
"userId" to variable("$.input.userId")
)
}
step(productDataAgent) {
id = "fetch-product-data"
name = "Fetch Product Data"
description = "Retrieve product information"
variables = mutableMapOf(
"productId" to variable("$.input.productId")
)
}
step(inventoryDataAgent) {
id = "fetch-inventory-data"
name = "Fetch Inventory Data"
description = "Retrieve inventory information"
variables = mutableMapOf(
"productId" to variable("$.input.productId")
)
}
// This step will only run after all parallel steps have completed
step(analysisAgent) {
id = "analyze-data"
name = "Analyze Combined Data"
description = "Analyze data from all sources"
after("fetch-user-data", "fetch-product-data", "fetch-inventory-data") // Multiple dependencies
variables = mutableMapOf(
"userData" to variable("$.steps.fetch-user-data.output.data"),
"productData" to variable("$.steps.fetch-product-data.output.data"),
"inventoryData" to variable("$.steps.fetch-inventory-data.output.data")
)
}
}
In this example, the first three steps run in parallel because they don’t have dependencies on each other. The final analysis step only runs after all three parallel steps have completed.
Branching and Conditional Paths ✅
Branching allows your workflow to take different paths based on conditions or results from previous steps. This is essential for implementing decision logic in your workflows.
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with branching paths
val contentWorkflow = workflow {
name = "content-workflow"
description = "Create and process content with branching logic"
// Initial content analysis step
step(analysisAgent) {
id = "analyze-content"
name = "Content Analysis"
description = "Analyze content quality and type"
variables = mutableMapOf(
"content" to variable("$.input.content")
)
}
// Conditional branching based on content quality
conditionalStep {
id = "quality-branch"
name = "Quality Branching"
description = "Branch based on content quality"
after("analyze-content")
// Define the condition to evaluate
condition { context ->
val quality = context.getVariable("$.steps.analyze-content.output.qualityScore") as? Double ?: 0.0
quality >= 8.0 // High quality if score is 8.0 or higher
}
// Steps to execute if condition is true (high quality)
onTrue {
step(publishAgent) {
id = "publish-content"
name = "Publish Content"
description = "Publish the high-quality content"
variables = mutableMapOf(
"content" to variable("$.input.content")
)
}
}
// Steps to execute if condition is false (low quality)
onFalse {
step(revisionAgent) {
id = "revise-content"
name = "Revise Content"
description = "Improve the low-quality content"
variables = mutableMapOf(
"content" to variable("$.input.content"),
"feedback" to variable("$.steps.analyze-content.output.feedback")
)
// Continue to publishing after revision
onComplete {
step(publishAgent) {
id = "publish-revised-content"
name = "Publish Revised Content"
description = "Publish the revised content"
variables = mutableMapOf(
"content" to variable("$.steps.revise-content.output.revisedContent")
)
}
}
}
}
}
}
In this example, the workflow branches based on the content quality score. High-quality content goes directly to publishing, while low-quality content goes through a revision step before publishing.
Merging Execution Paths ✅
Merging allows multiple execution paths to converge at a specific step. This is useful when you need to synchronize different branches of your workflow.
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with merging paths
val researchWorkflow = workflow {
name = "research-workflow"
description = "Research a topic from multiple sources and synthesize results"
// These steps run in parallel
step(academicResearchAgent) {
id = "academic-research"
name = "Academic Research"
description = "Research from academic sources"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"depth" to variable("$.input.academicDepth")
)
}
step(newsResearchAgent) {
id = "news-research"
name = "News Research"
description = "Research from news sources"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"timeframe" to variable("$.input.newsTimeframe")
)
}
step(socialMediaResearchAgent) {
id = "social-research"
name = "Social Media Research"
description = "Research from social media"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"platforms" to variable("$.input.socialPlatforms")
)
}
// This step merges all research paths
step(synthesisAgent) {
id = "research-synthesis"
name = "Research Synthesis"
description = "Synthesize research from all sources"
// This step depends on all three research steps
after("academic-research", "news-research", "social-research")
variables = mutableMapOf(
"academicData" to variable("$.steps.academic-research.output.findings"),
"newsData" to variable("$.steps.news-research.output.findings"),
"socialData" to variable("$.steps.social-research.output.findings")
)
}
// Final report generation
step(reportAgent) {
id = "generate-report"
name = "Generate Report"
description = "Create a comprehensive report"
after("research-synthesis")
variables = mutableMapOf(
"synthesizedData" to variable("$.steps.research-synthesis.output.synthesizedFindings"),
"format" to variable("$.input.reportFormat")
)
}
}
In this example, three parallel research paths merge at the synthesis step, which only executes after all three research steps have completed. This ensures that all research data is available before synthesis begins.
Loops and Iterative Processing ✅
Kastrax supports iterative processing through loop constructs that allow steps to repeat until certain conditions are met. This is essential for tasks that require multiple iterations or refinement cycles.
Using Loop Steps
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with a loop
val contentRefinementWorkflow = workflow {
name = "content-refinement"
description = "Iteratively refine content until it meets quality standards"
// Initial content generation
step(contentGenerationAgent) {
id = "generate-content"
name = "Generate Initial Content"
description = "Create the first draft of content"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"style" to variable("$.input.style")
)
}
// Initialize iteration counter
step(initializationAgent) {
id = "initialize-loop"
name = "Initialize Loop Variables"
description = "Set up variables for the refinement loop"
after("generate-content")
execute { context ->
mapOf(
"currentContent" to context.getVariable("$.steps.generate-content.output.content"),
"iterationCount" to 0,
"qualityScore" to 0.0
)
}
}
// Refinement loop
loopStep {
id = "refinement-loop"
name = "Content Refinement Loop"
description = "Iteratively improve content until quality threshold is reached"
after("initialize-loop")
// Continue looping while quality is below threshold and iterations are under limit
condition { context ->
val qualityScore = context.getVariable("$.steps.evaluate-content.output.qualityScore") as? Double ?: 0.0
val iterationCount = context.getVariable("$.steps.refine-content.output.iterationCount") as? Int ?: 0
qualityScore < 8.5 && iterationCount < 5 // Continue if quality < 8.5 and fewer than 5 iterations
}
// Loop body
body {
// Evaluate current content
step(evaluationAgent) {
id = "evaluate-content"
name = "Evaluate Content Quality"
description = "Assess the quality of the current content"
variables = mutableMapOf(
"content" to variable("$.steps.refine-content.output.currentContent",
defaultValue = variable("$.steps.initialize-loop.output.currentContent"))
)
}
// Refine content based on evaluation
step(refinementAgent) {
id = "refine-content"
name = "Refine Content"
description = "Improve content based on evaluation feedback"
after("evaluate-content")
variables = mutableMapOf(
"content" to variable("$.steps.refine-content.output.currentContent",
defaultValue = variable("$.steps.initialize-loop.output.currentContent")),
"feedback" to variable("$.steps.evaluate-content.output.feedback"),
"qualityScore" to variable("$.steps.evaluate-content.output.qualityScore"),
"iterationCount" to variable("$.steps.refine-content.output.iterationCount", defaultValue = 0)
)
execute { context ->
val content = context.getVariable("content") as String
val feedback = context.getVariable("feedback") as String
val iterationCount = (context.getVariable("iterationCount") as Int) + 1
val qualityScore = context.getVariable("qualityScore") as Double
// In a real implementation, this would use an agent to refine the content
// For this example, we're just simulating refinement
val refinedContent = "$content\n\nRefined in iteration $iterationCount based on: $feedback"
mapOf(
"currentContent" to refinedContent,
"iterationCount" to iterationCount,
"qualityScore" to qualityScore
)
}
}
}
}
// Final processing after loop completion
step(finalizationAgent) {
id = "finalize-content"
name = "Finalize Content"
description = "Prepare the refined content for publication"
after("refinement-loop")
variables = mutableMapOf(
"content" to variable("$.steps.refine-content.output.currentContent"),
"iterationCount" to variable("$.steps.refine-content.output.iterationCount"),
"finalQuality" to variable("$.steps.evaluate-content.output.qualityScore")
)
}
}
In this example, the workflow uses a loop to iteratively refine content until it reaches a quality threshold or hits a maximum number of iterations. The loop body contains steps for evaluating and refining the content, with each iteration building on the results of the previous one.
Using Recursive Patterns
For more complex iterative processes, Kastrax also supports recursive patterns where steps can trigger themselves under certain conditions:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with recursive processing
val recursiveWorkflow = workflow {
name = "recursive-processing"
description = "Process data recursively until completion"
// Initial data processing
step(processingAgent) {
id = "process-data"
name = "Process Data"
description = "Process a batch of data"
variables = mutableMapOf(
"data" to variable("$.input.data"),
"batchNumber" to variable("$.input.batchNumber", defaultValue = 1),
"totalBatches" to variable("$.input.totalBatches")
)
// Define what happens after this step completes
onComplete { result ->
val batchNumber = result["batchNumber"] as Int
val totalBatches = result["totalBatches"] as Int
// If there are more batches to process, trigger the next batch
if (batchNumber < totalBatches) {
step(processingAgent) {
id = "process-data-${batchNumber + 1}"
name = "Process Data Batch ${batchNumber + 1}"
description = "Process the next batch of data"
variables = mutableMapOf(
"data" to variable("$.input.nextBatchData"),
"batchNumber" to (batchNumber + 1),
"totalBatches" to totalBatches
)
}
} else {
// If all batches are processed, move to finalization
step(finalizationAgent) {
id = "finalize-processing"
name = "Finalize Processing"
description = "Combine and finalize all processed batches"
variables = mutableMapOf(
"processedBatches" to variable("$.steps", transform = { steps ->
// Collect results from all processing steps
(steps as Map<String, Any>).filter { it.key.startsWith("process-data") }
.map { it.value }
})
)
}
}
}
}
}
This recursive pattern allows for dynamic workflow generation based on the results of previous steps, enabling complex iterative processes that can’t be fully defined in advance.
Conditional Execution ✅
Kastrax provides several ways to conditionally execute steps based on the results of previous steps or external conditions. This allows for dynamic, adaptive workflows that can respond to changing circumstances.
Using Conditional Steps
The most direct way to implement conditional logic is using conditional steps:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with conditional execution
val approvalWorkflow = workflow {
name = "approval-workflow"
description = "Process content with approval conditions"
// Initial content creation
step(contentAgent) {
id = "create-content"
name = "Create Content"
description = "Generate initial content"
variables = mutableMapOf(
"topic" to variable("$.input.topic")
)
}
// Conditional step for content approval
conditionalStep {
id = "approval-check"
name = "Approval Check"
description = "Check if content requires approval"
after("create-content")
// Define the condition to evaluate
condition { context ->
val contentLength = context.getVariable("$.steps.create-content.output.content")?.toString()?.length ?: 0
val sensitiveTopics = context.getVariable("$.steps.create-content.output.sensitiveTopicsDetected") as? Boolean ?: false
// Content requires approval if it's long or contains sensitive topics
contentLength > 1000 || sensitiveTopics
}
// Steps to execute if approval is required
onTrue {
step(approvalAgent) {
id = "request-approval"
name = "Request Approval"
description = "Send content for human approval"
variables = mutableMapOf(
"content" to variable("$.steps.create-content.output.content"),
"approver" to variable("$.input.approverEmail")
)
}
}
// Steps to execute if approval is not required
onFalse {
step(publishAgent) {
id = "auto-publish"
name = "Auto-Publish"
description = "Automatically publish content without approval"
variables = mutableMapOf(
"content" to variable("$.steps.create-content.output.content")
)
}
}
}
}
Using Step Conditions
You can also add conditions directly to individual steps:
step(publishAgent) {
id = "publish-content"
name = "Publish Content"
description = "Publish content if approved"
after("request-approval")
// Only execute this step if the approval was granted
condition { context ->
context.getVariable("$.steps.request-approval.output.approved") as? Boolean ?: false
}
variables = mutableMapOf(
"content" to variable("$.steps.create-content.output.content"),
"channel" to variable("$.input.publishChannel")
)
}
Using Dynamic Step Generation
For more complex conditional logic, you can dynamically generate steps based on runtime conditions:
step(analysisAgent) {
id = "analyze-data"
name = "Analyze Data"
description = "Analyze input data and determine next steps"
variables = mutableMapOf(
"data" to variable("$.input.data")
)
// Dynamically determine next steps based on analysis results
onComplete { result ->
val dataType = result["dataType"] as? String
when (dataType) {
"text" -> {
step(textProcessingAgent) {
id = "process-text"
name = "Process Text Data"
description = "Process textual data"
variables = mutableMapOf(
"text" to variable("$.steps.analyze-data.output.extractedText")
)
}
}
"image" -> {
step(imageProcessingAgent) {
id = "process-image"
name = "Process Image Data"
description = "Process image data"
variables = mutableMapOf(
"imageUrl" to variable("$.steps.analyze-data.output.imageUrl")
)
}
}
else -> {
step(fallbackAgent) {
id = "process-unknown"
name = "Process Unknown Data"
description = "Handle unknown data type"
variables = mutableMapOf(
"rawData" to variable("$.steps.analyze-data.output.rawData")
)
}
}
}
}
}
Best Practices for Control Flow ✅
When designing workflow control flow, consider these best practices:
1. Keep Workflows Readable
Design your workflows to be as readable and maintainable as possible:
// Good: Clear, descriptive step IDs and logical flow
workflow {
step(dataFetchAgent) { id = "fetch-data" }
step(validationAgent) { id = "validate-data"; after("fetch-data") }
step(processingAgent) { id = "process-data"; after("validate-data") }
}
// Avoid: Confusing dependencies and unclear flow
// workflow {
// step(processingAgent) { id = "process" }
// step(dataFetchAgent) { id = "fetch" }
// step(validationAgent) { id = "validate"; after("fetch") }
// step(finalAgent) { id = "final"; after("process", "validate") }
// }
2. Use Explicit Dependencies
Always make dependencies between steps explicit:
// Good: Explicit dependencies
step(analysisAgent) {
id = "analyze-data"
after("fetch-data", "preprocess-data") // Explicit dependencies
}
// Avoid: Implicit dependencies through variable references without explicit step dependencies
// step(analysisAgent) {
// id = "analyze-data"
// variables = mutableMapOf(
// "data" to variable("$.steps.fetch-data.output.data") // Implicit dependency
// )
// }
3. Handle Error Cases
Implement proper error handling in your control flow:
conditionalStep {
id = "data-validation"
condition { context ->
val validationErrors = context.getVariable("$.steps.validate-data.output.errors") as? List<String> ?: emptyList()
validationErrors.isEmpty() // Check if there are no validation errors
}
onTrue {
step(processingAgent) { id = "process-data" }
}
onFalse {
step(errorHandlingAgent) {
id = "handle-validation-errors"
variables = mutableMapOf(
"errors" to variable("$.steps.validate-data.output.errors")
)
}
}
}
4. Avoid Overly Complex Workflows
Break down complex workflows into smaller, more manageable sub-workflows:
// Main workflow that orchestrates sub-workflows
val mainWorkflow = workflow {
name = "main-process"
// Execute data preparation sub-workflow
subWorkflowStep {
id = "data-preparation"
workflowId = "data-prep-workflow"
input = mapOf(
"rawData" to variable("$.input.data")
)
}
// Execute analysis sub-workflow after data preparation
subWorkflowStep {
id = "data-analysis"
after("data-preparation")
workflowId = "analysis-workflow"
input = mapOf(
"preparedData" to variable("$.steps.data-preparation.output.processedData")
)
}
}
5. Test Complex Control Flows
Thoroughly test workflows with complex control flows to ensure they behave as expected:
// Test workflow with different input conditions
fun testWorkflow() = runBlocking {
val workflowEngine = kastraxSystem.workflowEngine
// Test with valid data
val validResult = workflowEngine.executeWorkflow(
workflowId = "data-processing",
input = mapOf("data" to validTestData)
)
assert(validResult.success)
// Test with invalid data to verify error handling
val invalidResult = workflowEngine.executeWorkflow(
workflowId = "data-processing",
input = mapOf("data" to invalidTestData)
)
assert(invalidResult.steps["handle-validation-errors"] != null)
}
By following these best practices, you can create workflows with clear, maintainable control flow that effectively orchestrates your AI agents and tools.
Data Access Patterns ✅
Kastrax provides several ways to pass data between steps:
- Context Object - Access step results directly through the context object
- Variable Mapping - Explicitly map outputs from one step to inputs of another
- getStepResult Method - Type-safe method to retrieve step outputs
Each approach has its advantages depending on your use case and requirements for type safety.
Using getStepResult Method
The getStepResult
method provides a type-safe way to access step results. This is the recommended approach when working with TypeScript as it preserves type information.
Basic Usage
For better type safety, you can provide a type parameter to getStepResult
:
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: 'fetchUser',
outputSchema: z.object({
name: z.string(),
userId: z.string(),
}),
execute: async ({ context }) => {
return { name: 'John Doe', userId: '123' };
},
});
const analyzeDataStep = new Step({
id: "analyzeData",
execute: async ({ context }) => {
// Type-safe access to previous step result
const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");
if (!userData) {
return { status: "error", message: "User data not found" };
}
return {
analysis: `Analyzed data for user ${userData.name}`,
userId: userData.userId
};
},
});
Using Step References
The most type-safe approach is to reference the step directly in the getStepResult
call:
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
// Define step with output schema
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processUserStep = new Step({
id: "processUser",
execute: async ({ context }) => {
// TypeScript will infer the correct type from fetchUserStep's outputSchema
const userData = context.getStepResult(fetchUserStep);
return {
processed: true,
userName: userData?.name
};
},
});
const workflow = new Workflow({
name: "user-workflow",
});
workflow
.step(fetchUserStep)
.then(processUserStep)
.commit();
Using Variable Mapping
Variable mapping is an explicit way to define data flow between steps.
This approach makes dependencies clear and provides good type safety.
The data injected into the step is available in the context.inputData
object, and typed based on the inputSchema
of the step.
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const sendEmailStep = new Step({
id: "sendEmail",
inputSchema: z.object({
recipientEmail: z.string(),
recipientName: z.string(),
}),
execute: async ({ context }) => {
const { recipientEmail, recipientName } = context.inputData;
// Send email logic here
return {
status: "sent",
to: recipientEmail
};
},
});
const workflow = new Workflow({
name: "email-workflow",
});
workflow
.step(fetchUserStep)
.then(sendEmailStep, {
variables: {
// Map specific fields from fetchUser to sendEmail inputs
recipientEmail: { step: fetchUserStep, path: 'email' },
recipientName: { step: fetchUserStep, path: 'name' }
}
})
.commit();
For more details on variable mapping, see the Data Mapping with Workflow Variables documentation.
Using the Context Object
The context object provides direct access to all step results and their outputs. This approach is more flexible but requires careful handling to maintain type safety.
You can access step results directly through the context.steps
object:
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: 'processOrder',
execute: async ({ context }) => {
// Access data from a previous step
let userData: { name: string, userId: string };
if (context.steps['fetchUser']?.status === 'success') {
userData = context.steps.fetchUser.output;
} else {
throw new Error('User data not found');
}
return {
orderId: 'order123',
userId: userData.userId,
status: 'processing',
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
Workflow-Level Type Safety
For comprehensive type safety across your entire workflow, you can define types for all steps and pass them to the Workflow This allows you to get type safety for the context object on conditions, and on step results in the final workflow output.
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
// Create steps with typed outputs
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// TypeScript knows the shape of userData
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.until(async ({ context }) => {
// TypeScript knows the shape of userData here
const res = context.getStepResult('fetchUser');
return res?.userId === '123';
}, processOrderStep)
.commit();
Accessing Trigger Data
In addition to step results, you can access the original trigger data that started the workflow:
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
// Define trigger schema
const triggerSchema = z.object({
customerId: z.string(),
orderItems: z.array(z.string()),
});
type TriggerType = z.infer<typeof triggerSchema>;
const processOrderStep = new Step({
id: "processOrder",
execute: async ({ context }) => {
// Access trigger data with type safety
const triggerData = context.getStepResult<TriggerType>('trigger');
return {
customerId: triggerData?.customerId,
itemCount: triggerData?.orderItems.length || 0,
status: "processing"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
triggerSchema,
});
workflow
.step(processOrderStep)
.commit();
Accessing Resume Data
The data injected into the step is available in the context.inputData
object, and typed based on the inputSchema
of the step.
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";
const processOrderStep = new Step({
id: "processOrder",
inputSchema: z.object({
orderId: z.string(),
}),
execute: async ({ context, suspend }) => {
const { orderId } = context.inputData;
if (!orderId) {
await suspend();
return;
}
return {
orderId,
status: "processed"
};
},
});
const workflow = new Workflow({
name: "order-workflow",
});
workflow
.step(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
const resumedResult = await workflow.resume({
runId: result.runId,
stepId: 'processOrder',
inputData: {
orderId: '123',
},
});
console.log({resumedResult});
Accessing Workflow Results
You can get typed access to the results of a workflow by injecting the step types into the Workflow
type params:
import { Workflow } from "@kastrax/core/workflows";
const fetchUserStep = new Step({
id: "fetchUser",
outputSchema: z.object({
userId: z.string(),
name: z.string(),
email: z.string(),
}),
execute: async () => {
return {
userId: "user123",
name: "John Doe",
email: "john@example.com"
};
},
});
const processOrderStep = new Step({
id: "processOrder",
outputSchema: z.object({
orderId: z.string(),
status: z.string(),
}),
execute: async ({ context }) => {
const userData = context.getStepResult(fetchUserStep);
return {
orderId: "order123",
status: "processing"
};
},
});
const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
name: "typed-workflow",
});
workflow
.step(fetchUserStep)
.then(processOrderStep)
.commit();
const run = workflow.createRun();
const result = await run.start();
// The result is a discriminated union of the step results
// So it needs to be narrowed down via status checks
if (result.results.processOrder.status === 'success') {
// TypeScript will know the shape of the results
const orderId = result.results.processOrder.output.orderId;
console.log({orderId});
}
if (result.results.fetchUser.status === 'success') {
const userId = result.results.fetchUser.output.userId;
console.log({userId});
}
Best Practices for Data Flow
-
Use getStepResult with Step References for Type Safety
- Ensures TypeScript can infer the correct types
- Catches type errors at compile time
-
*Use Variable Mapping for Explicit Dependencies
- Makes data flow clear and maintainable
- Provides good documentation of step dependencies
-
Define Output Schemas for Steps
- Validates data at runtime
- Validates return type of the
execute
function
- Validates return type of the
- Improves type inference in TypeScript
- Validates data at runtime
-
Handle Missing Data Gracefully
- Always check if step results exist before accessing properties
- Provide fallback values for optional data
-
Keep Data Transformations Simple
- Transform data in dedicated steps rather than in variable mappings
- Makes workflows easier to test and debug
Comparison of Data Flow Methods
Method | Type Safety | Explicitness | Use Case |
---|---|---|---|
getStepResult | Highest | High | Complex workflows with strict typing requirements |
Variable Mapping | High | High | When dependencies need to be clear and explicit |
context.steps | Medium | Low | Quick access to step data in simple workflows |
By choosing the right data flow method for your use case, you can create workflows that are both type-safe and maintainable.