
Data Flow with Kastrax Workflow Variables ✅

Kastrax provides a type-safe variable system for moving data between workflow steps. With it you can map input data to steps, pass information between steps, access nested properties, and transform values, all with the type safety and expressiveness of Kotlin.

Understanding Workflow Variables ✅

In Kastrax workflows, variables serve multiple important functions:

  • Data Mapping: Connect workflow inputs to step inputs
  • Inter-step Communication: Pass outputs from one step to inputs of another
  • Property Access: Navigate complex data structures with JSONPath expressions
  • Data Transformation: Apply transformations to values as they flow between steps
  • Default Values: Provide fallbacks when data might be missing
  • Type Safety: Ensure data consistency with Kotlin’s type system

Variable System in Kastrax ✅

Kastrax implements variables using a combination of JSONPath expressions and Kotlin’s type system. This provides a flexible yet type-safe way to manage data flow in your workflows.

Creating Variables

Variables are created using the variable() function, which supports several overloads for different use cases:

VariableCreation.kt
import ai.kastrax.core.workflow.variable

// Basic variable referencing a JSONPath
val basicVariable = variable("$.input.topic")

// Variable with a default value
val variableWithDefault = variable("$.steps.analysis.output.score", defaultValue = 0.0)

// Variable with a transformation function
val transformedVariable = variable("$.steps.text.output.content") { content ->
    (content as? String)?.uppercase() ?: ""
}

// Variable with both default and transformation
val complexVariable = variable(
    path = "$.steps.data.output.items",
    defaultValue = emptyList<String>(),
    transform = { items ->
        (items as? List<*>)?.filterIsInstance<String>()?.take(5) ?: emptyList()
    }
)

Using Variables for Data Mapping ✅

Basic Variable Mapping

You can map data between steps using the variables property when defining a step in your workflow:

BasicVariableMapping.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent

// Assume we have these agents defined elsewhere
val dataProcessingAgent: Agent = /* ... */
val analysisAgent: Agent = /* ... */

// Create a workflow with variable mapping
val dataWorkflow = workflow {
    name = "data-mapping-workflow"
    description = "Process and analyze data"

    // First step with input mapping
    step(dataProcessingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process the input data"

        // Map workflow input to step variables
        variables = mutableMapOf(
            "inputData" to variable("$.input.data"),
            "options" to variable("$.input.processingOptions")
        )
    }

    // Second step with mapping from first step
    step(analysisAgent) {
        id = "analyze-data"
        name = "Analyze Data"
        description = "Analyze the processed data"
        after("process-data") // This step runs after process-data

        // Map output from first step to input for second step
        variables = mutableMapOf(
            "processedData" to variable("$.steps.process-data.output.result"),
            "analysisDepth" to variable("$.input.analysisDepth", defaultValue = "standard")
        )
    }
}

Accessing Nested Properties

You can access nested properties using JSONPath expressions:

NestedProperties.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow that accesses nested properties
val nestedDataWorkflow = workflow {
    name = "nested-data-workflow"

    step(dataExtractionAgent) {
        id = "extract-data"
        name = "Extract Data"
        description = "Extract structured data from input"
        variables = mutableMapOf(
            "source" to variable("$.input.source")
        )
    }

    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process the extracted data"
        after("extract-data")
        variables = mutableMapOf(
            // Access deeply nested properties using JSONPath
            "customerName" to variable("$.steps.extract-data.output.data.customer.profile.name"),
            "orderItems" to variable("$.steps.extract-data.output.data.order.items[*].name"),
            "totalAmount" to variable("$.steps.extract-data.output.data.order.payment.amount")
        )
    }
}

Data Transformations

You can transform data as it flows between steps:

DataTransformations.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with data transformations
val transformationWorkflow = workflow {
    name = "transformation-workflow"

    step(dataAgent) {
        id = "get-data"
        name = "Get Data"
        description = "Retrieve raw data"
        variables = mutableMapOf(
            "query" to variable("$.input.query")
        )
    }

    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process data with transformations"
        after("get-data")
        variables = mutableMapOf(
            // Transform a string to uppercase
            "upperCaseTitle" to variable("$.steps.get-data.output.title") { title ->
                (title as? String)?.uppercase() ?: ""
            },
            // Limit a list to its first five entries
            "topCategories" to variable("$.steps.get-data.output.categories") { categories ->
                (categories as? List<*>)?.take(5) ?: emptyList<String>()
            },
            // Calculate a derived value; split needs a Regex to treat \s+ as a pattern
            "wordCount" to variable("$.steps.get-data.output.content") { content ->
                (content as? String)?.trim()?.split(Regex("\\s+"))?.size ?: 0
            }
        )
    }
}
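A detail that matters for word-count style transformations: Kotlin's String.split treats a String argument as a literal delimiter, so a whitespace pattern has to be passed as a Regex. The following is a minimal, self-contained sketch of such a transformation body; the function name wordCount is hypothetical and not part of Kastrax.

```kotlin
// Hypothetical helper for illustration; not a Kastrax API.
// Counts whitespace-separated words. Returns 0 for null, non-String, or blank input.
fun wordCount(content: Any?): Int {
    val text = (content as? String)?.trim().orEmpty()
    // Regex("\\s+") matches runs of whitespace; a plain "\\s+" String would be
    // searched for literally and never found in ordinary text.
    return if (text.isEmpty()) 0 else text.split(Regex("\\s+")).size
}
```

Checking for an empty string first avoids counting one word for blank input, since splitting an empty string yields a single empty element.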

Variables in Loops

Variables are particularly useful in loop steps for passing data between iterations:

LoopVariables.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with a loop that uses variables
val iterativeWorkflow = workflow {
    name = "iterative-workflow"
    description = "Iteratively improve content"

    // Initialize loop variables
    step(initializationAgent) {
        id = "initialize"
        name = "Initialize Variables"
        description = "Set up initial values for the loop"
        execute { _ ->
            mapOf(
                "currentContent" to "Initial draft",
                "iterationCount" to 0,
                "qualityScore" to 0.0
            )
        }
    }

    // Improvement loop
    loopStep {
        id = "improvement-loop"
        name = "Content Improvement Loop"
        description = "Iteratively improve content until quality threshold is reached"
        after("initialize")

        // Continue looping while quality is below threshold and iterations are under limit
        condition { context ->
            val qualityScore = context.getVariable("$.steps.evaluate.output.qualityScore") as? Double ?: 0.0
            val iterationCount = context.getVariable("$.steps.improve.output.iterationCount") as? Int ?: 0
            qualityScore < 8.0 && iterationCount < 5
        }

        // Loop body
        body {
            // Evaluate current content
            step(evaluationAgent) {
                id = "evaluate"
                name = "Evaluate Content"
                description = "Assess the quality of the current content"
                variables = mutableMapOf(
                    "content" to variable(
                        "$.steps.improve.output.currentContent",
                        defaultValue = variable("$.steps.initialize.output.currentContent")
                    )
                )
            }

            // Improve content based on evaluation
            step(improvementAgent) {
                id = "improve"
                name = "Improve Content"
                description = "Enhance content based on evaluation"
                after("evaluate")
                variables = mutableMapOf(
                    "content" to variable(
                        "$.steps.improve.output.currentContent",
                        defaultValue = variable("$.steps.initialize.output.currentContent")
                    ),
                    "feedback" to variable("$.steps.evaluate.output.feedback"),
                    "iterationCount" to variable("$.steps.improve.output.iterationCount", defaultValue = 0)
                )
            }
        }
    }
}

Variable Resolution ✅

When a workflow executes, Kastrax resolves each variable at runtime in the following steps:

  1. Path Parsing: The JSONPath expression is parsed to identify the data source and path
  2. Context Access: The workflow context is accessed to retrieve step outputs or input data
  3. Path Navigation: The system navigates through the data structure following the JSONPath
  4. Default Application: If the path doesn’t resolve to a value and a default is provided, the default is used
  5. Transformation: If a transformation function is specified, it’s applied to the resolved value
  6. Type Conversion: The value is converted to the expected type if necessary
  7. Injection: The final value is injected into the step’s execution context

This process happens automatically for each variable when a step is about to execute.
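The order of these steps can be illustrated with a small, self-contained sketch. It is not the Kastrax implementation: SketchVariable, navigate, and resolve are hypothetical names, only dotted property paths are handled, and the type conversion and injection steps are left out. It does assume, as the list above states, that the default is applied before the transformation.

```kotlin
// Hypothetical stand-in for a workflow variable; not the Kastrax class.
data class SketchVariable(
    val path: String,
    val defaultValue: Any? = null,
    val transform: ((Any?) -> Any?)? = null
)

// Path parsing, context access, and navigation for simple "$.a.b.c" paths.
// Returns null as soon as a segment cannot be followed.
fun navigate(root: Map<String, Any?>, path: String): Any? {
    require(path.startsWith("$.")) { "Unsupported path: $path" }
    var current: Any? = root
    for (segment in path.removePrefix("$.").split(".")) {
        current = (current as? Map<*, *>)?.get(segment) ?: return null
    }
    return current
}

// Default application first, then the optional transformation.
fun resolve(variable: SketchVariable, context: Map<String, Any?>): Any? {
    val raw = navigate(context, variable.path)
    val withDefault = raw ?: variable.defaultValue
    val transform = variable.transform
    return if (transform != null) transform(withDefault) else withDefault
}

// Sample context shaped like the "$.input" and "$.steps" roots used in this page.
val resolutionContext: Map<String, Any?> = mapOf(
    "input" to mapOf("topic" to "kotlin"),
    "steps" to mapOf(
        "text" to mapOf("output" to mapOf("content" to "hello"))
    )
)
```

With this context, resolving "$.input.topic" yields "kotlin", a missing "$.steps.analysis.output.score" with defaultValue = 0.0 yields 0.0, and an uppercase transform on "$.steps.text.output.content" yields "HELLO".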

JSONPath in Kastrax ✅

Kastrax uses JSONPath expressions to navigate data structures. Here are the key patterns:

| Pattern | Description | Example |
| --- | --- | --- |
| $.input | Access workflow input data | $.input.topic |
| $.steps | Access step outputs | $.steps.research.output.text |
| .property | Access object property | $.input.user.name |
| [index] | Access array element | $.steps.data.output.items[0] |
| [*] | Access all array elements | $.steps.data.output.items[*].name |
| ..property | Recursive descent | $..name (finds all name properties) |
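To make the selection semantics concrete, here is a small sketch of an evaluator for three of these patterns: .property, [index], and [*]. It is an illustration only and makes no claim about how Kastrax evaluates JSONPath. The names select, tokenPattern, and patternContext are hypothetical, recursive descent (..property) is deliberately not supported, and malformed paths are not validated.

```kotlin
// Matches a property segment such as ".name" or a bracket selector such as "[0]" or "[*]".
val tokenPattern = Regex("""\.([A-Za-z0-9_-]+)|\[(\d+|\*)\]""")

// Returns every value the path selects; an empty list means nothing matched.
fun select(root: Any?, path: String): List<Any?> {
    require(path.startsWith('$')) { "Path must start with the root symbol" }
    var current: List<Any?> = listOf(root)
    for (match in tokenPattern.findAll(path.substring(1))) {
        val property = match.groups[1]?.value
        val selector = match.groups[2]?.value
        current = current.flatMap { node ->
            when {
                // .property: follow a key on a map node
                property != null -> listOfNotNull((node as? Map<*, *>)?.get(property))
                // [*]: fan out over every element of a list node
                selector == "*" -> (node as? List<*>) ?: emptyList<Any?>()
                // [index]: pick one element of a list node, if it exists
                selector != null -> listOfNotNull((node as? List<*>)?.getOrNull(selector.toInt()))
                else -> emptyList<Any?>()
            }
        }
    }
    return current
}

// Sample data shaped like a workflow context.
val patternContext: Map<String, Any?> = mapOf(
    "input" to mapOf("user" to mapOf("name" to "Ada")),
    "steps" to mapOf(
        "data" to mapOf(
            "output" to mapOf(
                "items" to listOf(mapOf("name" to "first"), mapOf("name" to "second"))
            )
        )
    )
)
```

Here select(patternContext, "$.steps.data.output.items[*].name") returns listOf("first", "second"), while "$.steps.data.output.items[0]" returns a single-element list containing the first item map.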

Complete Examples ✅

Content Generation Workflow

This example shows a complete content generation workflow that uses variables extensively:

ContentGenerationWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent

// Assume we have these agents defined elsewhere
val researchAgent: Agent = /* ... */
val outlineAgent: Agent = /* ... */
val writingAgent: Agent = /* ... */
val editingAgent: Agent = /* ... */

// Create a content generation workflow
val contentWorkflow = workflow {
    name = "content-generation"
    description = "Generate high-quality content on a given topic"

    // Define input variables
    input {
        variable("topic", String::class, required = true)
        variable("style", String::class, defaultValue = "informative")
        variable("wordCount", Int::class, defaultValue = 1000)
        variable("audience", String::class, defaultValue = "general")
    }

    // Research step
    step(researchAgent) {
        id = "research"
        name = "Research Topic"
        description = "Research the topic thoroughly"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "depth" to variable("$.input.researchDepth", defaultValue = "comprehensive")
        )
    }

    // Create outline step
    step(outlineAgent) {
        id = "create-outline"
        name = "Create Content Outline"
        description = "Create a structured outline based on research"
        after("research")
        variables = mutableMapOf(
            "research" to variable("$.steps.research.output.findings"),
            "wordCount" to variable("$.input.wordCount"),
            "style" to variable("$.input.style")
        )
    }

    // Writing step
    step(writingAgent) {
        id = "write-content"
        name = "Write Content"
        description = "Write content based on the outline"
        after("create-outline")
        variables = mutableMapOf(
            "outline" to variable("$.steps.create-outline.output.outline"),
            "research" to variable("$.steps.research.output.findings"),
            "style" to variable("$.input.style"),
            "audience" to variable("$.input.audience"),
            "targetWordCount" to variable("$.input.wordCount")
        )
    }

    // Editing step
    step(editingAgent) {
        id = "edit-content"
        name = "Edit Content"
        description = "Edit and improve the written content"
        after("write-content")
        variables = mutableMapOf(
            "content" to variable("$.steps.write-content.output.content"),
            "style" to variable("$.input.style"),
            // Transform the word count to set editing intensity
            "editingIntensity" to variable("$.steps.write-content.output.wordCount") { wordCount ->
                when ((wordCount as? Int) ?: 0) {
                    in 0..500 -> "light"
                    in 501..2000 -> "medium"
                    else -> "thorough"
                }
            }
        )
    }

    // Define workflow output mapping
    output {
        "title" from "$.steps.write-content.output.title"
        "content" from "$.steps.edit-content.output.content"
        "wordCount" from "$.steps.edit-content.output.wordCount"
        "sources" from "$.steps.research.output.sources"
        "executionTime" from { context ->
            val startTime = context.startTime
            val endTime = System.currentTimeMillis()
            (endTime - startTime) / 1000 // Convert to seconds
        }
    }
}

Data Processing Pipeline

This example shows a data processing workflow with complex transformations:

DataProcessingWorkflow.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.tool.Tool

// Assume we have these tools defined elsewhere
val dataExtractionTool: Tool<*, *> = /* ... */
val dataNormalizationTool: Tool<*, *> = /* ... */
val dataAnalysisTool: Tool<*, *> = /* ... */
val reportGenerationTool: Tool<*, *> = /* ... */

// Create a data processing workflow
val dataWorkflow = workflow {
    name = "data-processing-pipeline"
    description = "Extract, normalize, analyze, and report on data"

    // Data extraction step
    toolStep {
        id = "extract-data"
        name = "Extract Data"
        description = "Extract data from the source"
        tool = dataExtractionTool
        variables = mutableMapOf(
            "source" to variable("$.input.dataSource"),
            "format" to variable("$.input.sourceFormat", defaultValue = "json")
        )
    }

    // Data normalization step
    toolStep {
        id = "normalize-data"
        name = "Normalize Data"
        description = "Clean and normalize the extracted data"
        after("extract-data")
        tool = dataNormalizationTool
        variables = mutableMapOf(
            "rawData" to variable("$.steps.extract-data.output.data"),
            // Transform the schema format if needed
            "schema" to variable("$.input.schema") { schema ->
                if (schema is String && schema.endsWith(".json")) {
                    // Load schema from file (simplified example)
                    "{ \"type\": \"object\", \"properties\": {} }"
                } else {
                    schema
                }
            }
        )
    }

    // Data analysis step
    toolStep {
        id = "analyze-data"
        name = "Analyze Data"
        description = "Perform analysis on the normalized data"
        after("normalize-data")
        tool = dataAnalysisTool
        variables = mutableMapOf(
            "data" to variable("$.steps.normalize-data.output.normalizedData"),
            "metrics" to variable("$.input.analysisMetrics", defaultValue = listOf("mean", "median", "mode")),
            // Filter data based on a threshold
            "significantOnly" to variable("$.input.significanceThreshold") { threshold ->
                (threshold as? Double)?.let { it > 0.0 } ?: false
            }
        )
    }

    // Report generation step
    toolStep {
        id = "generate-report"
        name = "Generate Report"
        description = "Create a report from the analysis results"
        after("analyze-data")
        tool = reportGenerationTool
        variables = mutableMapOf(
            "analysisResults" to variable("$.steps.analyze-data.output.results"),
            "format" to variable("$.input.reportFormat", defaultValue = "pdf"),
            "includeCharts" to variable("$.input.includeVisualizations", defaultValue = true),
            // Combine metadata from multiple steps
            "metadata" to variable { context ->
                mapOf(
                    "dataSource" to context.getVariable("$.input.dataSource"),
                    "recordCount" to context.getVariable("$.steps.normalize-data.output.recordCount"),
                    "analysisTimestamp" to context.getVariable("$.steps.analyze-data.output.timestamp"),
                    "generatedBy" to "Kastrax Workflow Engine"
                )
            }
        )
    }

    // Define workflow output
    output {
        "report" from "$.steps.generate-report.output.report"
        "analysisResults" from "$.steps.analyze-data.output.results"
        "recordsProcessed" from "$.steps.normalize-data.output.recordCount"
    }
}

Type Safety in Kastrax Variables ✅

Kastrax leverages Kotlin’s strong type system to provide type safety for workflow variables:

TypeSafeVariables.kt
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.workflow.TypedVariable

// Define typed variables for better type safety
class UserProfile(
    val name: String,
    val email: String,
    val preferences: Map<String, Any>
)

// Create a workflow with type-safe variables
val typeSafeWorkflow = workflow {
    name = "type-safe-workflow"
    description = "Demonstrate type-safe variables"

    // Define typed input variables
    input {
        variable("userId", String::class, required = true)
        variable("includePreferences", Boolean::class, defaultValue = true)
    }

    // Fetch user profile step
    step(userProfileAgent) {
        id = "fetch-user"
        name = "Fetch User Profile"
        description = "Retrieve user profile information"
        variables = mutableMapOf(
            "userId" to variable("$.input.userId")
        )

        // Type-safe output definition
        outputType = UserProfile::class
    }

    // Process user data step with type checking
    step(processingAgent) {
        id = "process-user"
        name = "Process User Data"
        description = "Process user profile information"
        after("fetch-user")

        // Type-safe variable access with explicit casting
        variables = mutableMapOf(
            // TypedVariable ensures the correct type at compile time
            "user" to TypedVariable<UserProfile>("$.steps.fetch-user.output"),
            // Conditional variable based on input
            "includePreferences" to variable("$.input.includePreferences")
        )

        // Access typed variables in execution
        execute { context ->
            val user = context.getTypedVariable<UserProfile>("user")
            val includePrefs = context.getVariable("includePreferences") as? Boolean ?: false

            // Type-safe access to user properties
            val greeting = "Hello, ${user.name}!"
            val contactInfo = "Contact: ${user.email}"

            // Conditional processing based on preferences
            val preferences = if (includePrefs) user.preferences else emptyMap()

            mapOf(
                "greeting" to greeting,
                "contactInfo" to contactInfo,
                "preferences" to preferences
            )
        }
    }
}
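For readers curious how a typed accessor of this kind can work in plain Kotlin, the sketch below uses a reified type parameter so the call site gets a statically typed result while the actual check happens when the value is read. This is an assumption-laden illustration, not the Kastrax implementation: typed, typedOrNull, Profile, and resolvedVariables are hypothetical names, and a plain Map stands in for the step's resolved variables.

```kotlin
// Simplified stand-in for a step output type.
data class Profile(val name: String, val email: String)

// Hypothetical helper: returns the value if it has type T, otherwise null.
inline fun <reified T : Any> Map<String, Any?>.typedOrNull(name: String): T? =
    this[name] as? T

// Hypothetical helper: returns the value as T or fails with a descriptive error.
inline fun <reified T : Any> Map<String, Any?>.typed(name: String): T =
    this[name] as? T
        ?: throw IllegalStateException("Variable '$name' is missing or not a ${T::class.simpleName}")

// A plain map standing in for the variables resolved for one step.
val resolvedVariables: Map<String, Any?> = mapOf(
    "user" to Profile("Ada", "ada@example.com"),
    "includePreferences" to true
)
```

Calling resolvedVariables.typed<Profile>("user").name compiles without a cast and returns "Ada", while resolvedVariables.typed<Int>("user") throws at the point of access rather than failing later inside the step logic.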

Best Practices for Variables ✅

1. Use Descriptive Variable Names

Choose clear, descriptive names for your variables that indicate their purpose:

// Good: Descriptive variable names
variables = mutableMapOf(
    "customerProfile" to variable("$.steps.fetch-customer.output.profile"),
    "orderHistory" to variable("$.steps.fetch-orders.output.orders")
)

// Avoid: Vague or generic names
// variables = mutableMapOf(
//     "data1" to variable("$.steps.step1.output.profile"),
//     "data2" to variable("$.steps.step2.output.orders")
// )

2. Provide Default Values

Always provide sensible default values for optional variables:

variables = mutableMapOf(
    "analysisDepth" to variable("$.input.depth", defaultValue = "standard"),
    "maxResults" to variable("$.input.limit", defaultValue = 10),
    "includeMetadata" to variable("$.input.metadata", defaultValue = true)
)

3. Use Transformations for Data Preparation

Leverage transformation functions to prepare data for steps:

variables = mutableMapOf(
    // Normalize text input
    "normalizedText" to variable("$.input.text") { text ->
        (text as? String)?.trim()?.lowercase() ?: ""
    },
    // Convert date string to timestamp
    "timestamp" to variable("$.input.date") { dateStr ->
        if (dateStr is String) {
            try {
                java.time.LocalDate.parse(dateStr).atStartOfDay()
                    .toInstant(java.time.ZoneOffset.UTC).toEpochMilli()
            } catch (e: Exception) {
                System.currentTimeMillis()
            }
        } else {
            System.currentTimeMillis()
        }
    }
)
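Transformation bodies like the date conversion are easier to verify when pulled out into a named function. The sketch below is one possible variant using only java.time; the name toEpochMillisOrNull is hypothetical. Unlike the inline version, which falls back to the current time, it returns null for unparseable input so the caller can decide on a fallback.

```kotlin
// Hypothetical helper for illustration; not a Kastrax API.
// Converts an ISO-8601 date string such as "2024-01-15" to epoch milliseconds
// at midnight UTC. Returns null for non-String or unparseable input.
fun toEpochMillisOrNull(value: Any?): Long? {
    if (value !is String) return null
    return try {
        java.time.LocalDate.parse(value)
            .atStartOfDay()
            .toInstant(java.time.ZoneOffset.UTC)
            .toEpochMilli()
    } catch (e: java.time.format.DateTimeParseException) {
        null
    }
}
```

For example, toEpochMillisOrNull("1970-01-02") returns 86400000, exactly one day after the epoch. Inside a transformation it could be combined with a fallback, as in toEpochMillisOrNull(dateStr) ?: System.currentTimeMillis().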

4. Structure Complex Data Access

For complex data structures, use intermediate steps to extract and structure data:

// Extract and structure data in a dedicated step
step(dataExtractionAgent) {
    id = "extract-structured-data"
    execute { context ->
        val rawData = context.getVariable("rawData") as? Map<*, *> ?: emptyMap<String, Any>()

        // Extract and structure the data
        mapOf(
            "customers" to extractCustomers(rawData),
            "products" to extractProducts(rawData),
            "metrics" to calculateMetrics(rawData)
        )
    }
}

// Then use the structured data in subsequent steps
step(analysisAgent) {
    id = "analyze-customers"
    after("extract-structured-data")
    variables = mutableMapOf(
        // Now we can access the structured data easily
        "customers" to variable("$.steps.extract-structured-data.output.customers")
    )
}

5. Document Variable Requirements

Clearly document the expected structure and types of variables:

step(reportAgent) {
    id = "generate-report"
    name = "Generate Report"
    description = """
        Generates a comprehensive report based on analysis results.

        Required variables:
        - analysisResults: Map<String, Any> - The results of the data analysis
        - format: String - The output format (pdf, html, markdown)
        - includeCharts: Boolean - Whether to include visualizations
    """.trimIndent()

    variables = mutableMapOf(
        "analysisResults" to variable("$.steps.analyze-data.output.results"),
        "format" to variable("$.input.format", defaultValue = "pdf"),
        "includeCharts" to variable("$.input.charts", defaultValue = true)
    )
}

Variable Access Patterns ✅

Kastrax provides multiple ways to access variables, each with its own advantages:

| Pattern | Description | Use Case |
| --- | --- | --- |
| variable("$.path") | Basic JSONPath variable | Simple data mapping |
| variable("$.path", defaultValue) | Variable with default | Optional data with fallback |
| variable("$.path") { transform } | Variable with transformation | Data conversion or processing |
| TypedVariable<T>("$.path") | Strongly-typed variable | Type-safe access to complex objects |
| variable { context -> ... } | Dynamic variable | Computed values from multiple sources |

Choosing the right pattern for each use case helps create robust, maintainable workflows with clear data flow.
