Nested Workflows in Kastrax ✅
Kastrax provides powerful support for nested workflows, allowing you to use entire workflows as steps within other workflows. This feature enables modular design, promotes code reuse, and simplifies the implementation of complex AI agent orchestration patterns.
Nested workflows are particularly valuable for:
- Breaking down complex processes into manageable, reusable components
- Encapsulating domain-specific logic in dedicated workflows
- Creating reusable workflow libraries that can be shared across projects
- Improving readability and maintainability of complex agent orchestration
- Enabling team collaboration by allowing different teams to work on separate workflow components
Nested Workflow Architecture ✅
In Kastrax, nested workflows follow a hierarchical structure:
- Parent Workflow: The top-level workflow that contains one or more nested workflows as steps
- Nested Workflow: A complete workflow that is used as a step within another workflow
- Sub-steps: The individual steps within a nested workflow
This architecture allows for multiple levels of nesting, enabling complex hierarchical workflow structures while maintaining clean separation of concerns.
Basic Usage ✅
You can use a workflow as a step within another workflow using the subWorkflowStep
function:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a nested workflow
val dataProcessingWorkflow = workflow {
name = "data-processing-workflow"
description = "Process and analyze data"
// Define input parameters
input {
variable("data", Any::class, required = true)
variable("processingLevel", String::class, defaultValue = "standard")
}
// Define workflow steps
step(dataCleaningAgent) {
id = "clean-data"
name = "Clean Data"
description = "Clean and normalize the input data"
variables = mutableMapOf(
"rawData" to variable("$.input.data"),
"level" to variable("$.input.processingLevel")
)
}
step(dataAnalysisAgent) {
id = "analyze-data"
name = "Analyze Data"
description = "Analyze the cleaned data"
after("clean-data")
variables = mutableMapOf(
"cleanedData" to variable("$.steps.clean-data.output.data")
)
}
// Define workflow output
output {
"processedData" from "$.steps.clean-data.output.data"
"analysis" from "$.steps.analyze-data.output.results"
"metadata" from "$.steps.analyze-data.output.metadata"
}
}
// Create a parent workflow that uses the nested workflow
val parentWorkflow = workflow {
name = "parent-workflow"
description = "Parent workflow that uses a nested workflow"
// Define input parameters
input {
variable("sourceData", Any::class, required = true)
variable("analysisType", String::class, defaultValue = "comprehensive")
}
// First step to prepare data
step(dataPreparationAgent) {
id = "prepare-data"
name = "Prepare Data"
description = "Prepare data for processing"
variables = mutableMapOf(
"sourceData" to variable("$.input.sourceData")
)
}
// Use the nested workflow as a step
subWorkflowStep {
id = "process-data"
name = "Process Data"
description = "Process data using the data processing workflow"
after("prepare-data")
// Specify which workflow to use
workflowId = dataProcessingWorkflow.id
// Map parent workflow variables to nested workflow inputs
input = mapOf(
"data" to variable("$.steps.prepare-data.output.preparedData"),
"processingLevel" to variable("$.input.analysisType")
)
}
// Final step that uses the nested workflow results
step(reportGenerationAgent) {
id = "generate-report"
name = "Generate Report"
description = "Generate a report based on processed data"
after("process-data")
variables = mutableMapOf(
"processedData" to variable("$.steps.process-data.output.processedData"),
"analysis" to variable("$.steps.process-data.output.analysis")
)
}
// Define workflow output
output {
"report" from "$.steps.generate-report.output.report"
"analysisResults" from "$.steps.process-data.output.analysis"
}
}
When a workflow is used as a step:
- It is referenced by its ID in the
workflowId
property - Input variables are mapped from the parent workflow to the nested workflow
- The nested workflow’s output is available in the parent workflow’s context
- The nested workflow’s steps are executed in their defined order
Accessing Results ✅
Results from a nested workflow are available in the parent workflow’s context under the nested workflow’s step ID. The results include all outputs defined in the nested workflow’s output mapping:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.WorkflowExecuteOptions
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val kastraxSystem = KastraxSystem()
// Register workflows
kastraxSystem.registerWorkflow(dataProcessingWorkflow)
kastraxSystem.registerWorkflow(parentWorkflow)
// Execute the parent workflow
val result = kastraxSystem.workflowEngine.executeWorkflow(
workflowId = parentWorkflow.id,
input = mapOf(
"sourceData" to mapOf("records" to listOf(1, 2, 3, 4, 5)),
"analysisType" to "detailed"
)
)
// Access nested workflow results
if (result.success) {
// Access the final workflow output (which may include nested workflow results)
val report = result.output["report"]
val analysisResults = result.output["analysisResults"]
println("Report: $report")
println("Analysis Results: $analysisResults")
// Access specific nested workflow outputs directly
val processedData = result.steps["process-data"]?.output?.get("processedData")
val metadata = result.steps["process-data"]?.output?.get("metadata")
println("Processed Data: $processedData")
println("Metadata: $metadata")
} else {
println("Workflow execution failed: ${result.error}")
}
}
Advanced Nested Workflow Patterns ✅
Kastrax supports several advanced patterns for working with nested workflows:
Parallel Execution of Nested Workflows
Multiple nested workflows can be executed in parallel for improved performance:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with parallel nested workflows
val parallelWorkflow = workflow {
name = "parallel-workflow"
description = "Execute multiple nested workflows in parallel"
// Define input parameters
input {
variable("data", Any::class, required = true)
}
// Execute multiple nested workflows in parallel
parallelSteps {
// First nested workflow
subWorkflowStep {
id = "text-analysis"
name = "Text Analysis"
description = "Analyze textual content"
workflowId = "text-analysis-workflow"
input = mapOf(
"text" to variable("$.input.data.text")
)
}
// Second nested workflow
subWorkflowStep {
id = "image-analysis"
name = "Image Analysis"
description = "Analyze image content"
workflowId = "image-analysis-workflow"
input = mapOf(
"imageUrl" to variable("$.input.data.imageUrl")
)
}
// Third nested workflow
subWorkflowStep {
id = "metadata-analysis"
name = "Metadata Analysis"
description = "Analyze metadata"
workflowId = "metadata-analysis-workflow"
input = mapOf(
"metadata" to variable("$.input.data.metadata")
)
}
}
// Combine results from all parallel nested workflows
step(resultCombinationAgent) {
id = "combine-results"
name = "Combine Results"
description = "Combine results from all analysis workflows"
after("text-analysis", "image-analysis", "metadata-analysis")
variables = mutableMapOf(
"textResults" to variable("$.steps.text-analysis.output.results"),
"imageResults" to variable("$.steps.image-analysis.output.results"),
"metadataResults" to variable("$.steps.metadata-analysis.output.results")
)
}
// Define workflow output
output {
"combinedResults" from "$.steps.combine-results.output.combinedResults"
"textAnalysis" from "$.steps.text-analysis.output.results"
"imageAnalysis" from "$.steps.image-analysis.output.results"
"metadataAnalysis" from "$.steps.metadata-analysis.output.results"
}
}
Conditional Nested Workflows
You can conditionally execute different nested workflows based on input data or previous step results:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with conditional nested workflow execution
val conditionalWorkflow = workflow {
name = "conditional-workflow"
description = "Conditionally execute different nested workflows"
// Define input parameters
input {
variable("data", Any::class, required = true)
variable("processingType", String::class, required = true)
}
// Initial data analysis step
step(dataAnalysisAgent) {
id = "analyze-data"
name = "Analyze Data"
description = "Analyze input data to determine processing path"
variables = mutableMapOf(
"data" to variable("$.input.data"),
"processingType" to variable("$.input.processingType")
)
}
// Conditional step to choose the appropriate nested workflow
conditionalStep {
id = "processing-path"
name = "Choose Processing Path"
description = "Select the appropriate processing workflow"
after("analyze-data")
// Condition to determine which path to take
condition { context ->
val processingType = context.getVariable("$.input.processingType") as? String ?: "standard"
processingType == "advanced"
}
// Advanced processing path
onTrue {
subWorkflowStep {
id = "advanced-processing"
name = "Advanced Processing"
description = "Execute advanced processing workflow"
workflowId = "advanced-processing-workflow"
input = mapOf(
"data" to variable("$.steps.analyze-data.output.preparedData"),
"options" to variable("$.steps.analyze-data.output.advancedOptions")
)
}
}
// Standard processing path
onFalse {
subWorkflowStep {
id = "standard-processing"
name = "Standard Processing"
description = "Execute standard processing workflow"
workflowId = "standard-processing-workflow"
input = mapOf(
"data" to variable("$.steps.analyze-data.output.preparedData")
)
}
}
}
// Final step to format results
step(resultFormattingAgent) {
id = "format-results"
name = "Format Results"
description = "Format the processing results"
// This step will run after whichever processing path was taken
execute { context ->
// Get results from whichever processing path was taken
val advancedResults = context.getVariable("$.steps.advanced-processing.output.results")
val standardResults = context.getVariable("$.steps.standard-processing.output.results")
// Use whichever results are available
val results = advancedResults ?: standardResults
// Format the results
mapOf(
"formattedResults" to formatResults(results),
"processingType" to if (advancedResults != null) "advanced" else "standard"
)
}
}
// Define workflow output
output {
"results" from "$.steps.format-results.output.formattedResults"
"processingType" from "$.steps.format-results.output.processingType"
}
}
// Helper function to format results
fun formatResults(results: Any?): Map<String, Any> {
// In a real implementation, this would format the results appropriately
return mapOf("formatted" to (results ?: "No results"))
}
Iterative Nested Workflows
You can implement iterative processing using nested workflows in loops:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with iterative nested workflow execution
val iterativeWorkflow = workflow {
name = "iterative-workflow"
description = "Iteratively execute a nested workflow until a condition is met"
// Define input parameters
input {
variable("initialData", Any::class, required = true)
variable("maxIterations", Int::class, defaultValue = 5)
variable("qualityThreshold", Double::class, defaultValue = 0.9)
}
// Initialize iteration variables
step(initializationAgent) {
id = "initialize"
name = "Initialize"
description = "Set up initial values for iteration"
execute { context ->
mapOf(
"currentData" to context.getVariable("$.input.initialData"),
"iterationCount" to 0,
"quality" to 0.0
)
}
}
// Iterative loop
loopStep {
id = "refinement-loop"
name = "Refinement Loop"
description = "Iteratively refine data until quality threshold is reached"
after("initialize")
// Continue looping while quality is below threshold and iterations are under limit
condition { context ->
val quality = context.getVariable("$.steps.evaluate-quality.output.quality") as? Double ?: 0.0
val iterationCount = context.getVariable("$.steps.refine-data.output.iterationCount") as? Int ?: 0
val maxIterations = context.getVariable("$.input.maxIterations") as? Int ?: 5
quality < context.getVariable("$.input.qualityThreshold") as Double && iterationCount < maxIterations
}
// Loop body
body {
// Execute nested workflow for data refinement
subWorkflowStep {
id = "refine-data"
name = "Refine Data"
description = "Refine data using the refinement workflow"
workflowId = "data-refinement-workflow"
input = mapOf(
"data" to variable("$.steps.refine-data.output.currentData",
defaultValue = variable("$.steps.initialize.output.currentData")),
"iterationCount" to variable("$.steps.refine-data.output.iterationCount",
defaultValue = variable("$.steps.initialize.output.iterationCount"))
)
}
// Evaluate the quality of refined data
step(qualityEvaluationAgent) {
id = "evaluate-quality"
name = "Evaluate Quality"
description = "Evaluate the quality of refined data"
after("refine-data")
variables = mutableMapOf(
"refinedData" to variable("$.steps.refine-data.output.refinedData"),
"iterationCount" to variable("$.steps.refine-data.output.iterationCount")
)
}
}
}
// Final step to process the refined data
step(finalProcessingAgent) {
id = "final-processing"
name = "Final Processing"
description = "Process the final refined data"
after("refinement-loop")
variables = mutableMapOf(
"refinedData" to variable("$.steps.refine-data.output.refinedData"),
"quality" to variable("$.steps.evaluate-quality.output.quality"),
"iterationCount" to variable("$.steps.refine-data.output.iterationCount")
)
}
// Define workflow output
output {
"finalData" from "$.steps.final-processing.output.finalData"
"quality" from "$.steps.evaluate-quality.output.quality"
"iterations" from "$.steps.refine-data.output.iterationCount"
}
}
Monitoring Nested Workflows ✅
Kastrax provides comprehensive monitoring capabilities for nested workflows, allowing you to track their execution and handle events:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.WorkflowExecuteOptions
import ai.kastrax.core.workflow.monitoring.WorkflowMonitor
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val kastraxSystem = KastraxSystem()
// Register workflows
kastraxSystem.registerWorkflow(parentWorkflow)
// Create a workflow monitor
val workflowMonitor = WorkflowMonitor(kastraxSystem)
// Execute the parent workflow with monitoring
val result = kastraxSystem.workflowEngine.executeWorkflow(
workflowId = parentWorkflow.id,
input = mapOf("sourceData" to "test-data"),
options = WorkflowExecuteOptions(
// Monitor all steps, including nested workflow steps
onStepStart = { stepId, input ->
// Check if this is a nested workflow step
if (stepId == "process-data") {
println("Starting nested workflow execution with input: $input")
} else {
println("Starting step: $stepId with input: $input")
}
},
onStepFinish = { stepResult ->
// Check if this is a nested workflow step
if (stepResult.stepId == "process-data") {
println("Nested workflow completed with status: ${stepResult.status}")
// Access nested workflow outputs
val nestedOutput = stepResult.output
println("Nested workflow outputs: $nestedOutput")
// Access nested workflow steps
val nestedSteps = stepResult.nestedSteps
println("Nested workflow steps: ${nestedSteps?.keys}")
} else {
val stepId = stepResult.stepId
val status = stepResult.status
println("Step $stepId completed with status: $status")
}
},
// Monitor nested workflow events
onNestedWorkflowEvent = { workflowId, eventType, data ->
println("Nested workflow $workflowId event: $eventType, data: $data")
}
)
)
// Get detailed execution history, including nested workflows
val executionHistory = workflowMonitor.getExecutionHistory(result.executionId, includeNested = true)
// Analyze execution metrics
val metrics = workflowMonitor.getExecutionMetrics(result.executionId)
println("Total execution time: ${metrics.totalExecutionTimeMs} ms")
println("Nested workflow execution time: ${metrics.stepExecutionTimes.get("process-data")} ms")
// Or alternatively:
// val executionTime = metrics.stepExecutionTimes["process-data"]
// println("Nested workflow execution time: $executionTime ms")
}
Suspending and Resuming Nested Workflows ✅
Nested workflows fully support suspension and resumption, allowing you to create complex workflows with human-in-the-loop processes or asynchronous operations:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import kotlinx.coroutines.runBlocking
// Create a nested workflow with suspension points
val reviewWorkflow = workflow {
name = "content-review-workflow"
description = "Review content with human approval"
// Define input parameters
input {
variable("content", String::class, required = true)
}
// Initial content analysis
step(contentAnalysisAgent) {
id = "analyze-content"
name = "Analyze Content"
description = "Analyze content quality"
variables = mutableMapOf(
"content" to variable("$.input.content")
)
}
// Human review step (suspends the workflow)
humanStep {
id = "human-review"
name = "Human Review"
description = "Get human approval for content"
after("analyze-content")
prompt { context ->
val content = context.getVariable("$.input.content") as? String ?: ""
val analysis = context.getVariable("$.steps.analyze-content.output.analysis") as? Map<*, *> ?: emptyMap<String, Any>()
// Format the analysis as a string first
val analysisText = analysis.entries.joinToString("\n") { entry ->
// Format each entry
val key = entry.key
val value = entry.value
"$key: $value"
}
// Then use it in the template
"""Please review the following content:
$content
Analysis:
$analysisText
Approve or suggest changes.""".trimIndent()
}
}
// Process review results
step(reviewProcessingAgent) {
id = "process-review"
name = "Process Review"
description = "Process human review results"
after("human-review")
variables = mutableMapOf(
"content" to variable("$.input.content"),
"approved" to variable("$.steps.human-review.output.approved"),
"feedback" to variable("$.steps.human-review.output.feedback")
)
}
// Define workflow output
output {
"reviewedContent" from "$.steps.process-review.output.reviewedContent"
"approved" from "$.steps.human-review.output.approved"
"feedback" from "$.steps.human-review.output.feedback"
}
}
// Create a parent workflow that uses the suspendable nested workflow
val contentPublishingWorkflow = workflow {
name = "content-publishing-workflow"
description = "Create and publish content with review"
// Define input parameters
input {
variable("topic", String::class, required = true)
}
// Generate content
step(contentGenerationAgent) {
id = "generate-content"
name = "Generate Content"
description = "Generate content based on topic"
variables = mutableMapOf(
"topic" to variable("$.input.topic")
)
}
// Use the review workflow as a nested workflow
subWorkflowStep {
id = "review-content"
name = "Review Content"
description = "Review content using the review workflow"
after("generate-content")
workflowId = reviewWorkflow.id
input = mapOf(
"content" to variable("$.steps.generate-content.output.content")
)
}
// Publish content if approved
conditionalStep {
id = "publishing-decision"
name = "Publishing Decision"
description = "Decide whether to publish content"
after("review-content")
condition { context ->
context.getVariable("$.steps.review-content.output.approved") as? Boolean ?: false
}
onTrue {
step(publishingAgent) {
id = "publish-content"
name = "Publish Content"
description = "Publish the approved content"
variables = mutableMapOf(
"content" to variable("$.steps.review-content.output.reviewedContent")
)
}
}
onFalse {
step(revisionAgent) {
id = "revise-content"
name = "Revise Content"
description = "Revise content based on feedback"
variables = mutableMapOf(
"content" to variable("$.steps.generate-content.output.content"),
"feedback" to variable("$.steps.review-content.output.feedback")
)
}
}
}
}
// Execute and resume the workflow
fun main() = runBlocking {
val kastraxSystem = KastraxSystem()
// Register workflows
kastraxSystem.registerWorkflow(reviewWorkflow)
kastraxSystem.registerWorkflow(contentPublishingWorkflow)
// Execute the parent workflow
val result = kastraxSystem.workflowEngine.executeWorkflow(
workflowId = contentPublishingWorkflow.id,
input = mapOf("topic" to "Artificial Intelligence Ethics")
)
// Check if the workflow is suspended
if (result.status == "suspended") {
println("Workflow suspended at: ${result.suspendedStepId}")
// In a real application, this would be triggered by a human review
// For this example, we'll simulate receiving human input
val humanInput = mapOf(
"approved" to true,
"feedback" to "The content looks good!"
)
// Resume the workflow with human input
val resumeResult = kastraxSystem.workflowEngine.resumeWorkflow(
suspensionId = result.suspensionId,
resumeData = humanInput
)
println("Workflow resumed and completed with status: ${resumeResult.status}")
println("Final output: ${resumeResult.output}")
}
}
When working with suspended nested workflows:
- The parent workflow is suspended when any step in a nested workflow suspends
- The suspension ID uniquely identifies the suspended workflow, regardless of nesting level
- When resuming, the nested workflow continues from the suspended step
- After the nested workflow completes, the parent workflow continues execution
Type-Safe Nested Workflows ✅
Kastrax provides strong type safety for nested workflows through explicit input and output definitions:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import kotlinx.serialization.Serializable
// Define data classes for type-safe input/output
@Serializable
data class AnalysisInput(
val text: String,
val options: Map<String, Any> = emptyMap()
)
@Serializable
data class AnalysisResult(
val sentiment: Double,
val topics: List<String>,
val summary: String,
val metadata: Map<String, Any>
)
// Create a type-safe nested workflow
val analysisWorkflow = workflow {
name = "text-analysis-workflow"
description = "Analyze text with type safety"
// Define typed input
input {
variable("analysisInput", AnalysisInput::class, required = true)
}
// Workflow steps
step(textAnalysisAgent) {
id = "analyze-text"
name = "Analyze Text"
description = "Perform text analysis"
variables = mutableMapOf(
"text" to variable("$.input.analysisInput.text"),
"options" to variable("$.input.analysisInput.options")
)
}
// Define typed output
output {
"result" from { context ->
// Create a strongly-typed result object
val sentiment = context.getVariable("$.steps.analyze-text.output.sentiment") as? Double ?: 0.0
val topics = context.getVariable("$.steps.analyze-text.output.topics") as? List<String> ?: emptyList()
val summary = context.getVariable("$.steps.analyze-text.output.summary") as? String ?: ""
val metadata = context.getVariable("$.steps.analyze-text.output.metadata") as? Map<String, Any> ?: emptyMap()
AnalysisResult(
sentiment = sentiment,
topics = topics,
summary = summary,
metadata = metadata
)
}
}
}
// Use the type-safe nested workflow in a parent workflow
val documentProcessingWorkflow = workflow {
name = "document-processing-workflow"
description = "Process documents with type-safe nested workflows"
// Define input parameters
input {
variable("document", String::class, required = true)
}
// Extract text from document
step(textExtractionAgent) {
id = "extract-text"
name = "Extract Text"
description = "Extract text from document"
variables = mutableMapOf(
"document" to variable("$.input.document")
)
}
// Use the analysis workflow with type-safe input
subWorkflowStep {
id = "analyze-document"
name = "Analyze Document"
description = "Analyze document text"
after("extract-text")
workflowId = analysisWorkflow.id
// Create a type-safe input object
input = { context ->
val text = context.getVariable("$.steps.extract-text.output.text") as? String ?: ""
val options = mapOf("language" to "en", "detailed" to true)
mapOf("analysisInput" to AnalysisInput(text = text, options = options))
}
}
// Process the type-safe result
step(reportGenerationAgent) {
id = "generate-report"
name = "Generate Report"
description = "Generate report based on analysis"
after("analyze-document")
// Access the strongly-typed result
execute { context ->
// Get the typed result from the nested workflow
val analysisResult = context.getVariable("$.steps.analyze-document.output.result") as AnalysisResult
// Use the strongly-typed properties
val sentiment = analysisResult.sentiment
val topics = analysisResult.topics
val summary = analysisResult.summary
// Generate report using typed data
mapOf(
"report" to "Document Analysis Report:\n" +
"Sentiment: $sentiment\n" +
// Join topics with comma separator
"Topics: " + topics.joinToString(", ") + "\n" +
"Summary: $summary",
"sentimentScore" to sentiment,
"topTopics" to topics.take(3)
)
}
}
}
Best Practices for Nested Workflows ✅
1. Design for Modularity and Reusability
Create nested workflows that encapsulate specific functionality and can be reused across multiple parent workflows:
// Good: Modular workflow that handles a specific task
val emailProcessingWorkflow = workflow {
name = "email-processing-workflow"
description = "Process emails with classification and response generation"
// Clear input/output definitions
input {
variable("emailContent", String::class, required = true)
variable("sender", String::class, required = true)
}
// Focused steps for this specific task
step(emailClassificationAgent) { /* ... */ }
step(responseGenerationAgent) { /* ... */ }
// Well-defined outputs
output {
"classification" from "$.steps.email-classification.output.category"
"response" from "$.steps.response-generation.output.text"
}
}
// This workflow can now be used in multiple parent workflows
2. Use Clear Input/Output Contracts
Define explicit input and output contracts for nested workflows to ensure they can be used correctly:
// Define clear input requirements
input {
variable("data", DataType::class, required = true, description = "The data to process")
variable("options", OptionsType::class, defaultValue = DefaultOptions)
}
// Define comprehensive output mapping
output {
"primaryResult" from "$.steps.main-processing.output.result"
"metadata" from "$.steps.analysis.output.metadata"
"status" from { context ->
if (context.getVariable("$.steps.validation.output.valid") as? Boolean == true) {
"success"
} else {
"failed_validation"
}
}
}
3. Implement Proper Error Handling
Ensure nested workflows handle errors appropriately and propagate meaningful error information to parent workflows:
// In nested workflow
step(processingAgent) {
id = "process-data"
// Handle errors within the nested workflow
onError { error ->
mapOf(
"error" to error.message,
"errorType" to error.javaClass.simpleName,
"recoverable" to (error !is CriticalException)
)
}
}
// In parent workflow
subWorkflowStep {
id = "nested-processing"
workflowId = "data-processing-workflow"
// Handle errors from the nested workflow
onError { error ->
println("Nested workflow failed: ${error.message}")
// Take appropriate action based on the error
step(errorHandlingAgent) {
id = "handle-nested-error"
variables = mutableMapOf(
"error" to error.message,
"workflowId" to "data-processing-workflow"
)
}
}
}
4. Monitor Performance and Resource Usage
Track the performance of nested workflows to identify bottlenecks and optimize resource usage:
// Configure monitoring for nested workflows
val options = WorkflowExecuteOptions(
// Track step execution times
onStepStart = { stepId, _ ->
stepStartTimes[stepId] = System.currentTimeMillis()
},
onStepFinish = { stepResult ->
val startTime = stepStartTimes[stepResult.stepId] ?: 0
val duration = System.currentTimeMillis() - startTime
// Log execution time for nested workflows
if (stepResult.nestedSteps != null) {
val stepId = stepResult.stepId
val stepsSize = stepResult.nestedSteps?.size ?: 0
println("Nested workflow $stepId completed in $duration ms")
println("Nested steps: $stepsSize")
}
},
// Set resource limits for nested workflows
nestedWorkflowOptions = NestedWorkflowOptions(
maxDepth = 3, // Limit nesting depth
timeoutMs = 30000 // Timeout for nested workflows
)
)
5. Version Nested Workflows Carefully
Manage versioning of nested workflows to ensure compatibility with parent workflows:
// Version your workflows explicitly
val analysisWorkflowV2 = workflow {
name = "text-analysis-workflow"
version = "2.0.0"
description = "Improved text analysis workflow (v2)"
// Maintain backward compatibility in input/output
input {
variable("text", String::class, required = true) // Original input
variable("options", Map::class, defaultValue = emptyMap<String, Any>()) // New in v2
}
// Steps...
// Ensure output remains compatible
output {
"result" from "$.steps.analyze.output.result" // Original output
"enhancedResult" from "$.steps.enhanced-analysis.output.result" // New in v2
}
}
// Reference specific versions in parent workflows
subWorkflowStep {
id = "analyze-text"
workflowId = "text-analysis-workflow"
workflowVersion = "2.0.0" // Explicitly request v2
}
Complete Example ✅
Here’s a comprehensive example demonstrating a multi-level nested workflow system for content creation and publishing:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable
// Data classes for type safety
@Serializable
data class ResearchResult(
val sources: List<String>,
val keyPoints: List<String>,
val statistics: Map<String, Any>
)
@Serializable
data class ContentOutline(
val title: String,
val sections: List<String>,
val targetWordCount: Int
)
@Serializable
data class DraftContent(
val title: String,
val content: String,
val wordCount: Int,
val metadata: Map<String, Any>
)
@Serializable
data class EditedContent(
val title: String,
val content: String,
val wordCount: Int,
val readabilityScore: Double,
val metadata: Map<String, Any>
)
// 1. Research workflow (nested level 2)
val researchWorkflow = workflow {
name = "research-workflow"
description = "Research a topic thoroughly"
input {
variable("topic", String::class, required = true)
variable("depth", String::class, defaultValue = "standard")
}
// Research steps
step(academicResearchAgent) {
id = "academic-research"
name = "Academic Research"
description = "Research from academic sources"
variables = mutableMapOf(
"topic" to variable("$.input.topic"),
"depth" to variable("$.input.depth")
)
}
step(newsResearchAgent) {
id = "news-research"
name = "News Research"
description = "Research from news sources"
variables = mutableMapOf(
"topic" to variable("$.input.topic")
)
}
step(synthesisAgent) {
id = "synthesize-research"
name = "Synthesize Research"
description = "Combine research from all sources"
after("academic-research", "news-research")
variables = mutableMapOf(
"academicResearch" to variable("$.steps.academic-research.output.findings"),
"newsResearch" to variable("$.steps.news-research.output.findings")
)
}
// Define typed output
output {
"result" from { context ->
val sources = context.getVariable("$.steps.synthesize-research.output.sources") as? List<String> ?: emptyList()
val keyPoints = context.getVariable("$.steps.synthesize-research.output.keyPoints") as? List<String> ?: emptyList()
val statistics = context.getVariable("$.steps.synthesize-research.output.statistics") as? Map<String, Any> ?: emptyMap()
ResearchResult(sources = sources, keyPoints = keyPoints, statistics = statistics)
}
}
}
// 2. Content creation workflow (nested level 1)
val contentCreationWorkflow = workflow {
name = "content-creation-workflow"
description = "Create content based on research"
input {
variable("topic", String::class, required = true)
variable("contentType", String::class, required = true)
variable("targetWordCount", Int::class, defaultValue = 1000)
}
// Use research workflow as a nested workflow
subWorkflowStep {
id = "research"
name = "Research Topic"
description = "Research the topic thoroughly"
workflowId = researchWorkflow.id
input = mapOf(
"topic" to variable("$.input.topic"),
"depth" to if (variable("$.input.contentType") == "academic") "deep" else "standard"
)
}
// Create outline based on research
step(outlineAgent) {
id = "create-outline"
name = "Create Outline"
description = "Create a content outline based on research"
after("research")
variables = mutableMapOf(
"research" to variable("$.steps.research.output.result"),
"contentType" to variable("$.input.contentType"),
"targetWordCount" to variable("$.input.targetWordCount")
)
}
// Write draft based on outline
step(writingAgent) {
id = "write-draft"
name = "Write Draft"
description = "Write a draft based on the outline"
after("create-outline")
variables = mutableMapOf(
"outline" to variable("$.steps.create-outline.output.outline"),
"research" to variable("$.steps.research.output.result"),
"contentType" to variable("$.input.contentType"),
"targetWordCount" to variable("$.input.targetWordCount")
)
}
// Edit the draft
step(editingAgent) {
id = "edit-draft"
name = "Edit Draft"
description = "Edit and improve the draft"
after("write-draft")
variables = mutableMapOf(
"draft" to variable("$.steps.write-draft.output.draft"),
"contentType" to variable("$.input.contentType")
)
}
// Define typed output
output {
"outline" from { context ->
val title = context.getVariable("$.steps.create-outline.output.title") as? String ?: ""
val sections = context.getVariable("$.steps.create-outline.output.sections") as? List<String> ?: emptyList()
val targetWordCount = context.getVariable("$.input.targetWordCount") as? Int ?: 1000
ContentOutline(title = title, sections = sections, targetWordCount = targetWordCount)
}
"draft" from { context ->
val title = context.getVariable("$.steps.write-draft.output.title") as? String ?: ""
val content = context.getVariable("$.steps.write-draft.output.content") as? String ?: ""
val wordCount = context.getVariable("$.steps.write-draft.output.wordCount") as? Int ?: 0
val metadata = context.getVariable("$.steps.write-draft.output.metadata") as? Map<String, Any> ?: emptyMap()
DraftContent(title = title, content = content, wordCount = wordCount, metadata = metadata)
}
"editedContent" from { context ->
val title = context.getVariable("$.steps.edit-draft.output.title") as? String ?: ""
val content = context.getVariable("$.steps.edit-draft.output.content") as? String ?: ""
val wordCount = context.getVariable("$.steps.edit-draft.output.wordCount") as? Int ?: 0
val readabilityScore = context.getVariable("$.steps.edit-draft.output.readabilityScore") as? Double ?: 0.0
val metadata = context.getVariable("$.steps.edit-draft.output.metadata") as? Map<String, Any> ?: emptyMap()
EditedContent(
title = title,
content = content,
wordCount = wordCount,
readabilityScore = readabilityScore,
metadata = metadata
)
}
}
}
Main Publishing Workflow ✅
Here’s the top-level publishing workflow that uses the content creation workflow:
val publishingWorkflow = workflow {
name = "publishing-workflow"
description = "Create and publish content"
input {
variable("topic", String::class, required = true)
variable("contentType", String::class, required = true)
variable("targetWordCount", Int::class, defaultValue = 1000)
variable("publishPlatform", String::class, defaultValue = "blog")
}
// Use content creation workflow as a nested workflow
subWorkflowStep {
id = "create-content"
name = "Create Content"
description = "Create content using the content creation workflow"
workflowId = contentCreationWorkflow.id
input = mapOf(
"topic" to variable("$.input.topic"),
"contentType" to variable("$.input.contentType"),
"targetWordCount" to variable("$.input.targetWordCount")
)
}
// Human review step (suspends the workflow)
humanStep {
id = "human-review"
name = "Human Review"
description = "Get human approval for content"
after("create-content")
prompt { context ->
val content = context.getVariable("$.steps.create-content.output.editedContent.content") as? String ?: ""
val title = context.getVariable("$.steps.create-content.output.editedContent.title") as? String ?: ""
// Using triple quotes for multiline string
"""Please review the following content:
Title: $title
$content
Approve or suggest changes.""".trimIndent()
}
}
// Conditional publishing based on review
conditionalStep {
id = "publishing-decision"
name = "Publishing Decision"
description = "Decide whether to publish content"
after("human-review")
condition { context ->
context.getVariable("$.steps.human-review.output.approved") as? Boolean ?: false
}
onTrue {
step(publishingAgent) {
id = "publish-content"
name = "Publish Content"
description = "Publish the approved content"
variables = mutableMapOf(
"content" to variable("$.steps.create-content.output.editedContent"),
"platform" to variable("$.input.publishPlatform")
)
}
}
onFalse {
step(revisionAgent) {
id = "revise-content"
name = "Revise Content"
description = "Revise content based on feedback"
variables = mutableMapOf(
"content" to variable("$.steps.create-content.output.editedContent"),
"feedback" to variable("$.steps.human-review.output.feedback")
)
}
}
}
// Define workflow output
output {
"title" from "$.steps.create-content.output.editedContent.title"
"content" from "$.steps.create-content.output.editedContent.content"
"published" from { context ->
context.getStepStatus("publish-content") == "success"
}
"publishedUrl" from "$.steps.publish-content.output.url"
"feedback" from "$.steps.human-review.output.feedback"
}
}
// Execute the workflow
fun main() = runBlocking {
val kastraxSystem = KastraxSystem()
// Register all workflows
kastraxSystem.registerWorkflow(researchWorkflow)
kastraxSystem.registerWorkflow(contentCreationWorkflow)
kastraxSystem.registerWorkflow(publishingWorkflow)
// Execute the top-level workflow
val result = kastraxSystem.workflowEngine.executeWorkflow(
workflowId = publishingWorkflow.id,
input = mapOf(
"topic" to "Artificial Intelligence Ethics",
"contentType" to "blog",
"targetWordCount" to 1500,
"publishPlatform" to "medium"
)
)
// Check if the workflow is suspended for human review
if (result.status == "suspended") {
println("Workflow suspended for human review. Suspension ID: ${result.suspensionId}")
// In a real application, this would be triggered by a human review
// For this example, we'll simulate receiving human input
val humanInput = mapOf(
"approved" to true,
"feedback" to "The content looks great! Approved for publishing."
)
// Resume the workflow with human input
val resumeResult = kastraxSystem.workflowEngine.resumeWorkflow(
suspensionId = result.suspensionId,
resumeData = humanInput
)
// Check the final result
if (resumeResult.success) {
println("Content published successfully!")
println("Title: ${resumeResult.output.get("title")}")
println("Published URL: ${resumeResult.output.get("publishedUrl")}")
// Or alternatively:
// val title = resumeResult.output["title"]
// val url = resumeResult.output["publishedUrl"]
// println("Title: $title")
// println("Published URL: $url")
} else {
println("Workflow failed: ${resumeResult.error}")
}
}
}
Conclusion ✅
Nested workflows are a powerful feature in Kastrax that enable modular, reusable, and maintainable AI agent orchestration. By breaking down complex processes into manageable components, you can create sophisticated multi-agent systems that are easier to develop, test, and maintain.
Key benefits of nested workflows include:
- Modularity: Encapsulate related functionality in dedicated workflows
- Reusability: Create workflow libraries that can be shared across projects
- Maintainability: Simplify complex processes by breaking them into smaller components
- Type Safety: Ensure data consistency with strongly-typed inputs and outputs
- Testability: Test individual workflow components in isolation
By following the best practices outlined in this guide and leveraging Kastrax’s comprehensive nested workflow capabilities, you can build sophisticated AI agent systems that are both powerful and maintainable.