Skip to Content
DocsWorkflowsDynamic Workflowsnew

Dynamic Workflows in Kastrax ✅

Kastrax provides powerful capabilities for creating and executing dynamic workflows - workflows that are generated or modified at runtime based on data, conditions, or agent decisions. This advanced pattern enables highly adaptive AI agent orchestration that can respond to changing requirements and contexts.

Dynamic Workflow Architecture ✅

Dynamic workflows in Kastrax are built on several key components:

  1. Workflow Generator: A system that creates workflow definitions at runtime
  2. Dynamic Step Creation: The ability to add, modify, or remove steps based on runtime conditions
  3. Workflow Templates: Reusable workflow patterns that can be customized with parameters
  4. Runtime Workflow Modification: Capabilities to alter workflow structure during execution

These components work together to enable workflows that can adapt their structure and behavior based on data, agent decisions, or external factors.

Creating Dynamic Workflows ✅

Kastrax provides several approaches for creating dynamic workflows, each suited for different use cases:

1. Using the Workflow Generator

The most direct approach is to use Kastrax’s DynamicWorkflowGenerator to create workflows at runtime:

DynamicWorkflowGenerator.kt
import ai.kastrax.core.KastraxSystem import ai.kastrax.core.workflow.dynamic.DynamicWorkflowGenerator import ai.kastrax.core.workflow.variable import ai.kastrax.core.agent.Agent import kotlinx.coroutines.runBlocking fun main() = runBlocking { // Initialize the Kastrax system val kastraxSystem = KastraxSystem() // Create agents for use in the dynamic workflow val dataProcessingAgent: Agent = /* ... */ val analysisAgent: Agent = /* ... */ // Create a dynamic workflow generator val workflowGenerator = DynamicWorkflowGenerator(kastraxSystem) // Generate a dynamic workflow based on runtime conditions val dynamicWorkflow = workflowGenerator.createWorkflow( workflowId = "dynamic-data-workflow", description = "Dynamically generated workflow for data processing" ) { // Define input parameters input { variable("dataSource", String::class, required = true) variable("analysisType", String::class, defaultValue = "standard") } // Add steps to the workflow step(dataProcessingAgent) { id = "process-data" name = "Process Data" description = "Process data from the source" variables = mutableMapOf( "source" to variable("$.input.dataSource") ) } step(analysisAgent) { id = "analyze-data" name = "Analyze Data" description = "Analyze the processed data" after("process-data") variables = mutableMapOf( "data" to variable("$.steps.process-data.output.processedData"), "analysisType" to variable("$.input.analysisType") ) } // Define workflow output output { "results" from "$.steps.analyze-data.output.results" "metadata" from "$.steps.process-data.output.metadata" } } // Register the dynamic workflow with the system kastraxSystem.registerWorkflow(dynamicWorkflow) // Execute the dynamic workflow val result = kastraxSystem.workflowEngine.executeWorkflow( workflowId = "dynamic-data-workflow", input = mapOf( "dataSource" to "customer_database", "analysisType" to "comprehensive" ) ) println("Workflow execution result: ${result.output}") }

2. Dynamic Step Creation in Workflow Steps

Another approach is to dynamically create steps within an existing workflow based on runtime conditions:

DynamicStepCreation.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable import ai.kastrax.core.agent.Agent // Create a workflow with dynamic step creation val adaptiveWorkflow = workflow { name = "adaptive-workflow" description = "Workflow that adapts its steps based on input data" // Initial data analysis step step(analysisAgent) { id = "analyze-input" name = "Analyze Input Data" description = "Determine what processing is needed" variables = mutableMapOf( "data" to variable("$.input.data") ) // Dynamically create subsequent steps based on analysis results onComplete { result -> val dataType = result["dataType"] as? String ?: "unknown" val complexity = result["complexity"] as? Int ?: 1 when (dataType) { "text" -> { // Add text processing steps step(textProcessingAgent) { id = "process-text" name = "Process Text Data" description = "Process textual content" variables = mutableMapOf( "text" to variable("$.steps.analyze-input.output.extractedText") ) } // Add more steps if content is complex if (complexity > 3) { step(advancedTextAgent) { id = "advanced-text-analysis" name = "Advanced Text Analysis" description = "Perform advanced text analysis" after("process-text") variables = mutableMapOf( "processedText" to variable("$.steps.process-text.output.result") ) } } } "image" -> { // Add image processing steps step(imageProcessingAgent) { id = "process-image" name = "Process Image Data" description = "Process image content" variables = mutableMapOf( "imageUrl" to variable("$.steps.analyze-input.output.imageUrl") ) } } else -> { // Add fallback processing step step(genericProcessingAgent) { id = "generic-processing" name = "Generic Data Processing" description = "Process unknown data type" variables = mutableMapOf( "data" to variable("$.steps.analyze-input.output.rawData") ) } } } // Always add a final reporting step step(reportingAgent) { id = "generate-report" name = "Generate Report" description = "Create a report of the processing results" // This step will automatically depend on all previously created steps variables = mutableMapOf( "dataType" to variable("$.steps.analyze-input.output.dataType"), "results" to variable("$.steps", transform = { steps -> // Collect results from all processing steps (steps as Map<String, Any>).filter { it.key != "analyze-input" && it.key != "generate-report" }.mapValues { (it.value as? Map<String, Any>)?.get("output") ?: mapOf<String, Any>() } }) ) } } } } ### 3. Workflow Factory Pattern For more complex scenarios, you can implement a workflow factory that generates different workflows based on input parameters: ```kotlin filename="WorkflowFactory.kt" import ai.kastrax.core.KastraxSystem import ai.kastrax.core.workflow.Workflow import ai.kastrax.core.workflow.WorkflowBuilder import ai.kastrax.core.workflow.variable import ai.kastrax.core.agent.Agent import kotlinx.coroutines.runBlocking // Define a workflow factory class class WorkflowFactory(private val kastraxSystem: KastraxSystem) { // Agents used in workflows private val textProcessingAgent: Agent = /* ... */ private val imageProcessingAgent: Agent = /* ... */ private val dataAnalysisAgent: Agent = /* ... */ private val reportGenerationAgent: Agent = /* ... */ // Create a workflow based on content type fun createContentWorkflow(contentType: String, complexity: Int): Workflow { return when (contentType) { "blog" -> createBlogWorkflow(complexity) "social" -> createSocialMediaWorkflow(complexity) "documentation" -> createDocumentationWorkflow(complexity) else -> createGenericContentWorkflow() } } // Create a blog content workflow private fun createBlogWorkflow(complexity: Int): Workflow { return WorkflowBuilder().apply { id = "blog-content-workflow" description = "Workflow for creating blog content" // Define input parameters input { variable("topic", String::class, required = true) variable("tone", String::class, defaultValue = "informative") variable("wordCount", Int::class, defaultValue = 1000) } // Research step step(textProcessingAgent) { id = "research" name = "Research Topic" description = "Research the blog topic" variables = mutableMapOf( "topic" to variable("$.input.topic"), "depth" to if (complexity > 2) "deep" else "standard" ) } // Outline step step(textProcessingAgent) { id = "create-outline" name = "Create Outline" description = "Create a blog post outline" after("research") variables = mutableMapOf( "research" to variable("$.steps.research.output.results"), "complexity" to complexity ) } // Writing step step(textProcessingAgent) { id = "write-content" name = "Write Blog Content" description = "Write the blog content" after("create-outline") variables = mutableMapOf( "outline" to variable("$.steps.create-outline.output.outline"), "research" to variable("$.steps.research.output.results"), "tone" to variable("$.input.tone"), "wordCount" to variable("$.input.wordCount") ) } // Add additional steps for higher complexity if (complexity >= 3) { step(imageProcessingAgent) { id = "create-images" name = "Create Blog Images" description = "Generate images for the blog post" after("write-content") variables = mutableMapOf( "content" to variable("$.steps.write-content.output.content"), "style" to "professional" ) } step(textProcessingAgent) { id = "edit-content" name = "Edit Blog Content" description = "Edit and improve the blog content" after("write-content") variables = mutableMapOf( "content" to variable("$.steps.write-content.output.content"), "editingLevel" to "thorough" ) } } // Define workflow output output { "content" from if (complexity >= 3) "$.steps.edit-content.output.content" else "$.steps.write-content.output.content" "images" from if (complexity >= 3) "$.steps.create-images.output.images" else listOf<String>() "metadata" from "$.steps.research.output.metadata" } }.build() } // Create a social media content workflow private fun createSocialMediaWorkflow(complexity: Int): Workflow { // Similar implementation to blog workflow but with social media focus // ... return WorkflowBuilder().apply { // Social media workflow implementation // ... }.build() } // Create a documentation workflow private fun createDocumentationWorkflow(complexity: Int): Workflow { // Similar implementation to blog workflow but with documentation focus // ... return WorkflowBuilder().apply { // Documentation workflow implementation // ... }.build() } // Create a generic content workflow private fun createGenericContentWorkflow(): Workflow { return WorkflowBuilder().apply { id = "generic-content-workflow" description = "Generic workflow for content creation" // Basic implementation with minimal steps // ... }.build() } } // Usage example fun main() = runBlocking { val kastraxSystem = KastraxSystem() val workflowFactory = WorkflowFactory(kastraxSystem) // Create a workflow based on content type and complexity val blogWorkflow = workflowFactory.createContentWorkflow("blog", 4) // Register the workflow kastraxSystem.registerWorkflow(blogWorkflow) // Execute the workflow val result = kastraxSystem.workflowEngine.executeWorkflow( workflowId = blogWorkflow.id, input = mapOf( "topic" to "Artificial Intelligence in Healthcare", "tone" to "professional", "wordCount" to 1500 ) ) println("Workflow execution result: ${result.output}") }

Template-Based Dynamic Workflows ✅

Kastrax also supports template-based dynamic workflows, which provide a powerful way to create customizable workflow patterns:

WorkflowTemplates.kt
import ai.kastrax.core.workflow.template.WorkflowTemplate import ai.kastrax.core.workflow.template.TemplateParameter import ai.kastrax.core.workflow.variable // Define a workflow template for content creation class ContentWorkflowTemplate : WorkflowTemplate { // Define template parameters override val parameters = listOf( TemplateParameter("contentType", String::class, required = true), TemplateParameter("includeImages", Boolean::class, defaultValue = false), TemplateParameter("reviewSteps", Int::class, defaultValue = 1), TemplateParameter("targetAudience", String::class, defaultValue = "general") ) // Build the workflow from the template override fun build(params: Map<String, Any>): Workflow { // Extract parameters val contentType = params["contentType"] as String val includeImages = params["includeImages"] as Boolean val reviewSteps = params["reviewSteps"] as Int val targetAudience = params["targetAudience"] as String return WorkflowBuilder().apply { id = "${contentType.lowercase()}-content-workflow" description = "Workflow for creating $contentType content" // Define input parameters input { variable("topic", String::class, required = true) variable("tone", String::class, defaultValue = "informative") } // Research step step(researchAgent) { id = "research" name = "Research Topic" description = "Research the topic for $contentType content" variables = mutableMapOf( "topic" to variable("$.input.topic"), "contentType" to contentType, "audience" to targetAudience ) } // Content creation step step(contentCreationAgent) { id = "create-content" name = "Create $contentType Content" description = "Create the $contentType content" after("research") variables = mutableMapOf( "research" to variable("$.steps.research.output.results"), "topic" to variable("$.input.topic"), "tone" to variable("$.input.tone"), "contentType" to contentType, "audience" to targetAudience ) } // Add image generation if requested if (includeImages) { step(imageGenerationAgent) { id = "generate-images" name = "Generate Images" description = "Generate images for the content" after("create-content") variables = mutableMapOf( "content" to variable("$.steps.create-content.output.content"), "contentType" to contentType, "style" to variable("$.input.tone") ) } } // Add review steps based on parameter repeat(reviewSteps) { index -> step(reviewAgent) { id = "review-${index + 1}" name = "Review ${index + 1}" description = "Review and improve the content" after(if (index == 0) { if (includeImages) "generate-images" else "create-content" } else "review-$index") variables = mutableMapOf( "content" to if (index == 0) { variable("$.steps.create-content.output.content") } else { variable("$.steps.review-$index.output.revisedContent") }, "reviewDepth" to "thorough", "reviewNumber" to (index + 1) ) } } // Define workflow output output { "content" from if (reviewSteps > 0) { "$.steps.review-$reviewSteps.output.revisedContent" } else { "$.steps.create-content.output.content" } if (includeImages) { "images" from "$.steps.generate-images.output.images" } "metadata" from "$.steps.research.output.metadata" } }.build() } } // Usage example fun main() = runBlocking { val kastraxSystem = KastraxSystem() // Create a template instance val template = ContentWorkflowTemplate() // Create workflows from the template with different parameters val blogWorkflow = template.build(mapOf( "contentType" to "Blog", "includeImages" to true, "reviewSteps" to 2, "targetAudience" to "technical" )) val socialWorkflow = template.build(mapOf( "contentType" to "Social", "includeImages" to true, "reviewSteps" to 1, "targetAudience" to "general" )) // Register the workflows kastraxSystem.registerWorkflow(blogWorkflow) kastraxSystem.registerWorkflow(socialWorkflow) // Execute the blog workflow val result = kastraxSystem.workflowEngine.executeWorkflow( workflowId = blogWorkflow.id, input = mapOf( "topic" to "Kotlin Coroutines", "tone" to "educational" ) ) println("Blog workflow result: ${result.output}") }

Important Considerations ✅

1. Performance and Resource Management

Dynamic workflows require careful resource management:

  • Workflow Caching: Consider caching frequently used workflow patterns instead of recreating them
  • Resource Limits: Implement limits on the number of dynamic workflows that can be created
  • Cleanup: Ensure proper cleanup of unused dynamic workflows to prevent resource leaks

2. Error Handling and Validation

Robust error handling is essential for dynamic workflows:

  • Parameter Validation: Validate all parameters used to create dynamic workflows
  • Graceful Degradation: Implement fallback workflows when dynamic creation fails
  • Comprehensive Logging: Log all aspects of dynamic workflow creation and execution

3. Workflow Lifecycle Management

Manage the lifecycle of dynamic workflows carefully:

  • Registration: Explicitly register important dynamic workflows with the Kastrax system
  • Versioning: Consider versioning dynamic workflows for tracking and debugging
  • Persistence: Decide whether dynamic workflows should persist beyond their initial execution

4. Testing and Debugging

Testing dynamic workflows requires special approaches:

  • Template Testing: Test workflow templates with various parameter combinations
  • Snapshot Testing: Compare generated workflow structures against expected snapshots
  • Runtime Validation: Validate workflow structure before execution

Use Cases ✅

Dynamic workflows in Kastrax are particularly valuable for these scenarios:

1. Content Generation Pipelines

Create specialized content workflows based on content type, audience, and complexity requirements:

  • Blog posts with varying levels of research and editing
  • Social media content with platform-specific steps
  • Technical documentation with customized review processes

2. Multi-Agent Collaboration

Dynamically assemble workflows that coordinate multiple specialized agents:

  • Research teams with domain-specific agent selection
  • Creative projects with dynamic role assignment
  • Problem-solving workflows with adaptive agent selection

3. Adaptive Processing

Create workflows that adapt to the nature of the input data:

  • Data processing pipelines that adjust based on data characteristics
  • Content analysis workflows that vary based on content type
  • Decision-making processes that adapt to complexity levels

4. Multi-tenant Applications

Generate isolated workflows for different tenants or users:

  • Customer-specific processing pipelines
  • User-customized agent workflows
  • Organization-specific approval processes

Conclusion ✅

Dynamic workflows represent one of Kastrax’s most powerful capabilities, enabling truly adaptive AI agent orchestration. By leveraging the workflow generation, templating, and runtime modification features, you can create sophisticated systems that respond intelligently to changing requirements and contexts.

The combination of Kastrax’s type-safe workflow DSL with dynamic workflow generation provides both the safety of compile-time checking and the flexibility of runtime adaptation - a powerful foundation for building advanced AI agent applications.

Last updated on