Kastrax AI Workflow System
The Kastrax Workflow System provides a powerful framework for orchestrating complex AI agent operations. Built on Kotlin’s concurrency model, it enables you to define sophisticated workflows with features like branching, parallel execution, resource suspension, and more. This system is designed to handle the complex, multi-step processes that are common in AI applications.
When to Use Workflows
Workflows are essential when your AI application requires orchestrating multiple operations in a structured manner. Consider using workflows when:
- Complex Multi-step Processes: Your application needs to execute a sequence of AI operations with dependencies between steps
- Conditional Logic: You need to implement branching paths based on intermediate results
- Parallel Processing: Multiple operations need to be executed concurrently
- User Interaction: Workflows need to pause and wait for user input before continuing
- Error Handling: You need sophisticated error recovery mechanisms for AI operations
- Observability: You want detailed tracking and monitoring of each step in your AI process
Kastrax’s workflow system provides:
- A type-safe, Kotlin-based DSL for defining workflows and their steps
- Support for both simple (linear) and advanced (branching, parallel) execution paths
- Variable passing between steps with powerful reference resolution
- Suspension and resumption capabilities for long-running or user-interactive workflows
- Comprehensive error handling and recovery mechanisms
- Detailed observability and debugging features to track each workflow run
Workflow Architecture
The Kastrax Workflow System is built on a modular architecture with several key components:
- Workflow Interface: Defines the core operations for workflow execution
- Workflow Steps: Individual units of work that can be combined into a workflow
- Workflow Context: Maintains state and data during workflow execution
- Variable References: Mechanism for passing data between steps
- Workflow Engine: Orchestrates the execution of steps according to defined dependencies
This architecture enables flexible, maintainable, and observable workflow definitions.
Example Workflow
Let’s examine a simple workflow that processes content creation with research, writing, and editing steps:
Creating a Basic Workflow
Here’s how to define a workflow using Kastrax’s Kotlin DSL:
import ai.kastrax.core.agent.Agent
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Assume we have these agents defined elsewhere
val researchAgent: Agent = /* ... */
val writingAgent: Agent = /* ... */
val editingAgent: Agent = /* ... */
// Create the workflow
val contentWorkflow = workflow {
name = "content-creation"
description = "Create content about a topic"
// Research step
step(researchAgent) {
id = "research"
name = "Research"
description = "Research the topic"
variables = mutableMapOf(
"topic" to variable("$.input.topic")
)
}
// Writing step (depends on research)
step(writingAgent) {
id = "writing"
name = "Writing"
description = "Write an article based on research"
after("research") // This step runs after the research step
variables = mutableMapOf(
"research" to variable("$.steps.research.output.text")
)
}
// Editing step (depends on writing)
step(editingAgent) {
id = "editing"
name = "Editing"
description = "Edit the article"
after("writing") // This step runs after the writing step
variables = mutableMapOf(
"draft" to variable("$.steps.writing.output.text")
)
}
// Define output mapping
output {
"researchSummary" from "$.steps.research.output.text"
"finalArticle" from "$.steps.editing.output.text"
"wordCount" from {
"$.steps.editing.output.text" to { text ->
(text as? String)?.split(" ")?.size ?: 0
}
}
}
}
This workflow defines three steps (research, writing, and editing) with dependencies between them, and maps the final outputs.
Registering the Workflow
Register your workflow with the Kastrax system to enable execution, logging, and telemetry:
import ai.kastrax.core.KastraxSystem
// Create a Kastrax system instance
val kastraxSystem = KastraxSystem()
// Register the workflow
kastraxSystem.registerWorkflow(contentWorkflow)
// You can also register multiple workflows
kastraxSystem.registerWorkflows(
contentWorkflow,
otherWorkflow,
anotherWorkflow
)
You can also create dynamic workflows at runtime using the workflow engine:
import ai.kastrax.core.workflow.dynamic.DynamicWorkflowGenerator
// Create a dynamic workflow generator
val workflowGenerator = DynamicWorkflowGenerator(kastraxSystem)
// Create a workflow dynamically
val dynamicWorkflow = workflowGenerator.createWorkflow(
workflowName = "dynamic-workflow",
description = "Dynamically created workflow"
) {
// Define steps and flow here
step(someAgent) {
id = "dynamic-step"
// ... step configuration
}
// ... more steps and configuration
}
// Register the dynamic workflow
kastraxSystem.registerWorkflow(dynamicWorkflow)
Executing the Workflow
Execute your workflow programmatically:
import ai.kastrax.core.workflow.WorkflowExecuteOptions
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// Get the workflow engine
val workflowEngine = kastraxSystem.workflowEngine
// Execute the workflow
val result = workflowEngine.executeWorkflow(
workflowId = "content-creation",
input = mapOf("topic" to "Artificial Intelligence in Healthcare"),
options = WorkflowExecuteOptions(
maxSteps = 10, // Maximum number of steps to execute
timeout = 60000, // Timeout in milliseconds
onStepFinish = { stepResult ->
println("Step ${stepResult.stepId} completed")
},
onStepError = { stepId, error ->
println("Error in step $stepId: ${error.message}")
}
)
)
// Process the result
if (result.success) {
println("Workflow completed successfully")
println("Final article: ${result.output["finalArticle"]}")
println("Word count: ${result.output["wordCount"]}")
} else {
println("Workflow failed: ${result.error}")
}
}
You can also execute workflows via the Kastrax HTTP API (when running a Kastrax server):
curl --location 'http://localhost:8080/api/workflows/content-creation/execute' \
--header 'Content-Type: application/json' \
--data '{
"topic": "Artificial Intelligence in Healthcare"
}'
This example demonstrates the essentials: define your workflow with steps, register it with the Kastrax system, then execute it with input data.
Workflow Step Types
Kastrax supports various types of workflow steps to handle different scenarios:
Agent Steps
Agent steps use AI agents to perform tasks:
step(agent) {
id = "agent-step"
name = "Agent Processing"
description = "Use an agent to process data"
variables = mutableMapOf(
"input" to variable("$.input.data")
)
}
Tool Steps
Tool steps execute specific tools or functions:
toolStep {
id = "data-processing"
name = "Process Data"
description = "Process data using a specific tool"
tool = DataProcessingTool()
variables = mutableMapOf(
"data" to variable("$.input.rawData")
)
}
Conditional Steps
Conditional steps execute based on conditions:
conditionalStep {
id = "quality-check"
name = "Quality Check"
description = "Check if the content meets quality standards"
condition { context ->
val wordCount = context.getVariable("$.steps.writing.output.wordCount") as? Int ?: 0
wordCount > 500
}
onTrue {
step(qualityAgent) {
id = "quality-review"
// ... configuration
}
}
onFalse {
step(revisionAgent) {
id = "content-revision"
// ... configuration
}
}
}
Human-in-the-Loop Steps
Steps that require human input or approval:
humanStep {
id = "human-review"
name = "Human Review"
description = "Get human approval for the content"
prompt { context ->
val content = context.getVariable("$.steps.editing.output.text") as? String ?: ""
"Please review the following content:\n\n$content\n\nApprove or suggest changes."
}
timeoutMs = 86400000 // 24 hours in milliseconds
onTimeout {
// Handle timeout case
step(notificationAgent) {
id = "timeout-notification"
// ... configuration
}
}
}
Advanced Workflow Features
Parallel Execution
Execute multiple steps concurrently:
parallelSteps {
step(researchAgent1) {
id = "research-source1"
// ... configuration
}
step(researchAgent2) {
id = "research-source2"
// ... configuration
}
step(researchAgent3) {
id = "research-source3"
// ... configuration
}
}
// Continue with a step that uses results from all parallel steps
step(synthesisAgent) {
id = "research-synthesis"
after("research-source1", "research-source2", "research-source3")
variables = mutableMapOf(
"source1" to variable("$.steps.research-source1.output.text"),
"source2" to variable("$.steps.research-source2.output.text"),
"source3" to variable("$.steps.research-source3.output.text")
)
}
Error Handling
Implement sophisticated error handling:
step(riskAgent) {
id = "risk-analysis"
// ... configuration
onError { error ->
// Log the error
println("Error in risk analysis: ${error.message}")
// Execute fallback step
step(fallbackAgent) {
id = "risk-analysis-fallback"
variables = mutableMapOf(
"error" to variable(error.message)
)
}
}
}
Workflow Variables
Workflow variables provide a powerful mechanism for passing data between steps:
// Define a workflow with variables
val workflowWithVariables = workflow {
name = "data-processing-workflow"
// Define input variables
input {
variable("dataSource", String::class)
variable("processingLevel", Int::class, defaultValue = 1)
}
// Use variables in steps
step(dataAgent) {
id = "data-extraction"
variables = mutableMapOf(
"source" to variable("$.input.dataSource"),
"level" to variable("$.input.processingLevel")
)
}
// Use JSONPath expressions to access nested properties
step(analysisAgent) {
id = "data-analysis"
after("data-extraction")
variables = mutableMapOf(
"rawData" to variable("$.steps.data-extraction.output.data"),
"metadata" to variable("$.steps.data-extraction.output.metadata.timestamp")
)
}
}
Observability and Monitoring
Kastrax workflows provide comprehensive observability features:
// Configure workflow monitoring
val monitoredWorkflow = workflow {
name = "monitored-workflow"
// ... workflow configuration
monitoring {
onStart { workflowId, input ->
println("Workflow $workflowId started with input: $input")
}
onStepStart { workflowId, stepId ->
println("Step $stepId in workflow $workflowId started")
}
onStepComplete { workflowId, stepId, output ->
println("Step $stepId in workflow $workflowId completed with output: $output")
}
onComplete { workflowId, output ->
println("Workflow $workflowId completed with output: $output")
}
onError { workflowId, error ->
println("Workflow $workflowId failed with error: ${error.message}")
}
}
}
Integration with Actor Model
One of Kastrax’s unique features is the integration of the workflow system with the actor model, enabling distributed workflow execution:
import ai.kastrax.actor.ActorSystem
import ai.kastrax.actor.Props
import ai.kastrax.workflow.WorkflowActor
import ai.kastrax.workflow.messages.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// Create an actor system
val system = ActorSystem("workflow-system")
// Create a workflow actor
val workflowActor = system.actorOf(
Props.create(WorkflowActor::class.java, contentWorkflow),
"workflow-actor"
)
// Send a workflow execution message
val result = system.ask<WorkflowResult>(
workflowActor,
ExecuteWorkflowMessage(input = mapOf("topic" to "AI in Healthcare"))
)
// Process the result
when (result) {
is WorkflowResult.Success -> {
println("Workflow executed successfully")
println("Final article: ${result.output["finalArticle"]}")
}
is WorkflowResult.Error -> {
println("Workflow execution failed: ${result.error}")
}
}
// Shutdown the actor system when done
system.terminate()
}
This integration enables building sophisticated multi-agent systems where workflows can be distributed across different nodes and executed concurrently.
Real-World Use Cases
Kastrax workflows are ideal for a variety of AI agent applications:
Content Creation Pipeline
Orchestrate a complete content creation process:
- Research step gathers information from multiple sources
- Planning step creates an outline based on research
- Writing step generates the initial content
- Editing step refines and improves the content
- Review step (human-in-the-loop) gets final approval
Customer Support Automation
Handle complex customer inquiries:
- Classification step determines the type of inquiry
- Information retrieval step gathers relevant knowledge base articles
- Response generation step creates a personalized response
- Escalation step (conditional) routes complex issues to human agents
Data Analysis Workflow
Process and analyze large datasets:
- Data collection step gathers information from multiple sources
- Preprocessing step cleans and normalizes the data
- Analysis step extracts insights from the data
- Visualization step creates charts and graphs
- Reporting step generates a comprehensive report
More Resources
- The Workflow Guide in the Guides section is a tutorial that covers the main concepts
- Sequential Steps workflow example
- Parallel Steps workflow example
- Branching Paths workflow example
- Workflow Variables example
- Cyclical Dependencies workflow example
- Suspend and Resume workflow example