Dynamic Workflows in Kastrax ✅
Kastrax can create and execute dynamic workflows: workflows that are generated or modified at runtime based on data, conditions, or agent decisions. This pattern lets agent orchestration adapt to changing requirements and contexts.
Dynamic Workflow Architecture ✅
Dynamic workflows in Kastrax are built on several key components:
- Workflow Generator: A system that creates workflow definitions at runtime
- Dynamic Step Creation: The ability to add, modify, or remove steps based on runtime conditions
- Workflow Templates: Reusable workflow patterns that can be customized with parameters
- Runtime Workflow Modification: Capabilities to alter workflow structure during execution
These components work together to enable workflows that can adapt their structure and behavior based on data, agent decisions, or external factors.
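As a rough mental model, the components above reduce to a definition object plus a function that builds it from runtime values. The sketch below is plain Kotlin with hypothetical stand-in types (`StepDef`, `WorkflowDef`, `generateWorkflow`); none of these are Kastrax classes, and the step IDs are illustrative only.

```kotlin
// Hypothetical stand-in types for illustration; these are NOT Kastrax classes.
data class StepDef(val id: String, val after: List<String> = emptyList())

data class WorkflowDef(val id: String, val steps: List<StepDef>)

// Plays the role of a workflow generator: the definition is built from runtime values.
fun generateWorkflow(dataType: String, complexity: Int): WorkflowDef {
    val steps = mutableListOf(StepDef("analyze-input"))

    // Dynamic step creation: which steps exist depends on the runtime data.
    when (dataType) {
        "text" -> {
            steps.add(StepDef("process-text", after = listOf("analyze-input")))
            if (complexity > 3) {
                steps.add(StepDef("advanced-text-analysis", after = listOf("process-text")))
            }
        }
        "image" -> steps.add(StepDef("process-image", after = listOf("analyze-input")))
        else -> steps.add(StepDef("generic-processing", after = listOf("analyze-input")))
    }

    // A final step that runs after whichever step was added last.
    steps.add(StepDef("generate-report", after = listOf(steps.last().id)))
    return WorkflowDef("adaptive-$dataType", steps)
}
```

Calling `generateWorkflow("text", 5)` yields four steps, while `generateWorkflow("image", 1)` yields three; the structure is decided by the arguments rather than fixed in advance.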
Creating Dynamic Workflows ✅
Kastrax provides several approaches for creating dynamic workflows, each suited for different use cases:
1. Using the Workflow Generator
The most direct approach is to use Kastrax’s DynamicWorkflowGenerator to create workflows at runtime:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.dynamic.DynamicWorkflowGenerator
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Initialize the Kastrax system
    val kastraxSystem = KastraxSystem()

    // Create agents for use in the dynamic workflow
    val dataProcessingAgent: Agent = /* ... */
    val analysisAgent: Agent = /* ... */

    // Create a dynamic workflow generator
    val workflowGenerator = DynamicWorkflowGenerator(kastraxSystem)

    // Generate a dynamic workflow based on runtime conditions
    val dynamicWorkflow = workflowGenerator.createWorkflow(
        workflowId = "dynamic-data-workflow",
        description = "Dynamically generated workflow for data processing"
    ) {
        // Define input parameters
        input {
            variable("dataSource", String::class, required = true)
            variable("analysisType", String::class, defaultValue = "standard")
        }

        // Add steps to the workflow
        step(dataProcessingAgent) {
            id = "process-data"
            name = "Process Data"
            description = "Process data from the source"
            variables = mutableMapOf(
                "source" to variable("$.input.dataSource")
            )
        }

        step(analysisAgent) {
            id = "analyze-data"
            name = "Analyze Data"
            description = "Analyze the processed data"
            after("process-data")
            variables = mutableMapOf(
                "data" to variable("$.steps.process-data.output.processedData"),
                "analysisType" to variable("$.input.analysisType")
            )
        }

        // Define workflow output
        output {
            "results" from "$.steps.analyze-data.output.results"
            "metadata" from "$.steps.process-data.output.metadata"
        }
    }

    // Register the dynamic workflow with the system
    kastraxSystem.registerWorkflow(dynamicWorkflow)

    // Execute the dynamic workflow
    val result = kastraxSystem.workflowEngine.executeWorkflow(
        workflowId = "dynamic-data-workflow",
        input = mapOf(
            "dataSource" to "customer_database",
            "analysisType" to "comprehensive"
        )
    )

    println("Workflow execution result: ${result.output}")
}
2. Dynamic Step Creation in Workflow Steps
Another approach is to dynamically create steps within an existing workflow based on runtime conditions:
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent

// The agents used below (analysisAgent, textProcessingAgent, advancedTextAgent,
// imageProcessingAgent, genericProcessingAgent, reportingAgent) are assumed to be
// Agent instances defined elsewhere.

// Create a workflow with dynamic step creation
val adaptiveWorkflow = workflow {
    name = "adaptive-workflow"
    description = "Workflow that adapts its steps based on input data"

    // Initial data analysis step
    step(analysisAgent) {
        id = "analyze-input"
        name = "Analyze Input Data"
        description = "Determine what processing is needed"
        variables = mutableMapOf(
            "data" to variable("$.input.data")
        )

        // Dynamically create subsequent steps based on analysis results
        onComplete { result ->
            // Fall back to safe defaults if the analysis output is missing or mistyped
            val dataType = result["dataType"] as? String ?: "unknown"
            val complexity = result["complexity"] as? Int ?: 1

            when (dataType) {
                "text" -> {
                    // Add text processing steps
                    step(textProcessingAgent) {
                        id = "process-text"
                        name = "Process Text Data"
                        description = "Process textual content"
                        variables = mutableMapOf(
                            "text" to variable("$.steps.analyze-input.output.extractedText")
                        )
                    }

                    // Add more steps if content is complex
                    if (complexity > 3) {
                        step(advancedTextAgent) {
                            id = "advanced-text-analysis"
                            name = "Advanced Text Analysis"
                            description = "Perform advanced text analysis"
                            after("process-text")
                            variables = mutableMapOf(
                                "processedText" to variable("$.steps.process-text.output.result")
                            )
                        }
                    }
                }
                "image" -> {
                    // Add image processing steps
                    step(imageProcessingAgent) {
                        id = "process-image"
                        name = "Process Image Data"
                        description = "Process image content"
                        variables = mutableMapOf(
                            "imageUrl" to variable("$.steps.analyze-input.output.imageUrl")
                        )
                    }
                }
                else -> {
                    // Add fallback processing step
                    step(genericProcessingAgent) {
                        id = "generic-processing"
                        name = "Generic Data Processing"
                        description = "Process unknown data type"
                        variables = mutableMapOf(
                            "data" to variable("$.steps.analyze-input.output.rawData")
                        )
                    }
                }
            }

            // Always add a final reporting step
            step(reportingAgent) {
                id = "generate-report"
                name = "Generate Report"
                description = "Create a report of the processing results"
                // This step will automatically depend on all previously created steps
                variables = mutableMapOf(
                    "dataType" to variable("$.steps.analyze-input.output.dataType"),
                    "results" to variable("$.steps", transform = { steps ->
                        // Collect results from all processing steps
                        (steps as Map<String, Any>).filter {
                            it.key != "analyze-input" && it.key != "generate-report"
                        }.mapValues {
                            (it.value as? Map<String, Any>)?.get("output") ?: mapOf<String, Any>()
                        }
                    })
                )
            }
        }
    }
}
3. Workflow Factory Pattern
For more complex scenarios, you can implement a workflow factory that generates different workflows based on input parameters:
```kotlin filename="WorkflowFactory.kt"
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.Workflow
import ai.kastrax.core.workflow.WorkflowBuilder
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.agent.Agent
import kotlinx.coroutines.runBlocking

// Define a workflow factory class
class WorkflowFactory(private val kastraxSystem: KastraxSystem) {
    // Agents used in workflows
    private val textProcessingAgent: Agent = /* ... */
    private val imageProcessingAgent: Agent = /* ... */
    private val dataAnalysisAgent: Agent = /* ... */
    private val reportGenerationAgent: Agent = /* ... */

    // Create a workflow based on content type
    fun createContentWorkflow(contentType: String, complexity: Int): Workflow {
        return when (contentType) {
            "blog" -> createBlogWorkflow(complexity)
            "social" -> createSocialMediaWorkflow(complexity)
            "documentation" -> createDocumentationWorkflow(complexity)
            else -> createGenericContentWorkflow()
        }
    }

    // Create a blog content workflow
    private fun createBlogWorkflow(complexity: Int): Workflow {
        return WorkflowBuilder().apply {
            id = "blog-content-workflow"
            description = "Workflow for creating blog content"

            // Define input parameters
            input {
                variable("topic", String::class, required = true)
                variable("tone", String::class, defaultValue = "informative")
                variable("wordCount", Int::class, defaultValue = 1000)
            }

            // Research step
            step(textProcessingAgent) {
                id = "research"
                name = "Research Topic"
                description = "Research the blog topic"
                variables = mutableMapOf(
                    "topic" to variable("$.input.topic"),
                    "depth" to if (complexity > 2) "deep" else "standard"
                )
            }

            // Outline step
            step(textProcessingAgent) {
                id = "create-outline"
                name = "Create Outline"
                description = "Create a blog post outline"
                after("research")
                variables = mutableMapOf(
                    "research" to variable("$.steps.research.output.results"),
                    "complexity" to complexity
                )
            }

            // Writing step
            step(textProcessingAgent) {
                id = "write-content"
                name = "Write Blog Content"
                description = "Write the blog content"
                after("create-outline")
                variables = mutableMapOf(
                    "outline" to variable("$.steps.create-outline.output.outline"),
                    "research" to variable("$.steps.research.output.results"),
                    "tone" to variable("$.input.tone"),
                    "wordCount" to variable("$.input.wordCount")
                )
            }

            // Add additional steps for higher complexity
            if (complexity >= 3) {
                step(imageProcessingAgent) {
                    id = "create-images"
                    name = "Create Blog Images"
                    description = "Generate images for the blog post"
                    after("write-content")
                    variables = mutableMapOf(
                        "content" to variable("$.steps.write-content.output.content"),
                        "style" to "professional"
                    )
                }

                step(textProcessingAgent) {
                    id = "edit-content"
                    name = "Edit Blog Content"
                    description = "Edit and improve the blog content"
                    after("write-content")
                    variables = mutableMapOf(
                        "content" to variable("$.steps.write-content.output.content"),
                        "editingLevel" to "thorough"
                    )
                }
            }

            // Define workflow output
            output {
                "content" from if (complexity >= 3)
                    "$.steps.edit-content.output.content"
                else
                    "$.steps.write-content.output.content"
                "images" from if (complexity >= 3)
                    "$.steps.create-images.output.images"
                else
                    listOf<String>()
                "metadata" from "$.steps.research.output.metadata"
            }
        }.build()
    }

    // Create a social media content workflow
    private fun createSocialMediaWorkflow(complexity: Int): Workflow {
        // Similar implementation to blog workflow but with social media focus
        // ...
        return WorkflowBuilder().apply {
            // Social media workflow implementation
            // ...
        }.build()
    }

    // Create a documentation workflow
    private fun createDocumentationWorkflow(complexity: Int): Workflow {
        // Similar implementation to blog workflow but with documentation focus
        // ...
        return WorkflowBuilder().apply {
            // Documentation workflow implementation
            // ...
        }.build()
    }

    // Create a generic content workflow
    private fun createGenericContentWorkflow(): Workflow {
        return WorkflowBuilder().apply {
            id = "generic-content-workflow"
            description = "Generic workflow for content creation"
            // Basic implementation with minimal steps
            // ...
        }.build()
    }
}

// Usage example
fun main() = runBlocking {
    val kastraxSystem = KastraxSystem()
    val workflowFactory = WorkflowFactory(kastraxSystem)

    // Create a workflow based on content type and complexity
    val blogWorkflow = workflowFactory.createContentWorkflow("blog", 4)

    // Register the workflow
    kastraxSystem.registerWorkflow(blogWorkflow)

    // Execute the workflow
    val result = kastraxSystem.workflowEngine.executeWorkflow(
        workflowId = blogWorkflow.id,
        input = mapOf(
            "topic" to "Artificial Intelligence in Healthcare",
            "tone" to "professional",
            "wordCount" to 1500
        )
    )

    println("Workflow execution result: ${result.output}")
}
```
Template-Based Dynamic Workflows ✅
Kastrax also supports template-based dynamic workflows. A template declares its parameters once, then builds a concrete workflow from each set of parameter values:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.Workflow
import ai.kastrax.core.workflow.WorkflowBuilder
import ai.kastrax.core.workflow.template.WorkflowTemplate
import ai.kastrax.core.workflow.template.TemplateParameter
import ai.kastrax.core.workflow.variable
import kotlinx.coroutines.runBlocking

// The agents used below (researchAgent, contentCreationAgent, imageGenerationAgent,
// reviewAgent) are assumed to be Agent instances defined elsewhere.

// Define a workflow template for content creation
class ContentWorkflowTemplate : WorkflowTemplate {
    // Define template parameters
    override val parameters = listOf(
        TemplateParameter("contentType", String::class, required = true),
        TemplateParameter("includeImages", Boolean::class, defaultValue = false),
        TemplateParameter("reviewSteps", Int::class, defaultValue = 1),
        TemplateParameter("targetAudience", String::class, defaultValue = "general")
    )

    // Build the workflow from the template
    override fun build(params: Map<String, Any>): Workflow {
        // Extract parameters, falling back to the defaults declared above
        // so that omitted optional parameters do not cause a failed cast
        val contentType = requireNotNull(params["contentType"] as? String) {
            "contentType is required"
        }
        val includeImages = params["includeImages"] as? Boolean ?: false
        val reviewSteps = params["reviewSteps"] as? Int ?: 1
        val targetAudience = params["targetAudience"] as? String ?: "general"

        return WorkflowBuilder().apply {
            id = "${contentType.lowercase()}-content-workflow"
            description = "Workflow for creating $contentType content"

            // Define input parameters
            input {
                variable("topic", String::class, required = true)
                variable("tone", String::class, defaultValue = "informative")
            }

            // Research step
            step(researchAgent) {
                id = "research"
                name = "Research Topic"
                description = "Research the topic for $contentType content"
                variables = mutableMapOf(
                    "topic" to variable("$.input.topic"),
                    "contentType" to contentType,
                    "audience" to targetAudience
                )
            }

            // Content creation step
            step(contentCreationAgent) {
                id = "create-content"
                name = "Create $contentType Content"
                description = "Create the $contentType content"
                after("research")
                variables = mutableMapOf(
                    "research" to variable("$.steps.research.output.results"),
                    "topic" to variable("$.input.topic"),
                    "tone" to variable("$.input.tone"),
                    "contentType" to contentType,
                    "audience" to targetAudience
                )
            }

            // Add image generation if requested
            if (includeImages) {
                step(imageGenerationAgent) {
                    id = "generate-images"
                    name = "Generate Images"
                    description = "Generate images for the content"
                    after("create-content")
                    variables = mutableMapOf(
                        "content" to variable("$.steps.create-content.output.content"),
                        "contentType" to contentType,
                        "style" to variable("$.input.tone")
                    )
                }
            }

            // Add review steps based on parameter; each review runs after the previous one
            repeat(reviewSteps) { index ->
                step(reviewAgent) {
                    id = "review-${index + 1}"
                    name = "Review ${index + 1}"
                    description = "Review and improve the content"
                    after(if (index == 0) {
                        if (includeImages) "generate-images" else "create-content"
                    } else "review-$index")
                    variables = mutableMapOf(
                        "content" to if (index == 0) {
                            variable("$.steps.create-content.output.content")
                        } else {
                            variable("$.steps.review-$index.output.revisedContent")
                        },
                        "reviewDepth" to "thorough",
                        "reviewNumber" to (index + 1)
                    )
                }
            }

            // Define workflow output
            output {
                "content" from if (reviewSteps > 0) {
                    "$.steps.review-$reviewSteps.output.revisedContent"
                } else {
                    "$.steps.create-content.output.content"
                }
                if (includeImages) {
                    "images" from "$.steps.generate-images.output.images"
                }
                "metadata" from "$.steps.research.output.metadata"
            }
        }.build()
    }
}

// Usage example
fun main() = runBlocking {
    val kastraxSystem = KastraxSystem()

    // Create a template instance
    val template = ContentWorkflowTemplate()

    // Create workflows from the template with different parameters
    val blogWorkflow = template.build(mapOf(
        "contentType" to "Blog",
        "includeImages" to true,
        "reviewSteps" to 2,
        "targetAudience" to "technical"
    ))

    val socialWorkflow = template.build(mapOf(
        "contentType" to "Social",
        "includeImages" to true,
        "reviewSteps" to 1,
        "targetAudience" to "general"
    ))

    // Register the workflows
    kastraxSystem.registerWorkflow(blogWorkflow)
    kastraxSystem.registerWorkflow(socialWorkflow)

    // Execute the blog workflow
    val result = kastraxSystem.workflowEngine.executeWorkflow(
        workflowId = blogWorkflow.id,
        input = mapOf(
            "topic" to "Kotlin Coroutines",
            "tone" to "educational"
        )
    )

    println("Blog workflow result: ${result.output}")
}
Important Considerations ✅
1. Performance and Resource Management
Dynamic workflows require careful resource management:
- Workflow Caching: Consider caching frequently used workflow patterns instead of recreating them
- Resource Limits: Implement limits on the number of dynamic workflows that can be created
- Cleanup: Ensure proper cleanup of unused dynamic workflows to prevent resource leaks
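One way to combine caching, a resource limit, and cleanup is a small bounded cache that evicts the least recently used entry. The sketch below is an assumption, not a Kastrax feature: `WorkflowCache` and `cacheKey` are hypothetical names, and `W` stands in for whatever workflow type you build. It relies only on `java.util.LinkedHashMap` in access-order mode.

```kotlin
// Hypothetical bounded cache; `W` stands in for the workflow type being built.
class WorkflowCache<W : Any>(private val maxEntries: Int) {
    // accessOrder = true keeps entries ordered from least to most recently used.
    private val cache = object : LinkedHashMap<String, W>(16, 0.75f, true) {
        // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<String, W>?): Boolean =
            this.size > maxEntries
    }

    val cachedCount: Int
        get() = synchronized(cache) { cache.size }

    // Returns the cached workflow for `key`, building and storing it on a miss.
    fun getOrBuild(key: String, build: () -> W): W =
        synchronized(cache) { cache.getOrPut(key, build) }
}

// Builds a stable key: the same parameters in any order give the same string.
fun cacheKey(templateName: String, params: Map<String, Any?>): String =
    params.toSortedMap().entries.joinToString(separator = "&", prefix = "$templateName?") {
        "${it.key}=${it.value}"
    }
```

A caller would wrap an expensive build, for example `cache.getOrBuild(cacheKey("blog", params)) { template.build(params) }`, so repeated requests with the same parameters reuse one workflow object.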
2. Error Handling and Validation
Robust error handling is essential for dynamic workflows:
- Parameter Validation: Validate all parameters used to create dynamic workflows
- Graceful Degradation: Implement fallback workflows when dynamic creation fails
- Comprehensive Logging: Log all aspects of dynamic workflow creation and execution
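Parameter validation and graceful degradation can be sketched without any framework support. The code below is a minimal illustration under assumptions: `ParamSpec` is a hypothetical descriptor loosely modelled on the `TemplateParameter` calls shown earlier, and `resolveParams` and `buildOrFallback` are invented helper names, not Kastrax functions.

```kotlin
import kotlin.reflect.KClass

// Hypothetical parameter descriptor; not the Kastrax TemplateParameter class.
data class ParamSpec(
    val name: String,
    val type: KClass<*>,
    val required: Boolean = false,
    val defaultValue: Any? = null
)

// Applies defaults and checks types; throws IllegalArgumentException listing every problem found.
fun resolveParams(specs: List<ParamSpec>, raw: Map<String, Any?>): Map<String, Any> {
    val errors = mutableListOf<String>()
    val resolved = mutableMapOf<String, Any>()

    // Unknown keys are usually typos, so report them instead of ignoring them.
    val knownNames = specs.map { it.name }.toSet()
    for (name in raw.keys - knownNames) errors.add("unknown parameter '$name'")

    for (spec in specs) {
        val value = raw[spec.name] ?: spec.defaultValue
        if (value == null) {
            if (spec.required) errors.add("missing required parameter '${spec.name}'")
        } else if (!spec.type.isInstance(value)) {
            errors.add("parameter '${spec.name}' should be ${spec.type.simpleName} but was ${value::class.simpleName}")
        } else {
            resolved[spec.name] = value
        }
    }

    require(errors.isEmpty()) { "Invalid template parameters: " + errors.joinToString("; ") }
    return resolved
}

// Graceful degradation: if the dynamic build rejects its input, use a simpler fallback.
fun <W> buildOrFallback(build: () -> W, fallback: (Throwable) -> W): W =
    try {
        build()
    } catch (e: IllegalArgumentException) {
        fallback(e)
    }
```

Collecting all problems before failing gives one complete error message to log, rather than one failure per attempt.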
3. Workflow Lifecycle Management
Manage the lifecycle of dynamic workflows carefully:
- Registration: Explicitly register important dynamic workflows with the Kastrax system
- Versioning: Consider versioning dynamic workflows for tracking and debugging
- Persistence: Decide whether dynamic workflows should persist beyond their initial execution
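To make the versioning and persistence points concrete, here is a small in-memory registry that hands out versioned IDs and lets callers drop a workflow once it is no longer needed. `VersionedRegistry` and the `@v` ID suffix are hypothetical conventions chosen for this sketch; Kastrax's own registration API may work differently. The class is not thread-safe as written.

```kotlin
// Hypothetical in-memory registry; `W` stands in for the workflow type.
class VersionedRegistry<W : Any> {
    private val versions = mutableMapOf<String, MutableList<W>>()

    // Stores a new version and returns a versioned ID such as "blog-content-workflow@v2".
    fun register(baseId: String, workflow: W): String {
        val list = versions.getOrPut(baseId) { mutableListOf() }
        list.add(workflow)
        return "$baseId@v${list.size}"
    }

    // Most recently registered version, or null if the ID is unknown.
    fun latest(baseId: String): W? = versions[baseId]?.lastOrNull()

    // A specific version (1-based), or null if it does not exist.
    fun version(baseId: String, number: Int): W? = versions[baseId]?.getOrNull(number - 1)

    // Removes a workflow and all its versions, for example after a one-off execution.
    fun unregister(baseId: String): Boolean = versions.remove(baseId) != null
}
```

Keeping earlier versions around makes it possible to tell which generated structure a past execution actually ran.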
4. Testing and Debugging
Testing dynamic workflows requires special approaches:
- Template Testing: Test workflow templates with various parameter combinations
- Snapshot Testing: Compare generated workflow structures against expected snapshots
- Runtime Validation: Validate workflow structure before execution
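Runtime validation and snapshot testing both operate on the generated step graph. The following sketch assumes a simplified representation, `StepNode`, holding a step ID and the IDs it runs after; `validateSteps` and `snapshot` are hypothetical helpers, not Kastrax functions. Validation checks for duplicate IDs, dangling `after` references, and dependency cycles (using Kahn's algorithm).

```kotlin
// Hypothetical stand-in for a generated step: an ID plus the IDs it runs after.
data class StepNode(val id: String, val after: List<String> = emptyList())

// Returns a list of structural problems; an empty list means the graph can be ordered.
fun validateSteps(steps: List<StepNode>): List<String> {
    val problems = mutableListOf<String>()
    val ids = steps.map { it.id }

    ids.groupingBy { it }.eachCount()
        .filter { it.value > 1 }
        .forEach { (id, _) -> problems.add("duplicate step id '$id'") }

    val known = ids.toSet()
    for (step in steps) {
        for (dep in step.after) {
            if (dep !in known) problems.add("step '${step.id}' depends on unknown step '$dep'")
        }
    }

    // Kahn's algorithm: repeatedly remove steps whose dependencies have all been removed.
    val remaining = steps
        .associate { it.id to it.after.filter { dep -> dep in known }.toMutableSet() }
        .toMutableMap()
    while (true) {
        val ready = remaining.filterValues { it.isEmpty() }.keys.toSet()
        if (ready.isEmpty()) break
        ready.forEach { remaining.remove(it) }
        remaining.values.forEach { it.removeAll(ready) }
    }
    // Anything left over is part of, or downstream of, a cycle.
    if (remaining.isNotEmpty()) {
        problems.add("dependency cycle among: ${remaining.keys.sorted().joinToString()}")
    }
    return problems
}

// Stable text rendering for snapshot tests: one line per step, sorted by ID.
fun snapshot(steps: List<StepNode>): String =
    steps.sortedBy { it.id }.joinToString("\n") {
        "${it.id} <- [${it.after.sorted().joinToString(",")}]"
    }
```

In a test, the string returned by `snapshot` can be compared against a stored expected value for each parameter combination, so unintended structural changes show up as a diff.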
Use Cases ✅
Dynamic workflows in Kastrax are particularly valuable for these scenarios:
1. Content Generation Pipelines
Create specialized content workflows based on content type, audience, and complexity requirements:
- Blog posts with varying levels of research and editing
- Social media content with platform-specific steps
- Technical documentation with customized review processes
2. Multi-Agent Collaboration
Dynamically assemble workflows that coordinate multiple specialized agents:
- Research teams with domain-specific agent selection
- Creative projects with dynamic role assignment
- Problem-solving workflows with adaptive agent selection
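Adaptive agent selection can be as simple as matching required skills against what each agent advertises. The sketch below is illustrative only: `AgentInfo` and `selectAgents` are hypothetical, and a real Kastrax `Agent` carries far more than a name and a set of skill tags.

```kotlin
// Hypothetical descriptor used only for selection; not the Kastrax Agent type.
data class AgentInfo(val name: String, val skills: Set<String>)

// For each required skill, picks the first available agent that offers it.
// Fails fast with IllegalStateException if a skill has no matching agent.
fun selectAgents(required: List<String>, available: List<AgentInfo>): Map<String, AgentInfo> =
    required.associateWith { skill ->
        available.firstOrNull { skill in it.skills }
            ?: error("no agent offers skill '$skill'")
    }
```

The resulting map can then drive workflow generation, with one step created per selected agent.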
3. Adaptive Processing
Create workflows that adapt to the nature of the input data:
- Data processing pipelines that adjust based on data characteristics
- Content analysis workflows that vary based on content type
- Decision-making processes that adapt to complexity levels
4. Multi-tenant Applications
Generate isolated workflows for different tenants or users:
- Customer-specific processing pipelines
- User-customized agent workflows
- Organization-specific approval processes
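For multi-tenant isolation, one simple convention is to namespace every generated workflow ID by tenant so registrations cannot collide. The helpers below are an assumption made for illustration; the `tenant:` prefix format and both function names are hypothetical and not part of Kastrax.

```kotlin
// Hypothetical helper: namespaces a workflow ID per tenant.
fun tenantWorkflowId(tenantId: String, baseId: String): String {
    // Restrict tenant IDs so the ':' separator can never appear inside one.
    require(tenantId.matches(Regex("[a-z0-9-]+"))) {
        "tenantId must contain only lowercase letters, digits, or '-'"
    }
    return "tenant:$tenantId:$baseId"
}

// Checks ownership by exact prefix, including the trailing separator.
fun belongsToTenant(workflowId: String, tenantId: String): Boolean =
    workflowId.startsWith("tenant:$tenantId:")
```

Including the trailing separator in the prefix check keeps tenant `acme` from matching workflows that belong to `acme-2`.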
Conclusion ✅
Dynamic workflows let Kastrax agent orchestration adapt at runtime. By combining workflow generation, templating, and runtime modification, you can build systems whose structure changes as requirements and contexts change.
The combination of Kastrax’s type-safe workflow DSL with dynamic workflow generation provides both compile-time checking and runtime flexibility, a solid foundation for building advanced AI agent applications.