Data Flow with Kastrax Workflow Variables ✅
Kastrax provides a type-safe variable system for moving data between workflow steps. With it you can map workflow input to steps, pass one step's output to another, access nested properties, and transform values, all with the type safety and expressiveness of Kotlin.
Understanding Workflow Variables ✅
In Kastrax workflows, variables serve multiple important functions:
- Data Mapping: Connect workflow inputs to step inputs
- Inter-step Communication: Pass outputs from one step to inputs of another
- Property Access: Navigate complex data structures with JSONPath expressions
- Data Transformation: Apply transformations to values as they flow between steps
- Default Values: Provide fallbacks when data might be missing
- Type Safety: Ensure data consistency with Kotlin’s type system
Variable System in Kastrax ✅
Kastrax implements variables using a combination of JSONPath expressions and Kotlin’s type system. This provides a flexible yet type-safe way to manage data flow in your workflows.
Creating Variables
Variables are created using the variable() function, which supports several overloads for different use cases:
import ai.kastrax.core.workflow.variable

// Basic variable referencing a JSONPath
val basicVariable = variable("$.input.topic")

// Variable with a default value
val variableWithDefault = variable("$.steps.analysis.output.score", defaultValue = 0.0)

// Variable with a transformation function
val transformedVariable = variable("$.steps.text.output.content") { content ->
    (content as? String)?.uppercase() ?: ""
}

// Variable with both default and transformation
val complexVariable = variable(
    path = "$.steps.data.output.items",
    defaultValue = emptyList<String>(),
    transform = { items ->
        (items as? List<*>)?.filterIsInstance<String>()?.take(5) ?: emptyList()
    }
)
Using Variables for Data Mapping ✅
Basic Variable Mapping
You can map data between steps using the variables property when defining a step in your workflow:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent

// Assume we have these agents defined elsewhere
val dataProcessingAgent: Agent = /* ... */
val analysisAgent: Agent = /* ... */

// Create a workflow with variable mapping
val dataWorkflow = workflow {
    name = "data-mapping-workflow"
    description = "Process and analyze data"

    // First step with input mapping
    step(dataProcessingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process the input data"

        // Map workflow input to step variables
        variables = mutableMapOf(
            "inputData" to variable("$.input.data"),
            "options" to variable("$.input.processingOptions")
        )
    }

    // Second step with mapping from first step
    step(analysisAgent) {
        id = "analyze-data"
        name = "Analyze Data"
        description = "Analyze the processed data"
        after("process-data") // This step runs after process-data

        // Map output from first step to input for second step
        variables = mutableMapOf(
            "processedData" to variable("$.steps.process-data.output.result"),
            "analysisDepth" to variable("$.input.analysisDepth", defaultValue = "standard")
        )
    }
}
Accessing Nested Properties
You can access nested properties using JSONPath expressions:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow that accesses nested properties
val nestedDataWorkflow = workflow {
    name = "nested-data-workflow"

    step(dataExtractionAgent) {
        id = "extract-data"
        name = "Extract Data"
        description = "Extract structured data from input"
        variables = mutableMapOf(
            "source" to variable("$.input.source")
        )
    }

    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process the extracted data"
        after("extract-data")
        variables = mutableMapOf(
            // Access deeply nested properties using JSONPath
            "customerName" to variable("$.steps.extract-data.output.data.customer.profile.name"),
            "orderItems" to variable("$.steps.extract-data.output.data.order.items[*].name"),
            "totalAmount" to variable("$.steps.extract-data.output.data.order.payment.amount")
        )
    }
}
Data Transformations
You can transform data as it flows between steps:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with data transformations
val transformationWorkflow = workflow {
    name = "transformation-workflow"

    step(dataAgent) {
        id = "get-data"
        name = "Get Data"
        description = "Retrieve raw data"
        variables = mutableMapOf(
            "query" to variable("$.input.query")
        )
    }

    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process data with transformations"
        after("get-data")
        variables = mutableMapOf(
            // Transform a string to uppercase
            "upperCaseTitle" to variable("$.steps.get-data.output.title") { title ->
                (title as? String)?.uppercase() ?: ""
            },
            // Limit a list to its first five entries
            "topCategories" to variable("$.steps.get-data.output.categories") { categories ->
                (categories as? List<*>)?.take(5) ?: emptyList<String>()
            },
            // Calculate a derived value
            "wordCount" to variable("$.steps.get-data.output.content") { content ->
                // split(String) matches the delimiter literally, so pass a Regex to split on whitespace
                (content as? String)?.trim()?.takeIf { it.isNotEmpty() }?.split(Regex("\\s+"))?.size ?: 0
            }
        )
    }
}
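The word-count transformation depends on a detail of the Kotlin standard library: `String.split` with a `String` argument treats the delimiter as literal text, so a whitespace pattern has to be passed as a `Regex`. The sketch below shows the difference using only the standard library; `countWords` is a hypothetical helper for illustration, not part of Kastrax.

```kotlin
// Hypothetical helper: counts whitespace-separated words.
// Returns 0 for null, non-string, or blank input.
fun countWords(value: Any?): Int {
    val text = (value as? String)?.trim().orEmpty()
    return if (text.isEmpty()) 0 else text.split(Regex("\\s+")).size
}

fun main() {
    // The String overload looks for the literal characters \s+ and finds none
    println("a b  c".split("\\s+").size) // 1
    println(countWords("a b  c"))         // 3
    println(countWords("   "))            // 0
    println(countWords(42))               // 0
}
```

Trimming first avoids counting an empty leading or trailing element when the text starts or ends with whitespace.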
Variables in Loops
Variables are particularly useful in loop steps for passing data between iterations:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with a loop that uses variables
val iterativeWorkflow = workflow {
name = "iterative-workflow"
description = "Iteratively improve content"
// Initialize loop variables
step(initializationAgent) {
id = "initialize"
name = "Initialize Variables"
description = "Set up initial values for the loop"
execute { _ ->
mapOf(
"currentContent" to "Initial draft",
"iterationCount" to 0,
"qualityScore" to 0.0
)
}
}
// Improvement loop
loopStep {
id = "improvement-loop"
name = "Content Improvement Loop"
description = "Iteratively improve content until quality threshold is reached"
after("initialize")
// Continue looping while quality is below threshold and iterations are under limit
condition { context ->
val qualityScore = context.getVariable("$.steps.evaluate.output.qualityScore") as? Double ?: 0.0
val iterationCount = context.getVariable("$.steps.improve.output.iterationCount") as? Int ?: 0
qualityScore < 8.0 && iterationCount < 5
}
// Loop body
body {
// Evaluate current content
step(evaluationAgent) {
id = "evaluate"
name = "Evaluate Content"
description = "Assess the quality of the current content"
variables = mutableMapOf(
"content" to variable("$.steps.improve.output.currentContent",
defaultValue = variable("$.steps.initialize.output.currentContent"))
)
}
// Improve content based on evaluation
step(improvementAgent) {
id = "improve"
name = "Improve Content"
description = "Enhance content based on evaluation"
after("evaluate")
variables = mutableMapOf(
"content" to variable("$.steps.improve.output.currentContent",
defaultValue = variable("$.steps.initialize.output.currentContent")),
"feedback" to variable("$.steps.evaluate.output.feedback"),
"iterationCount" to variable("$.steps.improve.output.iterationCount", defaultValue = 0)
)
}
}
}
}
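The control flow of the loop above can be sketched in plain Kotlin, independent of the workflow engine. This is only an illustration of the condition and the evaluate-then-improve order; `LoopState`, `improveUntilGood`, and the toy scorer and improver are hypothetical names, and how Kastrax actually schedules loop iterations is not shown here.

```kotlin
// Hypothetical loop state mirroring the values the steps pass between iterations.
data class LoopState(val content: String, val iterationCount: Int, val qualityScore: Double)

// Repeats evaluate -> improve while the last score is below the threshold
// and the iteration cap has not been reached.
fun improveUntilGood(
    initial: LoopState,
    threshold: Double = 8.0,
    maxIterations: Int = 5,
    evaluate: (String) -> Double,
    improve: (String) -> String
): LoopState {
    var state = initial
    while (state.qualityScore < threshold && state.iterationCount < maxIterations) {
        val score = evaluate(state.content)    // corresponds to the "evaluate" step
        val improved = improve(state.content)  // corresponds to the "improve" step
        state = LoopState(improved, state.iterationCount + 1, score)
    }
    return state
}

fun main() {
    val result = improveUntilGood(
        initial = LoopState("Initial draft", 0, 0.0),
        evaluate = { text -> text.count { it == '!' } * 3.0 }, // toy scorer
        improve = { text -> "$text!" }                         // toy improver
    )
    println(result.iterationCount) // 4
    println(result.qualityScore)   // 9.0
}
```

Both exit conditions matter: the quality threshold ends the loop on success, and the iteration cap guarantees termination when the score never improves.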
Variable Resolution ✅
When a workflow executes, Kastrax resolves each variable at runtime in the following order:
- Path Parsing: The JSONPath expression is parsed to identify the data source and path
- Context Access: The workflow context is accessed to retrieve step outputs or input data
- Path Navigation: The system navigates through the data structure following the JSONPath
- Default Application: If the path doesn’t resolve to a value and a default is provided, the default is used
- Transformation: If a transformation function is specified, it’s applied to the resolved value
- Type Conversion: The value is converted to the expected type if necessary
- Injection: The final value is injected into the step’s execution context
This process happens automatically for each variable when a step is about to execute.
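The resolution order can be sketched with the Kotlin standard library alone. This is a minimal illustration, not the Kastrax implementation: `SketchVariable`, `navigate`, and `resolve` are hypothetical names, the path walker handles only simple `$.a.b.c` paths over nested maps, and applying the transformation to a default value is an assumption based on the order listed above.

```kotlin
// Hypothetical stand-in for a workflow variable (not the Kastrax API).
data class SketchVariable(
    val path: String,
    val defaultValue: Any? = null,
    val transform: ((Any?) -> Any?)? = null
)

// Walks a simple "$.a.b.c" path through nested maps; null if any segment is missing.
fun navigate(context: Map<String, Any?>, path: String): Any? {
    var current: Any? = context
    for (segment in path.removePrefix("$.").split(".")) {
        current = (current as? Map<*, *>)?.get(segment) ?: return null
    }
    return current
}

// Navigate the path, fall back to the default, then apply the transformation.
fun resolve(variable: SketchVariable, context: Map<String, Any?>): Any? {
    val raw = navigate(context, variable.path) ?: variable.defaultValue
    val transform = variable.transform
    return if (transform != null) transform(raw) else raw
}

fun main() {
    val context: Map<String, Any?> = mapOf(
        "input" to mapOf("topic" to "kotlin"),
        "steps" to mapOf("analysis" to mapOf("output" to mapOf<String, Any?>()))
    )
    val topic = SketchVariable("$.input.topic") { (it as? String)?.uppercase() ?: "" }
    val score = SketchVariable("$.steps.analysis.output.score", defaultValue = 0.0)
    println(resolve(topic, context)) // KOTLIN
    println(resolve(score, context)) // 0.0
}
```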
JSONPath in Kastrax ✅
Kastrax uses JSONPath expressions to navigate data structures. Here are the key patterns:
| Pattern | Description | Example |
|---|---|---|
| $.input | Access workflow input data | $.input.topic |
| $.steps | Access step outputs | $.steps.research.output.text |
| .property | Access object property | $.input.user.name |
| [index] | Access array element | $.steps.data.output.items[0] |
| [*] | Access all array elements | $.steps.data.output.items[*].name |
| ..property | Recursive descent | $..name (finds all name properties) |
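To make the patterns in the table concrete, here is a small evaluator for the `.property`, `[index]`, and `[*]` forms over nested Kotlin maps and lists. It is a sketch under stated assumptions: `evaluate` is a hypothetical function, recursive descent (`..`) is not handled, unrecognized path fragments are silently skipped, and Kastrax's own JSONPath engine may behave differently in edge cases.

```kotlin
// Token pattern: .property, [index], or [*]
private val token = Regex("""\.([^.\[\]]+)|\[(\d+)\]|\[\*\]""")

// Evaluates a simple JSONPath-like expression against nested maps and lists.
// Returns a list once [*] has been applied, otherwise a single value or null.
fun evaluate(root: Any?, path: String): Any? {
    require(path.startsWith("$")) { "path must start with the root symbol" }
    var current: List<Any?> = listOf(root) // working set of matched nodes
    var fannedOut = false                  // true once [*] has been applied
    for (match in token.findAll(path.substring(1))) {
        val property = match.groups[1]?.value
        val index = match.groups[2]?.value?.toInt()
        current = when {
            property != null -> current.mapNotNull { (it as? Map<*, *>)?.get(property) }
            index != null -> current.mapNotNull { (it as? List<*>)?.getOrNull(index) }
            else -> {
                fannedOut = true
                current.flatMap { (it as? List<*>) ?: emptyList<Any?>() }
            }
        }
    }
    return if (fannedOut) current else current.firstOrNull()
}

fun main() {
    val data = mapOf(
        "steps" to mapOf("data" to mapOf("output" to mapOf(
            "items" to listOf(mapOf("name" to "a"), mapOf("name" to "b"))
        )))
    )
    println(evaluate(data, "$.steps.data.output.items[0].name")) // a
    println(evaluate(data, "$.steps.data.output.items[*].name")) // [a, b]
    println(evaluate(data, "$.steps.data.output.missing"))       // null
}
```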
Complete Examples ✅
Content Generation Workflow
This example shows a complete content generation workflow that uses variables extensively:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent
// Assume we have these agents defined elsewhere
val researchAgent: Agent = /* ... */
val outlineAgent: Agent = /* ... */
val writingAgent: Agent = /* ... */
val editingAgent: Agent = /* ... */
// Create a content generation workflow
val contentWorkflow = workflow {
name = "content-generation"
description = "Generate high-quality content on a given topic"
// Define input variables
input {
variable("topic", String::class, required = true)
variable("style", String::class, defaultValue = "informative")
variable("wordCount", Int::class, defaultValue = 1000)
variable("audience", String::class, defaultValue = "general")
}
// Research step
step(researchAgent) {
id = "research"
name = "Research Topic"
description = "Research the topic thoroughly"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"depth" to variable("$.input.researchDepth", defaultValue = "comprehensive")
)
}
// Create outline step
step(outlineAgent) {
id = "create-outline"
name = "Create Content Outline"
description = "Create a structured outline based on research"
after("research")
variables = mutableMapOf(
"research" to variable("$.steps.research.output.findings"),
"wordCount" to variable("$.input.wordCount"),
"style" to variable("$.input.style")
)
}
// Writing step
step(writingAgent) {
id = "write-content"
name = "Write Content"
description = "Write content based on the outline"
after("create-outline")
variables = mutableMapOf(
"outline" to variable("$.steps.create-outline.output.outline"),
"research" to variable("$.steps.research.output.findings"),
"style" to variable("$.input.style"),
"audience" to variable("$.input.audience"),
"targetWordCount" to variable("$.input.wordCount")
)
}
// Editing step
step(editingAgent) {
id = "edit-content"
name = "Edit Content"
description = "Edit and improve the written content"
after("write-content")
variables = mutableMapOf(
"content" to variable("$.steps.write-content.output.content"),
"style" to variable("$.input.style"),
// Transform the word count to set editing intensity
"editingIntensity" to variable("$.steps.write-content.output.wordCount") { wordCount ->
when ((wordCount as? Int) ?: 0) {
in 0..500 -> "light"
in 501..2000 -> "medium"
else -> "thorough"
}
}
)
}
// Define workflow output mapping
output {
"title" from "$.steps.write-content.output.title"
"content" from "$.steps.edit-content.output.content"
"wordCount" from "$.steps.edit-content.output.wordCount"
"sources" from "$.steps.research.output.sources"
"executionTime" from { context ->
val startTime = context.startTime
val endTime = System.currentTimeMillis()
(endTime - startTime) / 1000 // Convert to seconds
}
}
}
Data Processing Pipeline
This example shows a data processing workflow with complex transformations:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.tool.Tool
// Assume we have these tools defined elsewhere
val dataExtractionTool: Tool<*, *> = /* ... */
val dataNormalizationTool: Tool<*, *> = /* ... */
val dataAnalysisTool: Tool<*, *> = /* ... */
val reportGenerationTool: Tool<*, *> = /* ... */
// Create a data processing workflow
val dataWorkflow = workflow {
name = "data-processing-pipeline"
description = "Extract, normalize, analyze, and report on data"
// Data extraction step
toolStep {
id = "extract-data"
name = "Extract Data"
description = "Extract data from the source"
tool = dataExtractionTool
variables = mutableMapOf(
"source" to variable("$.input.dataSource"),
"format" to variable("$.input.sourceFormat", defaultValue = "json")
)
}
// Data normalization step
toolStep {
id = "normalize-data"
name = "Normalize Data"
description = "Clean and normalize the extracted data"
after("extract-data")
tool = dataNormalizationTool
variables = mutableMapOf(
"rawData" to variable("$.steps.extract-data.output.data"),
// Transform the schema format if needed
"schema" to variable("$.input.schema") { schema ->
if (schema is String && schema.endsWith(".json")) {
// Load schema from file (simplified example)
"{ \"type\": \"object\", \"properties\": {} }"
} else {
schema
}
}
)
}
// Data analysis step
toolStep {
id = "analyze-data"
name = "Analyze Data"
description = "Perform analysis on the normalized data"
after("normalize-data")
tool = dataAnalysisTool
variables = mutableMapOf(
"data" to variable("$.steps.normalize-data.output.normalizedData"),
"metrics" to variable("$.input.analysisMetrics", defaultValue = listOf("mean", "median", "mode")),
// Derive a boolean flag: true only when a positive significance threshold is supplied
"significantOnly" to variable("$.input.significanceThreshold") { threshold ->
(threshold as? Double)?.let { it > 0.0 } ?: false
}
)
}
// Report generation step
toolStep {
id = "generate-report"
name = "Generate Report"
description = "Create a report from the analysis results"
after("analyze-data")
tool = reportGenerationTool
variables = mutableMapOf(
"analysisResults" to variable("$.steps.analyze-data.output.results"),
"format" to variable("$.input.reportFormat", defaultValue = "pdf"),
"includeCharts" to variable("$.input.includeVisualizations", defaultValue = true),
// Combine metadata from multiple steps
"metadata" to variable { context ->
mapOf(
"dataSource" to context.getVariable("$.input.dataSource"),
"recordCount" to context.getVariable("$.steps.normalize-data.output.recordCount"),
"analysisTimestamp" to context.getVariable("$.steps.analyze-data.output.timestamp"),
"generatedBy" to "Kastrax Workflow Engine"
)
}
)
}
// Define workflow output
output {
"report" from "$.steps.generate-report.output.report"
"analysisResults" from "$.steps.analyze-data.output.results"
"recordsProcessed" from "$.steps.normalize-data.output.recordCount"
}
}
Type Safety in Kastrax Variables ✅
Kastrax leverages Kotlin’s strong type system to provide type safety for workflow variables:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.workflow.TypedVariable
// Define typed variables for better type safety
class UserProfile(
val name: String,
val email: String,
val preferences: Map<String, Any>
)
// Create a workflow with type-safe variables
val typeSafeWorkflow = workflow {
name = "type-safe-workflow"
description = "Demonstrate type-safe variables"
// Define typed input variables
input {
variable("userId", String::class, required = true)
variable("includePreferences", Boolean::class, defaultValue = true)
}
// Fetch user profile step
step(userProfileAgent) {
id = "fetch-user"
name = "Fetch User Profile"
description = "Retrieve user profile information"
variables = mutableMapOf(
"userId" to variable("$.input.userId")
)
// Type-safe output definition
outputType = UserProfile::class
}
// Process user data step with type checking
step(processingAgent) {
id = "process-user"
name = "Process User Data"
description = "Process user profile information"
after("fetch-user")
// Type-safe variable access with explicit casting
variables = mutableMapOf(
// TypedVariable ensures the correct type at compile time
"user" to TypedVariable<UserProfile>("$.steps.fetch-user.output"),
// Conditional variable based on input
"includePreferences" to variable("$.input.includePreferences")
)
// Access typed variables in execution
execute { context ->
val user = context.getTypedVariable<UserProfile>("user")
val includePrefs = context.getVariable("includePreferences") as? Boolean ?: false
// Type-safe access to user properties
val greeting = "Hello, ${user.name}!"
val contactInfo = "Contact: ${user.email}"
// Conditional processing based on preferences
val preferences = if (includePrefs) user.preferences else emptyMap()
mapOf(
"greeting" to greeting,
"contactInfo" to contactInfo,
"preferences" to preferences
)
}
}
}
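The idea behind typed access can be illustrated with a reified generic accessor over a plain map. This is a standard-library sketch, not the Kastrax `TypedVariable` or `getTypedVariable` implementation; `typed` and `ProfileSketch` are hypothetical names. Note that the check happens at runtime, and generic type arguments are erased, so `typed<List<String>>` would only verify that the value is a `List`.

```kotlin
// Hypothetical typed accessor: returns the value under `key` as T,
// or fails with a message naming the actual and expected types.
inline fun <reified T> Map<String, Any?>.typed(key: String): T {
    val value = this[key]
    return value as? T ?: throw IllegalStateException(
        "Variable '$key' is ${value?.let { it::class.simpleName }}, expected ${T::class.simpleName}"
    )
}

// Simplified profile type used only for this sketch.
data class ProfileSketch(val name: String, val email: String)

fun main() {
    val variables: Map<String, Any?> = mapOf(
        "user" to ProfileSketch("Ada", "ada@example.com"),
        "includePreferences" to true
    )
    val user = variables.typed<ProfileSketch>("user")
    println("Hello, ${user.name}!") // Hello, Ada!

    // A mismatched type fails fast instead of surfacing later as a ClassCastException
    val wrong = runCatching { variables.typed<Int>("includePreferences") }
    println(wrong.isFailure)        // true
}
```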
Best Practices for Variables ✅
1. Use Descriptive Variable Names
Choose clear, descriptive names for your variables that indicate their purpose:
// Good: Descriptive variable names
variables = mutableMapOf(
    "customerProfile" to variable("$.steps.fetch-customer.output.profile"),
    "orderHistory" to variable("$.steps.fetch-orders.output.orders")
)

// Avoid: Vague or generic names
// variables = mutableMapOf(
//     "data1" to variable("$.steps.step1.output.profile"),
//     "data2" to variable("$.steps.step2.output.orders")
// )
2. Provide Default Values
Always provide sensible default values for optional variables:
variables = mutableMapOf(
    "analysisDepth" to variable("$.input.depth", defaultValue = "standard"),
    "maxResults" to variable("$.input.limit", defaultValue = 10),
    "includeMetadata" to variable("$.input.metadata", defaultValue = true)
)
3. Use Transformations for Data Preparation
Leverage transformation functions to prepare data for steps:
variables = mutableMapOf(
    // Normalize text input
    "normalizedText" to variable("$.input.text") { text ->
        (text as? String)?.trim()?.lowercase() ?: ""
    },
    // Convert an ISO date string (yyyy-MM-dd) to a UTC epoch-millisecond timestamp;
    // falls back to the current time when the value is missing or unparseable
    "timestamp" to variable("$.input.date") { dateStr ->
        if (dateStr is String) {
            try {
                java.time.LocalDate.parse(dateStr).atStartOfDay()
                    .toInstant(java.time.ZoneOffset.UTC).toEpochMilli()
            } catch (e: java.time.format.DateTimeParseException) {
                System.currentTimeMillis()
            }
        } else {
            System.currentTimeMillis()
        }
    }
)
4. Structure Complex Data Access
For complex data structures, use intermediate steps to extract and structure data:
// Extract and structure data in a dedicated step
step(dataExtractionAgent) {
    id = "extract-structured-data"
    execute { context ->
        val rawData = context.getVariable("rawData") as? Map<*, *> ?: emptyMap<String, Any>()
        // Extract and structure the data
        mapOf(
            "customers" to extractCustomers(rawData),
            "products" to extractProducts(rawData),
            "metrics" to calculateMetrics(rawData)
        )
    }
}

// Then use the structured data in subsequent steps
step(analysisAgent) {
    id = "analyze-customers"
    after("extract-structured-data")
    variables = mutableMapOf(
        // Now we can access the structured data easily
        "customers" to variable("$.steps.extract-structured-data.output.customers")
    )
}
5. Document Variable Requirements
Clearly document the expected structure and types of variables:
step(reportAgent) {
    id = "generate-report"
    name = "Generate Report"
    description = """
        Generates a comprehensive report based on analysis results.
        Required variables:
        - analysisResults: Map<String, Any> - The results of the data analysis
        - format: String - The output format (pdf, html, markdown)
        - includeCharts: Boolean - Whether to include visualizations
    """.trimIndent()
    variables = mutableMapOf(
        "analysisResults" to variable("$.steps.analyze-data.output.results"),
        "format" to variable("$.input.format", defaultValue = "pdf"),
        "includeCharts" to variable("$.input.charts", defaultValue = true)
    )
}
Variable Access Patterns ✅
Kastrax provides multiple ways to access variables, each with its own advantages:
| Pattern | Description | Use Case |
|---|---|---|
| variable("$.path") | Basic JSONPath variable | Simple data mapping |
| variable("$.path", defaultValue) | Variable with default | Optional data with fallback |
| variable("$.path") { transform } | Variable with transformation | Data conversion or processing |
| TypedVariable<T>("$.path") | Strongly-typed variable | Type-safe access to complex objects |
| variable { context -> ... } | Dynamic variable | Computed values from multiple sources |
Choosing the right pattern for each use case helps create robust, maintainable workflows with clear data flow.