Skip to Content
DocsWorkflowsWorkflow System Overview | Kastrax Docs

Workflow System Overview ✅

The Kastrax workflow system allows you to define, execute, and monitor complex sequences of operations involving agents and other components. This guide explains the workflow system architecture and how to use it effectively.

What Are Workflows? ✅

Workflows in Kastrax are structured sequences of steps that can be executed in a specific order, with data flowing between steps. They enable you to:

  • Orchestrate multiple agents to collaborate on complex tasks
  • Define conditional execution paths based on intermediate results
  • Process and transform data between steps
  • Handle errors and retries
  • Monitor and track execution progress
  • Create reusable patterns for common agent interactions

Workflow System Architecture ✅

The Kastrax workflow system consists of several key components:

  1. Workflow: The top-level container that defines a sequence of steps
  2. WorkflowStep: Individual units of work within a workflow
  3. WorkflowContext: Shared state and data passed between steps
  4. WorkflowEngine: The runtime that executes workflows
  5. WorkflowBuilder: DSL for creating workflows
  6. WorkflowStateStorage: Storage for workflow execution state

Creating Basic Workflows ✅

Kastrax provides a DSL for creating workflows:

import ai.kastrax.core.workflow.builder.workflow import ai.kastrax.core.agent.agent import ai.kastrax.integrations.deepseek.deepSeek import ai.kastrax.integrations.deepseek.DeepSeekModel import kotlinx.coroutines.runBlocking fun main() = runBlocking { // Create agents for the workflow val researchAgent = agent { name("ResearchAgent") description("An agent that researches topics") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } val writingAgent = agent { name("WritingAgent") description("An agent that writes content based on research") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } // Create a workflow val researchWorkflow = workflow("research-workflow", "Research and write an article") { // Research step step(researchAgent) { id = "research" name = "Research" description = "Research the topic" variables = mutableMapOf( "topic" to variable("$.input.topic") ) } // Writing step step(writingAgent) { id = "writing" name = "Writing" description = "Write an article based on research" after("research") variables = mutableMapOf( "research" to variable("$.steps.research.output.text") ) } // Define output mapping output { "researchResults" from "$.steps.research.output.text" "article" from "$.steps.writing.output.text" "wordCount" from { "$.steps.writing.output.text" to { text -> (text as? String)?.split(" ")?.size ?: 0 } } } } // Execute the workflow val workflowEngine = WorkflowEngine() workflowEngine.registerWorkflow("research-workflow", researchWorkflow) val result = workflowEngine.executeWorkflow( workflowId = "research-workflow", input = mapOf("topic" to "Artificial Intelligence") ) println("Workflow result: ${result.output}") }

Workflow Components ✅

Workflow ✅

The Workflow interface defines the core functionality of a workflow:

interface Workflow { suspend fun execute( input: Map<String, Any?>, options: WorkflowExecuteOptions = WorkflowExecuteOptions() ): WorkflowResult suspend fun streamExecute( input: Map<String, Any?>, options: WorkflowExecuteOptions = WorkflowExecuteOptions() ): Flow<WorkflowStatusUpdate> }

WorkflowStep ✅

The WorkflowStep interface defines individual steps within a workflow:

interface WorkflowStep { val id: String val name: String val description: String val after: List<String> val variables: Map<String, VariableReference> val config: StepConfig? get() = null val condition: (WorkflowContext) -> Boolean get() = { true } suspend fun execute(context: WorkflowContext): WorkflowStepResult }

WorkflowContext ✅

The WorkflowContext class holds the state and data during workflow execution:

data class WorkflowContext( val input: Map<String, Any?>, val steps: MutableMap<String, WorkflowStepResult> = mutableMapOf(), val variables: MutableMap<String, Any?> = mutableMapOf(), val runId: String? = null ) { fun resolveReference(reference: VariableReference): Any? { // Implementation... } fun resolveVariables(variables: Map<String, VariableReference>): Map<String, Any?> { // Implementation... } }

WorkflowEngine ✅

The WorkflowEngine class is responsible for executing workflows:

class WorkflowEngine( private val stateStorage: WorkflowStateStorage = InMemoryWorkflowStateStorage() ) { private val workflows = mutableMapOf<String, Workflow>() fun registerWorkflow(workflowId: String, workflow: Workflow) { workflows[workflowId] = workflow } suspend fun executeWorkflow( workflowId: String, input: Map<String, Any?>, options: WorkflowExecuteOptions = WorkflowExecuteOptions() ): WorkflowResult { // Implementation... } suspend fun streamExecuteWorkflow( workflowId: String, input: Map<String, Any?>, options: WorkflowExecuteOptions = WorkflowExecuteOptions() ): Flow<WorkflowStatusUpdate> { // Implementation... } }

Step Types ✅

Kastrax provides several built-in step types:

Agent Step ✅

The most common step type, which executes an agent with specific inputs:

step(agent) { id = "generate-content" name = "Generate Content" description = "Generate content based on a topic" variables = mutableMapOf( "topic" to variable("$.input.topic"), "style" to variable("$.input.style") ) }

Function Step ✅

Executes a custom function:

functionStep { id = "process-data" name = "Process Data" description = "Process data from previous steps" after("collect-data") function { context -> val data = context.resolveReference(variable("$.steps.collect-data.output.data")) // Process the data mapOf("processedData" to processData(data)) } }

Conditional Step ✅

Executes based on a condition:

step(agent) { id = "optional-step" name = "Optional Step" description = "This step only executes under certain conditions" after("previous-step") condition { context -> val quality = context.resolveReference(variable("$.steps.previous-step.output.quality")) (quality as? Int) ?: 0 > 7 } variables = mutableMapOf( "input" to variable("$.steps.previous-step.output.text") ) }

Subworkflow Step ✅

Executes another workflow as a step:

subworkflowStep { id = "nested-workflow" name = "Nested Workflow" description = "Execute a nested workflow" workflowId = "another-workflow" inputMapping { context -> mapOf( "data" to context.resolveReference(variable("$.steps.previous-step.output.data")) ) } }

Data Flow ✅

Data flows through a workflow via variable references:

Variable References ✅

Variable references use a JSON path-like syntax to access data:

// Reference workflow input val topicRef = variable("$.input.topic") // Reference output from a previous step val researchRef = variable("$.steps.research.output.text") // Reference a global variable val configRef = variable("$.variables.config")

Output Mapping ✅

Output mapping defines how to extract and transform the final workflow output:

output { // Direct mapping "researchResults" from "$.steps.research.output.text" // Mapping with transformation "wordCount" from { "$.steps.writing.output.text" to { text -> (text as? String)?.split(" ")?.size ?: 0 } } // Combining multiple sources "summary" from { "$.steps.research.output.text" to { research -> "$.steps.writing.output.text" to { article -> "Research: $research\n\nArticle: $article" } } } }

Error Handling ✅

Workflows can handle errors in several ways:

Retry Configuration ✅

step(agent) { id = "unreliable-step" name = "Unreliable Step" description = "A step that might fail" // Configure retries config = StepConfig( retryConfig = RetryConfig( maxRetries = 3, retryDelay = 1000, // 1 second backoffMultiplier = 2.0 // Exponential backoff ) ) }

Error Handlers ✅

// Create an error handling workflow engine val workflowEngine = ErrorHandlingWorkflowEngine( errorHandler = object : ErrorHandler { override suspend fun handleStepError( workflowId: String, runId: String, stepId: String, error: Throwable ): ErrorHandlingAction { return when { error is TimeoutException -> ErrorHandlingAction.Retry(maxRetries = 3) stepId == "optional-step" -> ErrorHandlingAction.Skip else -> ErrorHandlingAction.Fail } } } )

Workflow Execution ✅

Workflows can be executed in different ways:

Synchronous Execution ✅

val result = workflowEngine.executeWorkflow( workflowId = "my-workflow", input = mapOf("key" to "value") ) println("Workflow completed with output: ${result.output}")

Streaming Execution ✅

workflowEngine.streamExecuteWorkflow( workflowId = "my-workflow", input = mapOf("key" to "value") ).collect { update -> when (update) { is WorkflowStatusUpdate.Started -> println("Workflow started") is WorkflowStatusUpdate.StepStarted -> println("Step started: ${update.stepId}") is WorkflowStatusUpdate.StepCompleted -> println("Step completed: ${update.stepId}") is WorkflowStatusUpdate.StepFailed -> println("Step failed: ${update.stepId}, error: ${update.error}") is WorkflowStatusUpdate.Completed -> println("Workflow completed") is WorkflowStatusUpdate.Failed -> println("Workflow failed: ${update.error}") } }

Execution Options ✅

val options = WorkflowExecuteOptions( maxSteps = 20, // Maximum number of steps to execute timeout = 120000, // 2 minutes timeout onStepFinish = { stepResult -> println("Step ${stepResult.stepId} finished with output: ${stepResult.output}") }, onStepError = { stepId, error -> println("Step $stepId failed with error: ${error.message}") }, threadId = "conversation-123" // Optional thread ID for memory integration ) val result = workflowEngine.executeWorkflow( workflowId = "my-workflow", input = mapOf("key" to "value"), options = options )

Workflow State Management ✅

Workflows maintain state during and after execution:

State Storage ✅

// In-memory state storage (default) val inMemoryStorage = InMemoryWorkflowStateStorage() // File-based state storage val fileStorage = FileWorkflowStateStorage("workflows/state") // Database state storage val dbStorage = DatabaseWorkflowStateStorage(dataSource) // Create a workflow engine with custom state storage val workflowEngine = WorkflowEngine(stateStorage = fileStorage)

State Queries ✅

// Get workflow state val state = workflowEngine.getWorkflowState("my-workflow", "run-123") // Get all runs for a workflow val runs = workflowEngine.getWorkflowRuns("my-workflow") // Get active runs val activeRuns = workflowEngine.getActiveWorkflowRuns()

Advanced Workflow Features ✅

Parallel Execution ✅

val parallelWorkflow = workflow("parallel-workflow", "Execute steps in parallel") { // These steps have no dependencies, so they can run in parallel step(agent1) { id = "step1" name = "Step 1" description = "First parallel step" } step(agent2) { id = "step2" name = "Step 2" description = "Second parallel step" } // This step depends on both parallel steps step(agent3) { id = "step3" name = "Step 3" description = "Final step after parallel execution" after("step1", "step2") variables = mutableMapOf( "result1" to variable("$.steps.step1.output.result"), "result2" to variable("$.steps.step2.output.result") ) } }

Workflow Composition ✅

// Create a workflow composer val composer = WorkflowComposer("my-composer", workflowEngine) // Compose workflows sequentially val sequentialWorkflow = composer.composeSequential( workflowName = "sequential-workflow", description = "Execute workflows in sequence", workflows = listOf( "workflow1" to { input -> input }, // Input mapping for workflow1 "workflow2" to { input, prevOutput -> prevOutput } // Use previous output as input ) ) // Compose workflows in parallel val parallelWorkflow = composer.composeParallel( workflowName = "parallel-workflow", description = "Execute workflows in parallel", workflows = mapOf( "branch1" to "workflow1", "branch2" to "workflow2" ), inputMapping = { branch, input -> // Map input for each branch when (branch) { "branch1" -> mapOf("key1" to input["value"]) "branch2" -> mapOf("key2" to input["value"]) else -> emptyMap() } }, mergeStep = MergeResultsStep( id = "merge", name = "Merge Results", description = "Merge results from parallel workflows" ) )

State Machines ✅

For complex workflows with multiple possible paths, you can use state machines:

// Define states enum class OrderState { NEW, PROCESSING, SHIPPED, DELIVERED, CANCELED } // Define events sealed class OrderEvent { data class PlaceOrder(val items: List<String>) : OrderEvent() data class ProcessOrder(val paymentId: String) : OrderEvent() data class ShipOrder(val trackingNumber: String) : OrderEvent() data class DeliverOrder(val deliveryDate: String) : OrderEvent() object CancelOrder : OrderEvent() } // Create a state machine val orderStateMachine = BasicStateMachine<OrderState, OrderEvent, Map<String, Any>>( initialState = OrderState.NEW, initialContext = emptyMap(), transitioner = object : StateTransitioner<OrderState, OrderEvent, Map<String, Any>> { override suspend fun transition( currentState: OrderState, event: OrderEvent, context: Map<String, Any> ): TransitionResult<OrderState, Map<String, Any>> { // Define state transitions val (nextState, updatedContext) = when (currentState to event) { OrderState.NEW to is OrderEvent.PlaceOrder -> { OrderState.PROCESSING to context + ("items" to event.items) } OrderState.PROCESSING to is OrderEvent.ProcessOrder -> { OrderState.SHIPPED to context + ("paymentId" to event.paymentId) } OrderState.SHIPPED to is OrderEvent.ShipOrder -> { OrderState.DELIVERED to context + ("trackingNumber" to event.trackingNumber) } OrderState.SHIPPED to is OrderEvent.DeliverOrder -> { OrderState.DELIVERED to context + ("deliveryDate" to event.deliveryDate) } else to is OrderEvent.CancelOrder -> { OrderState.CANCELED to context } else -> currentState to context } return TransitionResult( nextState = nextState, updatedContext = updatedContext, sideEffects = emptyList() ) } } ) // Use the state machine val nextState = orderStateMachine.sendEvent(OrderEvent.PlaceOrder(listOf("item1", "item2"))) println("Next state: $nextState")

Complete Example ✅

Here’s a complete example of a sophisticated workflow:

import ai.kastrax.core.workflow.builder.workflow import ai.kastrax.core.workflow.engine.WorkflowEngine import ai.kastrax.core.agent.agent import ai.kastrax.integrations.deepseek.deepSeek import ai.kastrax.integrations.deepseek.DeepSeekModel import kotlinx.coroutines.runBlocking fun main() = runBlocking { // Create agents val researchAgent = agent { name("ResearchAgent") description("An agent that researches topics") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } val writingAgent = agent { name("WritingAgent") description("An agent that writes content based on research") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } val editingAgent = agent { name("EditingAgent") description("An agent that edits and improves content") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } val factCheckingAgent = agent { name("FactCheckingAgent") description("An agent that verifies facts in content") model = deepSeek { apiKey("your-deepseek-api-key") model(DeepSeekModel.DEEPSEEK_CHAT) temperature(0.7) } } // Create a content creation workflow val contentWorkflow = workflow("content-creation", "Create researched and edited content") { // Research step step(researchAgent) { id = "research" name = "Research" description = "Research the topic thoroughly" variables = mutableMapOf( "topic" to variable("$.input.topic"), "depth" to variable("$.input.researchDepth") ) } // Writing step step(writingAgent) { id = "writing" name = "Writing" description = "Write content based on research" after("research") variables = mutableMapOf( "research" to variable("$.steps.research.output.text"), "style" to variable("$.input.contentStyle"), "length" to variable("$.input.contentLength") ) } // Parallel fact checking and editing step(factCheckingAgent) { id = "fact-checking" name = "Fact Checking" description = "Verify facts in the content" after("writing") variables = mutableMapOf( "content" to variable("$.steps.writing.output.text"), "research" to variable("$.steps.research.output.text") ) } step(editingAgent) { id = "editing" name = "Editing" description = "Edit and improve the content" after("writing") variables = mutableMapOf( "content" to variable("$.steps.writing.output.text"), "style" to variable("$.input.contentStyle") ) } // Final revision step step(editingAgent) { id = "final-revision" name = "Final Revision" description = "Create the final version incorporating edits and fact checks" after("fact-checking", "editing") variables = mutableMapOf( "original" to variable("$.steps.writing.output.text"), "edits" to variable("$.steps.editing.output.text"), "factChecks" to variable("$.steps.fact-checking.output.text") ) } // Conditional quality check step step(editingAgent) { id = "quality-check" name = "Quality Check" description = "Perform a quality check if requested" after("final-revision") condition { context -> val performQualityCheck = context.resolveReference(variable("$.input.performQualityCheck")) (performQualityCheck as? Boolean) ?: false } variables = mutableMapOf( "content" to variable("$.steps.final-revision.output.text") ) } // Define output mapping output { "research" from "$.steps.research.output.text" "draft" from "$.steps.writing.output.text" "factChecks" from "$.steps.fact-checking.output.text" "edits" from "$.steps.editing.output.text" "finalContent" from "$.steps.final-revision.output.text" "qualityScore" from { "$.steps.quality-check.output.score" to { score -> score ?: "No quality check performed" } } "metadata" from { "$.steps.writing.output.wordCount" to { wordCount -> "$.steps.final-revision.output.wordCount" to { finalWordCount -> mapOf( "initialWordCount" to wordCount, "finalWordCount" to finalWordCount, "changePercentage" to calculatePercentage(wordCount, finalWordCount) ) } } } } } // Create workflow engine val workflowEngine = WorkflowEngine() workflowEngine.registerWorkflow("content-creation", contentWorkflow) // Execute the workflow val result = workflowEngine.executeWorkflow( workflowId = "content-creation", input = mapOf( "topic" to "Artificial Intelligence Ethics", "researchDepth" to "comprehensive", "contentStyle" to "academic", "contentLength" to "2000 words", "performQualityCheck" to true ) ) // Print the results println("Workflow completed with status: ${if (result.success) "SUCCESS" else "FAILURE"}") println("Final content: ${result.output["finalContent"]}") println("Quality score: ${result.output["qualityScore"]}") println("Metadata: ${result.output["metadata"]}") } // Helper function to calculate percentage change fun calculatePercentage(initial: Any?, final: Any?): Double { val initialValue = (initial as? Number)?.toDouble() ?: return 0.0 val finalValue = (final as? Number)?.toDouble() ?: return 0.0 if (initialValue == 0.0) return 0.0 return ((finalValue - initialValue) / initialValue) * 100.0 }

Best Practices ✅

  1. Step Granularity: Design steps with appropriate granularity - not too small to create overhead, not too large to lose flexibility
  2. Error Handling: Implement robust error handling with retries for unreliable operations
  3. Data Flow: Be explicit about data flow between steps using variable references
  4. Conditional Logic: Use conditions to create dynamic workflows that adapt to intermediate results
  5. Monitoring: Set up proper monitoring and logging for workflow execution
  6. State Management: Choose appropriate state storage based on your reliability requirements
  7. Testing: Test workflows with different inputs and edge cases
  8. Composition: Compose complex workflows from simpler, reusable workflows

Next Steps ✅

Now that you understand the workflow system, you can:

  1. Learn about workflow definition
  2. Explore workflow execution
  3. Implement agent integration with workflows
  4. Set up workflow state management
Last updated on