
Nested Workflows in Kastrax ✅

Kastrax provides powerful support for nested workflows, allowing you to use entire workflows as steps within other workflows. This feature enables modular design, promotes code reuse, and simplifies the implementation of complex AI agent orchestration patterns.

Nested workflows are particularly valuable for:

  • Breaking down complex processes into manageable, reusable components
  • Encapsulating domain-specific logic in dedicated workflows
  • Creating reusable workflow libraries that can be shared across projects
  • Improving readability and maintainability of complex agent orchestration
  • Enabling team collaboration by allowing different teams to work on separate workflow components

Nested Workflow Architecture ✅

In Kastrax, nested workflows follow a hierarchical structure:

  1. Parent Workflow: The top-level workflow that contains one or more nested workflows as steps
  2. Nested Workflow: A complete workflow that is used as a step within another workflow
  3. Sub-steps: The individual steps within a nested workflow

This architecture allows for multiple levels of nesting, enabling complex hierarchical workflow structures while maintaining clean separation of concerns.
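The hierarchy can be pictured as a small tree. The sketch below is plain Kotlin written only for illustration: `Node`, `LeafStep`, `NestedWorkflow`, `depth`, and `paths` are hypothetical names and are not part of the Kastrax API. It shows how a workflow used as a step contains its own sub-steps, and how nesting depth follows from that structure.

```kotlin
// Hypothetical model for illustration only; these types are not Kastrax APIs.
sealed interface Node { val id: String }

// A leaf step that performs a single unit of work.
data class LeafStep(override val id: String) : Node

// A workflow used as a step: it owns its own ordered list of sub-steps.
data class NestedWorkflow(override val id: String, val steps: List<Node>) : Node

// Number of workflow levels on the deepest branch (a top-level workflow counts as 1).
fun depth(node: Node): Int = when (node) {
    is LeafStep -> 0
    is NestedWorkflow -> 1 + (node.steps.maxOfOrNull { depth(it) } ?: 0)
}

// Flatten the hierarchy into slash-separated leaf paths, in definition order.
fun paths(node: Node, prefix: String = ""): List<String> {
    val here = if (prefix.isEmpty()) node.id else "$prefix/${node.id}"
    return when (node) {
        is LeafStep -> listOf(here)
        is NestedWorkflow -> node.steps.flatMap { paths(it, here) }
    }
}

val research = NestedWorkflow("research", listOf(LeafStep("search"), LeafStep("synthesize")))
val parent = NestedWorkflow("parent", listOf(LeafStep("prepare"), research, LeafStep("report")))

fun main() {
    println(depth(parent)) // 2
    println(paths(parent)) // [parent/prepare, parent/research/search, parent/research/synthesize, parent/report]
}
```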

Basic Usage ✅

You can use a workflow as a step within another workflow using the subWorkflowStep function:

BasicNestedWorkflow.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Create a nested workflow
    val dataProcessingWorkflow = workflow {
        name = "data-processing-workflow"
        description = "Process and analyze data"

        // Define input parameters
        input {
            variable("data", Any::class, required = true)
            variable("processingLevel", String::class, defaultValue = "standard")
        }

        // Define workflow steps
        step(dataCleaningAgent) {
            id = "clean-data"
            name = "Clean Data"
            description = "Clean and normalize the input data"
            variables = mutableMapOf(
                "rawData" to variable("$.input.data"),
                "level" to variable("$.input.processingLevel")
            )
        }

        step(dataAnalysisAgent) {
            id = "analyze-data"
            name = "Analyze Data"
            description = "Analyze the cleaned data"
            after("clean-data")
            variables = mutableMapOf(
                "cleanedData" to variable("$.steps.clean-data.output.data")
            )
        }

        // Define workflow output
        output {
            "processedData" from "$.steps.clean-data.output.data"
            "analysis" from "$.steps.analyze-data.output.results"
            "metadata" from "$.steps.analyze-data.output.metadata"
        }
    }

    // Create a parent workflow that uses the nested workflow
    val parentWorkflow = workflow {
        name = "parent-workflow"
        description = "Parent workflow that uses a nested workflow"

        // Define input parameters
        input {
            variable("sourceData", Any::class, required = true)
            variable("analysisType", String::class, defaultValue = "comprehensive")
        }

        // First step to prepare data
        step(dataPreparationAgent) {
            id = "prepare-data"
            name = "Prepare Data"
            description = "Prepare data for processing"
            variables = mutableMapOf(
                "sourceData" to variable("$.input.sourceData")
            )
        }

        // Use the nested workflow as a step
        subWorkflowStep {
            id = "process-data"
            name = "Process Data"
            description = "Process data using the data processing workflow"
            after("prepare-data")

            // Specify which workflow to use
            workflowId = dataProcessingWorkflow.id

            // Map parent workflow variables to nested workflow inputs
            input = mapOf(
                "data" to variable("$.steps.prepare-data.output.preparedData"),
                "processingLevel" to variable("$.input.analysisType")
            )
        }

        // Final step that uses the nested workflow results
        step(reportGenerationAgent) {
            id = "generate-report"
            name = "Generate Report"
            description = "Generate a report based on processed data"
            after("process-data")
            variables = mutableMapOf(
                "processedData" to variable("$.steps.process-data.output.processedData"),
                "analysis" to variable("$.steps.process-data.output.analysis")
            )
        }

        // Define workflow output
        output {
            "report" from "$.steps.generate-report.output.report"
            "analysisResults" from "$.steps.process-data.output.analysis"
        }
    }

When a workflow is used as a step:

  • It is referenced by its ID in the workflowId property
  • Input variables are mapped from the parent workflow to the nested workflow
  • The nested workflow’s output is available in the parent workflow’s context
  • The nested workflow’s steps are executed in their defined order
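The points above can be sketched in plain Kotlin. This is not the Kastrax engine: `runNested`, `Values`, and the lambda-based input mapping are hypothetical stand-ins. They illustrate the data flow only: inputs are resolved from the parent context, the child sees only those inputs, and its outputs are published under the parent's step ID.

```kotlin
// Hypothetical sketch; none of these names come from the Kastrax API.
typealias Values = Map<String, Any?>

// The child workflow is modeled as a plain function from inputs to outputs.
fun runNested(
    stepId: String,
    parentContext: MutableMap<String, Values>,
    inputMapping: Map<String, (Map<String, Values>) -> Any?>,
    child: (Values) -> Values,
) {
    // 1. Resolve each child input from the parent's context.
    val childInput = inputMapping.mapValues { (_, resolve) -> resolve(parentContext) }
    // 2. Run the child; it sees only its mapped inputs.
    val childOutput = child(childInput)
    // 3. Publish the child's outputs under the parent's step ID.
    parentContext[stepId] = childOutput
}

fun main() {
    val context = mutableMapOf<String, Values>(
        "prepare-data" to mapOf("preparedData" to listOf(3, 1, 2)),
    )
    runNested(
        stepId = "process-data",
        parentContext = context,
        inputMapping = mapOf(
            "data" to { ctx: Map<String, Values> -> ctx["prepare-data"]?.get("preparedData") },
        ),
        child = { input ->
            val data = (input["data"] as List<*>).filterIsInstance<Int>()
            mapOf("processedData" to data.sorted())
        },
    )
    println(context["process-data"]) // {processedData=[1, 2, 3]}
}
```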

Accessing Results ✅

Results from a nested workflow are available in the parent workflow’s context under the nested workflow’s step ID. The results include all outputs defined in the nested workflow’s output mapping:

AccessingNestedResults.kt
    import ai.kastrax.core.KastraxSystem
    import ai.kastrax.core.workflow.WorkflowExecuteOptions
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val kastraxSystem = KastraxSystem()

        // Register workflows
        kastraxSystem.registerWorkflow(dataProcessingWorkflow)
        kastraxSystem.registerWorkflow(parentWorkflow)

        // Execute the parent workflow
        val result = kastraxSystem.workflowEngine.executeWorkflow(
            workflowId = parentWorkflow.id,
            input = mapOf(
                "sourceData" to mapOf("records" to listOf(1, 2, 3, 4, 5)),
                "analysisType" to "detailed"
            )
        )

        // Access nested workflow results
        if (result.success) {
            // Access the final workflow output (which may include nested workflow results)
            val report = result.output["report"]
            val analysisResults = result.output["analysisResults"]
            println("Report: $report")
            println("Analysis Results: $analysisResults")

            // Access specific nested workflow outputs directly
            val processedData = result.steps["process-data"]?.output?.get("processedData")
            val metadata = result.steps["process-data"]?.output?.get("metadata")
            println("Processed Data: $processedData")
            println("Metadata: $metadata")
        } else {
            println("Workflow execution failed: ${result.error}")
        }
    }
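To make the path convention concrete, here is a minimal resolver for paths such as `$.steps.process-data.output.analysis` over nested maps. It is a sketch under the assumption that the context behaves like a tree of maps keyed by `input` and `steps`. The function `resolve` is hypothetical, and Kastrax's real path handling may differ.

```kotlin
// Hypothetical resolver for illustration; not the Kastrax implementation.
fun resolve(path: String, root: Map<String, Any?>): Any? {
    require(path.startsWith("$.")) { "Path must start with '$.'" }
    var current: Any? = root
    for (segment in path.removePrefix("$.").split(".")) {
        // Return null as soon as a segment is missing or the parent is not a map.
        current = (current as? Map<*, *>)?.get(segment) ?: return null
    }
    return current
}

fun main() {
    val context = mapOf(
        "input" to mapOf("sourceData" to "raw"),
        "steps" to mapOf(
            "process-data" to mapOf(
                "output" to mapOf("analysis" to "ok", "metadata" to mapOf("rows" to 5)),
            ),
        ),
    )
    println(resolve("$.steps.process-data.output.analysis", context))      // ok
    println(resolve("$.steps.process-data.output.metadata.rows", context)) // 5
    println(resolve("$.steps.missing-step.output.analysis", context))      // null
}
```

Because the nested workflow's outputs sit under its step ID, the parent never needs to know the IDs of the steps inside the nested workflow.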

Advanced Nested Workflow Patterns ✅

Kastrax supports several advanced patterns for working with nested workflows:

Parallel Execution of Nested Workflows

Multiple nested workflows can be executed in parallel for improved performance:

ParallelNestedWorkflows.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Create a workflow with parallel nested workflows
    val parallelWorkflow = workflow {
        name = "parallel-workflow"
        description = "Execute multiple nested workflows in parallel"

        // Define input parameters
        input {
            variable("data", Any::class, required = true)
        }

        // Execute multiple nested workflows in parallel
        parallelSteps {
            // First nested workflow
            subWorkflowStep {
                id = "text-analysis"
                name = "Text Analysis"
                description = "Analyze textual content"
                workflowId = "text-analysis-workflow"
                input = mapOf(
                    "text" to variable("$.input.data.text")
                )
            }

            // Second nested workflow
            subWorkflowStep {
                id = "image-analysis"
                name = "Image Analysis"
                description = "Analyze image content"
                workflowId = "image-analysis-workflow"
                input = mapOf(
                    "imageUrl" to variable("$.input.data.imageUrl")
                )
            }

            // Third nested workflow
            subWorkflowStep {
                id = "metadata-analysis"
                name = "Metadata Analysis"
                description = "Analyze metadata"
                workflowId = "metadata-analysis-workflow"
                input = mapOf(
                    "metadata" to variable("$.input.data.metadata")
                )
            }
        }

        // Combine results from all parallel nested workflows
        step(resultCombinationAgent) {
            id = "combine-results"
            name = "Combine Results"
            description = "Combine results from all analysis workflows"
            after("text-analysis", "image-analysis", "metadata-analysis")
            variables = mutableMapOf(
                "textResults" to variable("$.steps.text-analysis.output.results"),
                "imageResults" to variable("$.steps.image-analysis.output.results"),
                "metadataResults" to variable("$.steps.metadata-analysis.output.results")
            )
        }

        // Define workflow output
        output {
            "combinedResults" from "$.steps.combine-results.output.combinedResults"
            "textAnalysis" from "$.steps.text-analysis.output.results"
            "imageAnalysis" from "$.steps.image-analysis.output.results"
            "metadataAnalysis" from "$.steps.metadata-analysis.output.results"
        }
    }

Conditional Nested Workflows

You can conditionally execute different nested workflows based on input data or previous step results:

ConditionalNestedWorkflows.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Create a workflow with conditional nested workflow execution
    val conditionalWorkflow = workflow {
        name = "conditional-workflow"
        description = "Conditionally execute different nested workflows"

        // Define input parameters
        input {
            variable("data", Any::class, required = true)
            variable("processingType", String::class, required = true)
        }

        // Initial data analysis step
        step(dataAnalysisAgent) {
            id = "analyze-data"
            name = "Analyze Data"
            description = "Analyze input data to determine processing path"
            variables = mutableMapOf(
                "data" to variable("$.input.data"),
                "processingType" to variable("$.input.processingType")
            )
        }

        // Conditional step to choose the appropriate nested workflow
        conditionalStep {
            id = "processing-path"
            name = "Choose Processing Path"
            description = "Select the appropriate processing workflow"
            after("analyze-data")

            // Condition to determine which path to take
            condition { context ->
                val processingType = context.getVariable("$.input.processingType") as? String ?: "standard"
                processingType == "advanced"
            }

            // Advanced processing path
            onTrue {
                subWorkflowStep {
                    id = "advanced-processing"
                    name = "Advanced Processing"
                    description = "Execute advanced processing workflow"
                    workflowId = "advanced-processing-workflow"
                    input = mapOf(
                        "data" to variable("$.steps.analyze-data.output.preparedData"),
                        "options" to variable("$.steps.analyze-data.output.advancedOptions")
                    )
                }
            }

            // Standard processing path
            onFalse {
                subWorkflowStep {
                    id = "standard-processing"
                    name = "Standard Processing"
                    description = "Execute standard processing workflow"
                    workflowId = "standard-processing-workflow"
                    input = mapOf(
                        "data" to variable("$.steps.analyze-data.output.preparedData")
                    )
                }
            }
        }

        // Final step to format results
        step(resultFormattingAgent) {
            id = "format-results"
            name = "Format Results"
            description = "Format the processing results"
            // Run after whichever processing path was taken
            after("processing-path")

            execute { context ->
                // Only one of these is set, depending on the branch that ran
                val advancedResults = context.getVariable("$.steps.advanced-processing.output.results")
                val standardResults = context.getVariable("$.steps.standard-processing.output.results")

                // Use whichever results are available
                val results = advancedResults ?: standardResults

                // Format the results
                mapOf(
                    "formattedResults" to formatResults(results),
                    "processingType" to if (advancedResults != null) "advanced" else "standard"
                )
            }
        }

        // Define workflow output
        output {
            "results" from "$.steps.format-results.output.formattedResults"
            "processingType" from "$.steps.format-results.output.processingType"
        }
    }

    // Helper function to format results
    fun formatResults(results: Any?): Map<String, Any> {
        // In a real implementation, this would format the results appropriately
        return mapOf("formatted" to (results ?: "No results"))
    }

Iterative Nested Workflows

You can implement iterative processing using nested workflows in loops:

IterativeNestedWorkflows.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable // Create a workflow with iterative nested workflow execution val iterativeWorkflow = workflow { name = "iterative-workflow" description = "Iteratively execute a nested workflow until a condition is met" // Define input parameters input { variable("initialData", Any::class, required = true) variable("maxIterations", Int::class, defaultValue = 5) variable("qualityThreshold", Double::class, defaultValue = 0.9) } // Initialize iteration variables step(initializationAgent) { id = "initialize" name = "Initialize" description = "Set up initial values for iteration" execute { context -> mapOf( "currentData" to context.getVariable("$.input.initialData"), "iterationCount" to 0, "quality" to 0.0 ) } } // Iterative loop loopStep { id = "refinement-loop" name = "Refinement Loop" description = "Iteratively refine data until quality threshold is reached" after("initialize") // Continue looping while quality is below threshold and iterations are under limit condition { context -> val quality = context.getVariable("$.steps.evaluate-quality.output.quality") as? Double ?: 0.0 val iterationCount = context.getVariable("$.steps.refine-data.output.iterationCount") as? Int ?: 0 val maxIterations = context.getVariable("$.input.maxIterations") as? 
Int ?: 5 quality < context.getVariable("$.input.qualityThreshold") as Double && iterationCount < maxIterations } // Loop body body { // Execute nested workflow for data refinement subWorkflowStep { id = "refine-data" name = "Refine Data" description = "Refine data using the refinement workflow" workflowId = "data-refinement-workflow" input = mapOf( "data" to variable("$.steps.refine-data.output.currentData", defaultValue = variable("$.steps.initialize.output.currentData")), "iterationCount" to variable("$.steps.refine-data.output.iterationCount", defaultValue = variable("$.steps.initialize.output.iterationCount")) ) } // Evaluate the quality of refined data step(qualityEvaluationAgent) { id = "evaluate-quality" name = "Evaluate Quality" description = "Evaluate the quality of refined data" after("refine-data") variables = mutableMapOf( "refinedData" to variable("$.steps.refine-data.output.refinedData"), "iterationCount" to variable("$.steps.refine-data.output.iterationCount") ) } } } // Final step to process the refined data step(finalProcessingAgent) { id = "final-processing" name = "Final Processing" description = "Process the final refined data" after("refinement-loop") variables = mutableMapOf( "refinedData" to variable("$.steps.refine-data.output.refinedData"), "quality" to variable("$.steps.evaluate-quality.output.quality"), "iterationCount" to variable("$.steps.refine-data.output.iterationCount") ) } // Define workflow output output { "finalData" from "$.steps.final-processing.output.finalData" "quality" from "$.steps.evaluate-quality.output.quality" "iterations" from "$.steps.refine-data.output.iterationCount" } }
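The control flow of the refinement loop can be shown without the workflow engine. In this sketch, `refine` stands in for the nested refinement workflow and `score` for the quality evaluation step. `refineUntil` and `LoopResult` are hypothetical names introduced only to show the stopping rule: keep looping while quality is below the threshold and the iteration limit has not been reached.

```kotlin
// Hypothetical sketch of the loop's stopping rule; not Kastrax code.
data class LoopResult<T>(val data: T, val quality: Double, val iterations: Int)

fun <T> refineUntil(
    initial: T,
    threshold: Double,
    maxIterations: Int,
    refine: (T) -> T,      // stands in for the nested refinement workflow
    score: (T) -> Double,  // stands in for the quality evaluation step
): LoopResult<T> {
    var data = initial
    var quality = 0.0
    var iterations = 0
    // Same condition as the loop step: below threshold AND under the limit.
    while (quality < threshold && iterations < maxIterations) {
        data = refine(data)
        quality = score(data)
        iterations++
    }
    return LoopResult(data, quality, iterations)
}

fun main() {
    val result = refineUntil(
        initial = 0.5,
        threshold = 0.9,
        maxIterations = 5,
        refine = { it + 0.25 },
        score = { it },
    )
    println(result) // LoopResult(data=1.0, quality=1.0, iterations=2)
}
```

The iteration limit matters as much as the threshold: if the score never reaches the threshold, the limit is the only thing that stops the loop.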

Monitoring Nested Workflows ✅

Kastrax provides comprehensive monitoring capabilities for nested workflows, allowing you to track their execution and handle events:

MonitoringNestedWorkflows.kt
    import ai.kastrax.core.KastraxSystem
    import ai.kastrax.core.workflow.WorkflowExecuteOptions
    import ai.kastrax.core.workflow.monitoring.WorkflowMonitor
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val kastraxSystem = KastraxSystem()

        // Register workflows (the nested workflow must be registered as well)
        kastraxSystem.registerWorkflow(dataProcessingWorkflow)
        kastraxSystem.registerWorkflow(parentWorkflow)

        // Create a workflow monitor
        val workflowMonitor = WorkflowMonitor(kastraxSystem)

        // Execute the parent workflow with monitoring
        val result = kastraxSystem.workflowEngine.executeWorkflow(
            workflowId = parentWorkflow.id,
            input = mapOf("sourceData" to "test-data"),
            options = WorkflowExecuteOptions(
                // Monitor all steps, including nested workflow steps
                onStepStart = { stepId, input ->
                    // Check if this is a nested workflow step
                    if (stepId == "process-data") {
                        println("Starting nested workflow execution with input: $input")
                    } else {
                        println("Starting step: $stepId with input: $input")
                    }
                },
                onStepFinish = { stepResult ->
                    // Check if this is a nested workflow step
                    if (stepResult.stepId == "process-data") {
                        println("Nested workflow completed with status: ${stepResult.status}")

                        // Access nested workflow outputs
                        val nestedOutput = stepResult.output
                        println("Nested workflow outputs: $nestedOutput")

                        // Access nested workflow steps
                        val nestedSteps = stepResult.nestedSteps
                        println("Nested workflow steps: ${nestedSteps?.keys}")
                    } else {
                        val stepId = stepResult.stepId
                        val status = stepResult.status
                        println("Step $stepId completed with status: $status")
                    }
                },
                // Monitor nested workflow events
                onNestedWorkflowEvent = { workflowId, eventType, data ->
                    println("Nested workflow $workflowId event: $eventType, data: $data")
                }
            )
        )

        // Get detailed execution history, including nested workflows
        val executionHistory = workflowMonitor.getExecutionHistory(result.executionId, includeNested = true)

        // Analyze execution metrics
        val metrics = workflowMonitor.getExecutionMetrics(result.executionId)
        println("Total execution time: ${metrics.totalExecutionTimeMs} ms")

        val nestedExecutionTime = metrics.stepExecutionTimes["process-data"]
        println("Nested workflow execution time: $nestedExecutionTime ms")
    }

Suspending and Resuming Nested Workflows ✅

Nested workflows fully support suspension and resumption, allowing you to create complex workflows with human-in-the-loop processes or asynchronous operations:

SuspendableNestedWorkflows.kt
import ai.kastrax.core.KastraxSystem import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable import kotlinx.coroutines.runBlocking // Create a nested workflow with suspension points val reviewWorkflow = workflow { name = "content-review-workflow" description = "Review content with human approval" // Define input parameters input { variable("content", String::class, required = true) } // Initial content analysis step(contentAnalysisAgent) { id = "analyze-content" name = "Analyze Content" description = "Analyze content quality" variables = mutableMapOf( "content" to variable("$.input.content") ) } // Human review step (suspends the workflow) humanStep { id = "human-review" name = "Human Review" description = "Get human approval for content" after("analyze-content") prompt { context -> val content = context.getVariable("$.input.content") as? String ?: "" val analysis = context.getVariable("$.steps.analyze-content.output.analysis") as? Map<*, *> ?: emptyMap<String, Any>() // Format the analysis as a string first val analysisText = analysis.entries.joinToString("\n") { entry -> // Format each entry val key = entry.key val value = entry.value "$key: $value" } // Then use it in the template """Please review the following content: $content Analysis: $analysisText Approve or suggest changes.""".trimIndent() } } // Process review results step(reviewProcessingAgent) { id = "process-review" name = "Process Review" description = "Process human review results" after("human-review") variables = mutableMapOf( "content" to variable("$.input.content"), "approved" to variable("$.steps.human-review.output.approved"), "feedback" to variable("$.steps.human-review.output.feedback") ) } // Define workflow output output { "reviewedContent" from "$.steps.process-review.output.reviewedContent" "approved" from "$.steps.human-review.output.approved" "feedback" from "$.steps.human-review.output.feedback" } } // Create a parent workflow that uses the suspendable nested 
workflow val contentPublishingWorkflow = workflow { name = "content-publishing-workflow" description = "Create and publish content with review" // Define input parameters input { variable("topic", String::class, required = true) } // Generate content step(contentGenerationAgent) { id = "generate-content" name = "Generate Content" description = "Generate content based on topic" variables = mutableMapOf( "topic" to variable("$.input.topic") ) } // Use the review workflow as a nested workflow subWorkflowStep { id = "review-content" name = "Review Content" description = "Review content using the review workflow" after("generate-content") workflowId = reviewWorkflow.id input = mapOf( "content" to variable("$.steps.generate-content.output.content") ) } // Publish content if approved conditionalStep { id = "publishing-decision" name = "Publishing Decision" description = "Decide whether to publish content" after("review-content") condition { context -> context.getVariable("$.steps.review-content.output.approved") as? 
Boolean ?: false } onTrue { step(publishingAgent) { id = "publish-content" name = "Publish Content" description = "Publish the approved content" variables = mutableMapOf( "content" to variable("$.steps.review-content.output.reviewedContent") ) } } onFalse { step(revisionAgent) { id = "revise-content" name = "Revise Content" description = "Revise content based on feedback" variables = mutableMapOf( "content" to variable("$.steps.generate-content.output.content"), "feedback" to variable("$.steps.review-content.output.feedback") ) } } } } // Execute and resume the workflow fun main() = runBlocking { val kastraxSystem = KastraxSystem() // Register workflows kastraxSystem.registerWorkflow(reviewWorkflow) kastraxSystem.registerWorkflow(contentPublishingWorkflow) // Execute the parent workflow val result = kastraxSystem.workflowEngine.executeWorkflow( workflowId = contentPublishingWorkflow.id, input = mapOf("topic" to "Artificial Intelligence Ethics") ) // Check if the workflow is suspended if (result.status == "suspended") { println("Workflow suspended at: ${result.suspendedStepId}") // In a real application, this would be triggered by a human review // For this example, we'll simulate receiving human input val humanInput = mapOf( "approved" to true, "feedback" to "The content looks good!" ) // Resume the workflow with human input val resumeResult = kastraxSystem.workflowEngine.resumeWorkflow( suspensionId = result.suspensionId, resumeData = humanInput ) println("Workflow resumed and completed with status: ${resumeResult.status}") println("Final output: ${resumeResult.output}") } }

When working with suspended nested workflows:

  • The parent workflow is suspended when any step in a nested workflow suspends
  • The suspension ID uniquely identifies the suspended workflow, regardless of nesting level
  • When resuming, the nested workflow continues from the suspended step
  • After the nested workflow completes, the parent workflow continues execution
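The propagation rule in the list above can be sketched as a small interpreter. Everything here (`Outcome`, `Suspended`, `HumanGate`, `execute`, and so on) is a hypothetical model, not the Kastrax engine. For brevity, resuming simply re-runs the steps with the human input supplied, whereas a real engine would presumably restore persisted state and continue from the suspended step.

```kotlin
// Hypothetical model; the type and function names are not Kastrax APIs.
sealed interface Outcome
data class Completed(val output: Map<String, Any?>) : Outcome
data class Suspended(val stepPath: List<String>) : Outcome

sealed interface Step { val id: String }
class Task(override val id: String, val run: () -> Map<String, Any?>) : Step
class HumanGate(override val id: String) : Step
class Nested(override val id: String, val steps: List<Step>) : Step

// Runs steps in order. `resume` maps a gate's ID to the data supplied for it.
fun execute(steps: List<Step>, resume: Map<String, Map<String, Any?>> = emptyMap()): Outcome {
    val output = mutableMapOf<String, Any?>()
    for (step in steps) {
        when (step) {
            is Task -> output[step.id] = step.run()
            // Without resume data, the gate suspends the workflow that contains it.
            is HumanGate -> output[step.id] = resume[step.id] ?: return Suspended(listOf(step.id))
            is Nested -> {
                when (val inner = execute(step.steps, resume)) {
                    is Completed -> output[step.id] = inner.output
                    // A suspension inside the child suspends the parent too.
                    is Suspended -> return Suspended(listOf(step.id) + inner.stepPath)
                }
            }
        }
    }
    return Completed(output)
}

fun main() {
    val review = Nested(
        "review-content",
        listOf(Task("analyze") { mapOf("score" to 0.8) }, HumanGate("human-review")),
    )
    val parent = listOf(Task("generate") { mapOf("content" to "draft") }, review)

    println(execute(parent)) // Suspended(stepPath=[review-content, human-review])

    val resumed = execute(parent, mapOf("human-review" to mapOf("approved" to true)))
    println(resumed is Completed) // true
}
```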

Type-Safe Nested Workflows ✅

Kastrax provides strong type safety for nested workflows through explicit input and output definitions:

TypeSafeNestedWorkflows.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable import kotlinx.serialization.Serializable // Define data classes for type-safe input/output @Serializable data class AnalysisInput( val text: String, val options: Map<String, Any> = emptyMap() ) @Serializable data class AnalysisResult( val sentiment: Double, val topics: List<String>, val summary: String, val metadata: Map<String, Any> ) // Create a type-safe nested workflow val analysisWorkflow = workflow { name = "text-analysis-workflow" description = "Analyze text with type safety" // Define typed input input { variable("analysisInput", AnalysisInput::class, required = true) } // Workflow steps step(textAnalysisAgent) { id = "analyze-text" name = "Analyze Text" description = "Perform text analysis" variables = mutableMapOf( "text" to variable("$.input.analysisInput.text"), "options" to variable("$.input.analysisInput.options") ) } // Define typed output output { "result" from { context -> // Create a strongly-typed result object val sentiment = context.getVariable("$.steps.analyze-text.output.sentiment") as? Double ?: 0.0 val topics = context.getVariable("$.steps.analyze-text.output.topics") as? List<String> ?: emptyList() val summary = context.getVariable("$.steps.analyze-text.output.summary") as? String ?: "" val metadata = context.getVariable("$.steps.analyze-text.output.metadata") as? 
Map<String, Any> ?: emptyMap() AnalysisResult( sentiment = sentiment, topics = topics, summary = summary, metadata = metadata ) } } } // Use the type-safe nested workflow in a parent workflow val documentProcessingWorkflow = workflow { name = "document-processing-workflow" description = "Process documents with type-safe nested workflows" // Define input parameters input { variable("document", String::class, required = true) } // Extract text from document step(textExtractionAgent) { id = "extract-text" name = "Extract Text" description = "Extract text from document" variables = mutableMapOf( "document" to variable("$.input.document") ) } // Use the analysis workflow with type-safe input subWorkflowStep { id = "analyze-document" name = "Analyze Document" description = "Analyze document text" after("extract-text") workflowId = analysisWorkflow.id // Create a type-safe input object input = { context -> val text = context.getVariable("$.steps.extract-text.output.text") as? String ?: "" val options = mapOf("language" to "en", "detailed" to true) mapOf("analysisInput" to AnalysisInput(text = text, options = options)) } } // Process the type-safe result step(reportGenerationAgent) { id = "generate-report" name = "Generate Report" description = "Generate report based on analysis" after("analyze-document") // Access the strongly-typed result execute { context -> // Get the typed result from the nested workflow val analysisResult = context.getVariable("$.steps.analyze-document.output.result") as AnalysisResult // Use the strongly-typed properties val sentiment = analysisResult.sentiment val topics = analysisResult.topics val summary = analysisResult.summary // Generate report using typed data mapOf( "report" to "Document Analysis Report:\n" + "Sentiment: $sentiment\n" + // Join topics with comma separator "Topics: " + topics.joinToString(", ") + "\n" + "Summary: $summary", "sentimentScore" to sentiment, "topTopics" to topics.take(3) ) } } }

Best Practices for Nested Workflows ✅

1. Design for Modularity and Reusability

Create nested workflows that encapsulate specific functionality and can be reused across multiple parent workflows:

    // Good: Modular workflow that handles a specific task
    val emailProcessingWorkflow = workflow {
        name = "email-processing-workflow"
        description = "Process emails with classification and response generation"

        // Clear input/output definitions
        input {
            variable("emailContent", String::class, required = true)
            variable("sender", String::class, required = true)
        }

        // Focused steps for this specific task
        step(emailClassificationAgent) { /* ... */ }
        step(responseGenerationAgent) { /* ... */ }

        // Well-defined outputs
        output {
            "classification" from "$.steps.email-classification.output.category"
            "response" from "$.steps.response-generation.output.text"
        }
    }

    // This workflow can now be used in multiple parent workflows

2. Use Clear Input/Output Contracts

Define explicit input and output contracts for nested workflows to ensure they can be used correctly:

    // Define clear input requirements
    input {
        variable("data", DataType::class, required = true, description = "The data to process")
        variable("options", OptionsType::class, defaultValue = DefaultOptions)
    }

    // Define comprehensive output mapping
    output {
        "primaryResult" from "$.steps.main-processing.output.result"
        "metadata" from "$.steps.analysis.output.metadata"
        "status" from { context ->
            if (context.getVariable("$.steps.validation.output.valid") as? Boolean == true) {
                "success"
            } else {
                "failed_validation"
            }
        }
    }

3. Implement Proper Error Handling

Ensure nested workflows handle errors appropriately and propagate meaningful error information to parent workflows:

    // In nested workflow
    step(processingAgent) {
        id = "process-data"

        // Handle errors within the nested workflow
        onError { error ->
            mapOf(
                "error" to error.message,
                "errorType" to error.javaClass.simpleName,
                // CriticalException is an application-defined exception type (not shown here)
                "recoverable" to (error !is CriticalException)
            )
        }
    }

    // In parent workflow
    subWorkflowStep {
        id = "nested-processing"
        workflowId = "data-processing-workflow"

        // Handle errors from the nested workflow
        onError { error ->
            println("Nested workflow failed: ${error.message}")

            // Take appropriate action based on the error
            step(errorHandlingAgent) {
                id = "handle-nested-error"
                variables = mutableMapOf(
                    "error" to error.message,
                    "workflowId" to "data-processing-workflow"
                )
            }
        }
    }

4. Monitor Performance and Resource Usage

Track the performance of nested workflows to identify bottlenecks and optimize resource usage:

    // Start times keyed by step ID; callbacks may run on different threads
    val stepStartTimes = java.util.concurrent.ConcurrentHashMap<String, Long>()

    // Configure monitoring for nested workflows
    val options = WorkflowExecuteOptions(
        // Track step execution times
        onStepStart = { stepId, _ ->
            stepStartTimes[stepId] = System.currentTimeMillis()
        },
        onStepFinish = { stepResult ->
            val startTime = stepStartTimes[stepResult.stepId] ?: 0L
            val duration = System.currentTimeMillis() - startTime

            // Log execution time for nested workflows
            if (stepResult.nestedSteps != null) {
                val stepId = stepResult.stepId
                val stepsSize = stepResult.nestedSteps?.size ?: 0
                println("Nested workflow $stepId completed in $duration ms")
                println("Nested steps: $stepsSize")
            }
        },
        // Set resource limits for nested workflows
        nestedWorkflowOptions = NestedWorkflowOptions(
            maxDepth = 3,      // Limit nesting depth
            timeoutMs = 30000  // Timeout for nested workflows
        )
    )

5. Version Nested Workflows Carefully

Manage versioning of nested workflows to ensure compatibility with parent workflows:

    // Version your workflows explicitly
    val analysisWorkflowV2 = workflow {
        name = "text-analysis-workflow"
        version = "2.0.0"
        description = "Improved text analysis workflow (v2)"

        // Maintain backward compatibility in input/output
        input {
            variable("text", String::class, required = true) // Original input
            variable("options", Map::class, defaultValue = emptyMap<String, Any>()) // New in v2
        }

        // Steps...

        // Ensure output remains compatible
        output {
            "result" from "$.steps.analyze.output.result" // Original output
            "enhancedResult" from "$.steps.enhanced-analysis.output.result" // New in v2
        }
    }

    // Reference specific versions in parent workflows
    subWorkflowStep {
        id = "analyze-text"
        workflowId = "text-analysis-workflow"
        workflowVersion = "2.0.0" // Explicitly request v2
    }
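One way to reason about compatibility is a semantic-versioning check: a registered workflow can serve a request if the major versions match and it is at least as new as the requested version. The sketch below assumes that convention. `Version` and `satisfies` are hypothetical helpers, and how Kastrax itself resolves `workflowVersion` is not described in this document.

```kotlin
// Hypothetical helper; this is not how Kastrax is known to resolve versions.
data class Version(val major: Int, val minor: Int, val patch: Int) : Comparable<Version> {
    override fun compareTo(other: Version): Int =
        compareValuesBy(this, other, Version::major, Version::minor, Version::patch)

    companion object {
        fun parse(text: String): Version {
            val parts = text.split(".").map { it.toInt() }
            require(parts.size == 3) { "Expected MAJOR.MINOR.PATCH, got '$text'" }
            return Version(parts[0], parts[1], parts[2])
        }
    }
}

// Assumed rule: same major version, and not older than what the parent asked for.
fun satisfies(registered: Version, requested: Version): Boolean =
    registered.major == requested.major && registered >= requested

fun main() {
    val requested = Version.parse("2.0.0")
    println(satisfies(Version.parse("2.1.0"), requested)) // true
    println(satisfies(Version.parse("3.0.0"), requested)) // false
    println(satisfies(Version.parse("1.9.9"), requested)) // false
}
```

Under this rule, adding an optional input or a new output (as in the v2 example) is a minor change, while removing or renaming an existing input or output calls for a new major version.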

Complete Example ✅

Here’s a comprehensive example demonstrating a multi-level nested workflow system for content creation and publishing:

CompleteNestedWorkflowExample.kt
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.Serializable

// Data classes for type safety
@Serializable
data class ResearchResult(
    val sources: List<String>,
    val keyPoints: List<String>,
    val statistics: Map<String, Any>
)

@Serializable
data class ContentOutline(
    val title: String,
    val sections: List<String>,
    val targetWordCount: Int
)

@Serializable
data class DraftContent(
    val title: String,
    val content: String,
    val wordCount: Int,
    val metadata: Map<String, Any>
)

@Serializable
data class EditedContent(
    val title: String,
    val content: String,
    val wordCount: Int,
    val readabilityScore: Double,
    val metadata: Map<String, Any>
)

// 1. Research workflow (nested level 2)
val researchWorkflow = workflow {
    name = "research-workflow"
    description = "Research a topic thoroughly"

    input {
        variable("topic", String::class, required = true)
        variable("depth", String::class, defaultValue = "standard")
    }

    // Research steps
    step(academicResearchAgent) {
        id = "academic-research"
        name = "Academic Research"
        description = "Research from academic sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "depth" to variable("$.input.depth")
        )
    }

    step(newsResearchAgent) {
        id = "news-research"
        name = "News Research"
        description = "Research from news sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic")
        )
    }

    step(synthesisAgent) {
        id = "synthesize-research"
        name = "Synthesize Research"
        description = "Combine research from all sources"
        after("academic-research", "news-research")
        variables = mutableMapOf(
            "academicResearch" to variable("$.steps.academic-research.output.findings"),
            "newsResearch" to variable("$.steps.news-research.output.findings")
        )
    }

    // Define typed output
    output {
        "result" from { context ->
            val sources = context.getVariable("$.steps.synthesize-research.output.sources") as? List<String> ?: emptyList()
            val keyPoints = context.getVariable("$.steps.synthesize-research.output.keyPoints") as? List<String> ?: emptyList()
            val statistics = context.getVariable("$.steps.synthesize-research.output.statistics") as? Map<String, Any> ?: emptyMap()
            ResearchResult(sources = sources, keyPoints = keyPoints, statistics = statistics)
        }
    }
}

// 2. Content creation workflow (nested level 1)
val contentCreationWorkflow = workflow {
    name = "content-creation-workflow"
    description = "Create content based on research"

    input {
        variable("topic", String::class, required = true)
        variable("contentType", String::class, required = true)
        variable("targetWordCount", Int::class, defaultValue = 1000)
    }

    // Use research workflow as a nested workflow
    subWorkflowStep {
        id = "research"
        name = "Research Topic"
        description = "Research the topic thoroughly"
        workflowId = researchWorkflow.id
        input = mapOf(
            "topic" to variable("$.input.topic"),
            "depth" to if (variable("$.input.contentType") == "academic") "deep" else "standard"
        )
    }

    // Create outline based on research
    step(outlineAgent) {
        id = "create-outline"
        name = "Create Outline"
        description = "Create a content outline based on research"
        after("research")
        variables = mutableMapOf(
            "research" to variable("$.steps.research.output.result"),
            "contentType" to variable("$.input.contentType"),
            "targetWordCount" to variable("$.input.targetWordCount")
        )
    }

    // Write draft based on outline
    step(writingAgent) {
        id = "write-draft"
        name = "Write Draft"
        description = "Write a draft based on the outline"
        after("create-outline")
        variables = mutableMapOf(
            "outline" to variable("$.steps.create-outline.output.outline"),
            "research" to variable("$.steps.research.output.result"),
            "contentType" to variable("$.input.contentType"),
            "targetWordCount" to variable("$.input.targetWordCount")
        )
    }

    // Edit the draft
    step(editingAgent) {
        id = "edit-draft"
        name = "Edit Draft"
        description = "Edit and improve the draft"
        after("write-draft")
        variables = mutableMapOf(
            "draft" to variable("$.steps.write-draft.output.draft"),
            "contentType" to variable("$.input.contentType")
        )
    }

    // Define typed output
    output {
        "outline" from { context ->
            val title = context.getVariable("$.steps.create-outline.output.title") as? String ?: ""
            val sections = context.getVariable("$.steps.create-outline.output.sections") as? List<String> ?: emptyList()
            val targetWordCount = context.getVariable("$.input.targetWordCount") as? Int ?: 1000
            ContentOutline(title = title, sections = sections, targetWordCount = targetWordCount)
        }

        "draft" from { context ->
            val title = context.getVariable("$.steps.write-draft.output.title") as? String ?: ""
            val content = context.getVariable("$.steps.write-draft.output.content") as? String ?: ""
            val wordCount = context.getVariable("$.steps.write-draft.output.wordCount") as? Int ?: 0
            val metadata = context.getVariable("$.steps.write-draft.output.metadata") as? Map<String, Any> ?: emptyMap()
            DraftContent(title = title, content = content, wordCount = wordCount, metadata = metadata)
        }

        "editedContent" from { context ->
            val title = context.getVariable("$.steps.edit-draft.output.title") as? String ?: ""
            val content = context.getVariable("$.steps.edit-draft.output.content") as? String ?: ""
            val wordCount = context.getVariable("$.steps.edit-draft.output.wordCount") as? Int ?: 0
            val readabilityScore = context.getVariable("$.steps.edit-draft.output.readabilityScore") as? Double ?: 0.0
            val metadata = context.getVariable("$.steps.edit-draft.output.metadata") as? Map<String, Any> ?: emptyMap()
            EditedContent(
                title = title,
                content = content,
                wordCount = wordCount,
                readabilityScore = readabilityScore,
                metadata = metadata
            )
        }
    }
}

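The typed output blocks above repeat the same pattern: read a value by path, cast it safely, and fall back to a default. The following is a minimal sketch of how that pattern could be factored out. It assumes a plain `Map<String, Any?>` as a stand-in for the workflow context, and `valueOr` and `buildOutline` are hypothetical names used for illustration only; neither is part of the Kastrax API.

```kotlin
// Hypothetical sketch: a plain Map stands in for the Kastrax workflow context,
// and valueOr is an illustrative helper, not part of the Kastrax API.
data class ContentOutline(
    val title: String,
    val sections: List<String>,
    val targetWordCount: Int
)

// Returns the value stored under `path` if it has the expected type, else `default`.
// Generic arguments are erased at runtime, so List<String> is only checked as a List.
inline fun <reified T> Map<String, Any?>.valueOr(path: String, default: T): T =
    this[path] as? T ?: default

// Builds the typed outline from loosely typed values, mirroring the "outline" output above.
fun buildOutline(values: Map<String, Any?>): ContentOutline = ContentOutline(
    title = values.valueOr("$.steps.create-outline.output.title", ""),
    sections = values.valueOr("$.steps.create-outline.output.sections", emptyList<String>()),
    targetWordCount = values.valueOr("$.input.targetWordCount", 1000)
)

fun main() {
    val values = mapOf<String, Any?>(
        "$.steps.create-outline.output.title" to "AI Ethics",
        "$.steps.create-outline.output.sections" to listOf("Intro", "Risks"),
        "$.input.targetWordCount" to "not a number" // wrong type, so the default is used
    )
    println(buildOutline(values))
}
```

A helper like this keeps each output mapping to one line per field. The trade-off is that a value of the wrong type is silently replaced by the default, exactly as in the original `as? ... ?:` expressions.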
Main Publishing Workflow ✅

Here’s the top-level publishing workflow that uses the content creation workflow:

PublishingWorkflow.kt
val publishingWorkflow = workflow {
    name = "publishing-workflow"
    description = "Create and publish content"

    input {
        variable("topic", String::class, required = true)
        variable("contentType", String::class, required = true)
        variable("targetWordCount", Int::class, defaultValue = 1000)
        variable("publishPlatform", String::class, defaultValue = "blog")
    }

    // Use content creation workflow as a nested workflow
    subWorkflowStep {
        id = "create-content"
        name = "Create Content"
        description = "Create content using the content creation workflow"
        workflowId = contentCreationWorkflow.id
        input = mapOf(
            "topic" to variable("$.input.topic"),
            "contentType" to variable("$.input.contentType"),
            "targetWordCount" to variable("$.input.targetWordCount")
        )
    }

    // Human review step (suspends the workflow)
    humanStep {
        id = "human-review"
        name = "Human Review"
        description = "Get human approval for content"
        after("create-content")
        prompt { context ->
            val content = context.getVariable("$.steps.create-content.output.editedContent.content") as? String ?: ""
            val title = context.getVariable("$.steps.create-content.output.editedContent.title") as? String ?: ""

            // Using triple quotes for multiline string
            """
            Please review the following content:

            Title: $title

            $content

            Approve or suggest changes.
            """.trimIndent()
        }
    }

    // Conditional publishing based on review
    conditionalStep {
        id = "publishing-decision"
        name = "Publishing Decision"
        description = "Decide whether to publish content"
        after("human-review")
        condition { context ->
            context.getVariable("$.steps.human-review.output.approved") as? Boolean ?: false
        }
        onTrue {
            step(publishingAgent) {
                id = "publish-content"
                name = "Publish Content"
                description = "Publish the approved content"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.editedContent"),
                    "platform" to variable("$.input.publishPlatform")
                )
            }
        }
        onFalse {
            step(revisionAgent) {
                id = "revise-content"
                name = "Revise Content"
                description = "Revise content based on feedback"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.editedContent"),
                    "feedback" to variable("$.steps.human-review.output.feedback")
                )
            }
        }
    }

    // Define workflow output
    output {
        "title" from "$.steps.create-content.output.editedContent.title"
        "content" from "$.steps.create-content.output.editedContent.content"
        "published" from { context -> context.getStepStatus("publish-content") == "success" }
        "publishedUrl" from "$.steps.publish-content.output.url"
        "feedback" from "$.steps.human-review.output.feedback"
    }
}

// Execute the workflow
fun main() = runBlocking {
    val kastraxSystem = KastraxSystem()

    // Register all workflows
    kastraxSystem.registerWorkflow(researchWorkflow)
    kastraxSystem.registerWorkflow(contentCreationWorkflow)
    kastraxSystem.registerWorkflow(publishingWorkflow)

    // Execute the top-level workflow
    val result = kastraxSystem.workflowEngine.executeWorkflow(
        workflowId = publishingWorkflow.id,
        input = mapOf(
            "topic" to "Artificial Intelligence Ethics",
            "contentType" to "blog",
            "targetWordCount" to 1500,
            "publishPlatform" to "medium"
        )
    )

    // Check if the workflow is suspended for human review
    if (result.status == "suspended") {
        println("Workflow suspended for human review. Suspension ID: ${result.suspensionId}")

        // In a real application, this would be triggered by a human review
        // For this example, we'll simulate receiving human input
        val humanInput = mapOf(
            "approved" to true,
            "feedback" to "The content looks great! Approved for publishing."
        )

        // Resume the workflow with human input
        val resumeResult = kastraxSystem.workflowEngine.resumeWorkflow(
            suspensionId = result.suspensionId,
            resumeData = humanInput
        )

        // Check the final result
        if (resumeResult.success) {
            println("Content published successfully!")
            println("Title: ${resumeResult.output.get("title")}")
            println("Published URL: ${resumeResult.output.get("publishedUrl")}")
            // Or alternatively:
            // val title = resumeResult.output["title"]
            // val url = resumeResult.output["publishedUrl"]
            // println("Title: $title")
            // println("Published URL: $url")
        } else {
            println("Workflow failed: ${resumeResult.error}")
        }
    }
}
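
The publishing workflow reads results from its nested workflow through paths such as `$.steps.create-content.output.editedContent.title`. As a rough mental model, such a path can be read as a walk through nested maps, one segment at a time. The sketch below illustrates that idea only: `resolvePath` is a hypothetical function, the nested-map state layout is an assumption, and none of this reflects how Kastrax actually resolves paths.

```kotlin
// Hypothetical sketch: NOT Kastrax's implementation. It assumes workflow state is
// held as nested maps and that path segments are separated by dots.
fun resolvePath(root: Map<String, Any?>, path: String): Any? {
    require(path.startsWith("$.")) { "Expected a path rooted at the workflow state" }
    var current: Any? = root
    for (key in path.removePrefix("$.").split('.')) {
        // Descend one level; stop with null if the segment is missing or not a map.
        current = (current as? Map<*, *>)?.get(key) ?: return null
    }
    return current
}

fun main() {
    // State as the parent workflow might see it after the nested step completes (assumed layout).
    val state = mapOf<String, Any?>(
        "input" to mapOf("publishPlatform" to "medium"),
        "steps" to mapOf(
            "create-content" to mapOf(
                "output" to mapOf(
                    "editedContent" to mapOf("title" to "AI Ethics", "content" to "Draft text")
                )
            )
        )
    )
    println(resolvePath(state, "$.steps.create-content.output.editedContent.title"))
    println(resolvePath(state, "$.steps.publish-content.output.url"))
}
```

Under this model, a path into a step that never ran (for example `publish-content` when the review is rejected) yields no value, which is why the example code above guards its reads with safe casts and defaults.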

Conclusion ✅

Nested workflows are a powerful feature in Kastrax that enable modular, reusable, and maintainable AI agent orchestration. By breaking down complex processes into manageable components, you can create sophisticated multi-agent systems that are easier to develop, test, and maintain.

Key benefits of nested workflows include:

  • Modularity: Encapsulate related functionality in dedicated workflows
  • Reusability: Create workflow libraries that can be shared across projects
  • Maintainability: Simplify complex processes by breaking them into smaller components
  • Type Safety: Ensure data consistency with strongly-typed inputs and outputs
  • Testability: Test individual workflow components in isolation

Applying the best practices outlined in this guide keeps nested workflows small, typed, and independently testable as your AI agent system grows.
