Workflow System Overview ✅
The Kastrax workflow system allows you to define, execute, and monitor complex sequences of operations involving agents and other components. This guide explains the workflow system architecture and how to use it effectively.
What Are Workflows? ✅
Workflows in Kastrax are structured sequences of steps that can be executed in a specific order, with data flowing between steps. They enable you to:
- Orchestrate multiple agents to collaborate on complex tasks
- Define conditional execution paths based on intermediate results
- Process and transform data between steps
- Handle errors and retries
- Monitor and track execution progress
- Create reusable patterns for common agent interactions
Workflow System Architecture ✅
The Kastrax workflow system consists of several key components:
- Workflow: The top-level container that defines a sequence of steps
- WorkflowStep: Individual units of work within a workflow
- WorkflowContext: Shared state and data passed between steps
- WorkflowEngine: The runtime that executes workflows
- WorkflowBuilder: DSL for creating workflows
- WorkflowStateStorage: Storage for workflow execution state
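The way these components relate can be illustrated with a deliberately small model. The sketch below is not Kastrax code: `ToyContext`, `ToyStep`, `ToyWorkflow`, and `ToyEngine` are hypothetical stand-ins, and it assumes the engine simply runs a step once every step named in its `after` list has produced a result. The builder DSL and state storage are left out.

```kotlin
// Hypothetical toy types for illustration only; they are not the Kastrax API.
data class ToyContext(
    val input: Map<String, Any?>,
    val steps: MutableMap<String, Any?> = mutableMapOf()
)

class ToyStep(
    val id: String,
    val after: List<String> = emptyList(),
    val run: (ToyContext) -> Any?
)

class ToyWorkflow(val steps: List<ToyStep>)

class ToyEngine {
    // Runs each step after all of the steps it depends on have finished.
    fun execute(workflow: ToyWorkflow, input: Map<String, Any?>): ToyContext {
        val context = ToyContext(input)
        val pending = workflow.steps.toMutableList()
        while (pending.isNotEmpty()) {
            val ready = pending.filter { step -> step.after.all { it in context.steps } }
            check(ready.isNotEmpty()) { "Unresolvable dependencies: ${pending.map { it.id }}" }
            for (step in ready) {
                context.steps[step.id] = step.run(context)
            }
            pending.removeAll(ready)
        }
        return context
    }
}

fun main() {
    // Declaration order does not matter; the dependency on "research" does.
    val workflow = ToyWorkflow(
        listOf(
            ToyStep("writing", after = listOf("research")) { ctx ->
                "Article based on: ${ctx.steps["research"]}"
            },
            ToyStep("research") { ctx -> "Notes on ${ctx.input["topic"]}" }
        )
    )
    val result = ToyEngine().execute(workflow, mapOf("topic" to "AI"))
    println(result.steps["writing"]) // Article based on: Notes on AI
}
```

The shared context is the only channel between steps in this model, which mirrors the role the list above gives to WorkflowContext.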
Creating Basic Workflows ✅
Kastrax provides a DSL for creating workflows:
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.workflow.engine.WorkflowEngine
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create agents for the workflow
    val researchAgent = agent {
        name("ResearchAgent")
        description("An agent that researches topics")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    val writingAgent = agent {
        name("WritingAgent")
        description("An agent that writes content based on research")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    // Create a workflow
    val researchWorkflow = workflow("research-workflow", "Research and write an article") {
        // Research step: receives the topic from the workflow input
        step(researchAgent) {
            id = "research"
            name = "Research"
            description = "Research the topic"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic")
            )
        }
        // Writing step: runs after "research" and receives its output
        step(writingAgent) {
            id = "writing"
            name = "Writing"
            description = "Write an article based on research"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text")
            )
        }
        // Define output mapping
        output {
            "researchResults" from "$.steps.research.output.text"
            "article" from "$.steps.writing.output.text"
            "wordCount" from {
                "$.steps.writing.output.text" to { text ->
                    (text as? String)?.split(" ")?.size ?: 0
                }
            }
        }
    }
    // Execute the workflow
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("research-workflow", researchWorkflow)
    val result = workflowEngine.executeWorkflow(
        workflowId = "research-workflow",
        input = mapOf("topic" to "Artificial Intelligence")
    )
    println("Workflow result: ${result.output}")
}
Workflow Components ✅
Workflow ✅
The Workflow interface defines the core functionality of a workflow:
interface Workflow {
    suspend fun execute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult

    suspend fun streamExecute(
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate>
}
WorkflowStep ✅
The WorkflowStep interface defines individual steps within a workflow:
interface WorkflowStep {
    val id: String
    val name: String
    val description: String
    val after: List<String>
    val variables: Map<String, VariableReference>
    val config: StepConfig?
        get() = null
    val condition: (WorkflowContext) -> Boolean
        get() = { true }

    suspend fun execute(context: WorkflowContext): WorkflowStepResult
}
WorkflowContext ✅
The WorkflowContext class holds the state and data during workflow execution:
data class WorkflowContext(
    val input: Map<String, Any?>,
    val steps: MutableMap<String, WorkflowStepResult> = mutableMapOf(),
    val variables: MutableMap<String, Any?> = mutableMapOf(),
    val runId: String? = null
) {
    fun resolveReference(reference: VariableReference): Any? {
        // Implementation...
    }

    fun resolveVariables(variables: Map<String, VariableReference>): Map<String, Any?> {
        // Implementation...
    }
}
WorkflowEngine ✅
The WorkflowEngine class is responsible for executing workflows:
class WorkflowEngine(
    private val stateStorage: WorkflowStateStorage = InMemoryWorkflowStateStorage()
) {
    private val workflows = mutableMapOf<String, Workflow>()

    fun registerWorkflow(workflowId: String, workflow: Workflow) {
        workflows[workflowId] = workflow
    }

    suspend fun executeWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): WorkflowResult {
        // Implementation...
    }

    suspend fun streamExecuteWorkflow(
        workflowId: String,
        input: Map<String, Any?>,
        options: WorkflowExecuteOptions = WorkflowExecuteOptions()
    ): Flow<WorkflowStatusUpdate> {
        // Implementation...
    }
}
Step Types ✅
Kastrax provides several built-in step types:
Agent Step ✅
The most common step type, which executes an agent with specific inputs:
step(agent) {
    id = "generate-content"
    name = "Generate Content"
    description = "Generate content based on a topic"
    variables = mutableMapOf(
        "topic" to variable("$.input.topic"),
        "style" to variable("$.input.style")
    )
}
Function Step ✅
Executes a custom function:
functionStep {
    id = "process-data"
    name = "Process Data"
    description = "Process data from previous steps"
    after("collect-data")
    function { context ->
        val data = context.resolveReference(variable("$.steps.collect-data.output.data"))
        // Process the data
        mapOf("processedData" to processData(data))
    }
}
Conditional Step ✅
Runs only when its condition evaluates to true:
step(agent) {
    id = "optional-step"
    name = "Optional Step"
    description = "This step only executes under certain conditions"
    after("previous-step")
    condition { context ->
        val quality = context.resolveReference(variable("$.steps.previous-step.output.quality"))
        // Fall back to 0 when the value is missing or not an Int, then compare
        ((quality as? Int) ?: 0) > 7
    }
    variables = mutableMapOf(
        "input" to variable("$.steps.previous-step.output.text")
    )
}
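The condition above combines a safe cast, a default, and a comparison in one expression. In Kotlin the elvis operator binds more tightly than `>`, so the expression compares the defaulted value with 7. The standalone sketch below isolates that logic; `passesQuality` is a hypothetical helper name, not part of Kastrax.

```kotlin
// Returns true only when the value is an Int greater than the threshold.
// Anything else (null, a String, a Double) falls back to 0 and fails the check.
fun passesQuality(quality: Any?, threshold: Int = 7): Boolean =
    ((quality as? Int) ?: 0) > threshold

fun main() {
    println(passesQuality(9))     // true
    println(passesQuality(7))     // false: the comparison is strict
    println(passesQuality("9"))   // false: a String is not an Int
    println(passesQuality(null))  // false
}
```

One consequence worth keeping in mind: if a step reports its score as a `Long` or `Double`, the `as? Int` cast yields null and the condition silently evaluates to false.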
Subworkflow Step ✅
Executes another workflow as a step:
subworkflowStep {
    id = "nested-workflow"
    name = "Nested Workflow"
    description = "Execute a nested workflow"
    workflowId = "another-workflow"
    inputMapping { context ->
        mapOf(
            "data" to context.resolveReference(variable("$.steps.previous-step.output.data"))
        )
    }
}
Data Flow ✅
Data flows through a workflow via variable references:
Variable References ✅
Variable references use a JSON path-like syntax to access data:
// Reference workflow input
val topicRef = variable("$.input.topic")
// Reference output from a previous step
val researchRef = variable("$.steps.research.output.text")
// Reference a global variable
val configRef = variable("$.variables.config")
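To show what resolving such a reference might involve, here is a minimal sketch that walks a dot-separated path through nested maps. It is an assumption about the semantics rather than Kastrax's resolver: `resolvePath` is a hypothetical function, the state is modeled as plain `Map` values, and features such as array indexing are not handled.

```kotlin
// Illustrative resolver for "$.a.b.c" style references over nested maps.
// Hypothetical helper; not the Kastrax implementation.
fun resolvePath(root: Map<String, Any?>, path: String): Any? {
    require(path.startsWith("$.")) { "Unsupported reference: $path" }
    var current: Any? = root
    for (segment in path.removePrefix("$.").split(".")) {
        // Return null as soon as a segment cannot be followed.
        current = (current as? Map<*, *>)?.get(segment) ?: return null
    }
    return current
}

fun main() {
    val state = mapOf(
        "input" to mapOf("topic" to "AI"),
        "steps" to mapOf("research" to mapOf("output" to mapOf("text" to "Findings")))
    )
    println(resolvePath(state, "$.input.topic"))                // AI
    println(resolvePath(state, "$.steps.research.output.text")) // Findings
    println(resolvePath(state, "$.steps.missing.output.text"))  // null
}
```

Returning null for a missing segment, rather than throwing, matches how the examples in this guide treat absent values with `as?` and `?:`.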
Output Mapping ✅
Output mapping defines how to extract and transform the final workflow output:
output {
    // Direct mapping
    "researchResults" from "$.steps.research.output.text"
    // Mapping with transformation
    "wordCount" from {
        "$.steps.writing.output.text" to { text ->
            (text as? String)?.split(" ")?.size ?: 0
        }
    }
    // Combining multiple sources
    "summary" from {
        "$.steps.research.output.text" to { research ->
            "$.steps.writing.output.text" to { article ->
                "Research: $research\n\nArticle: $article"
            }
        }
    }
}
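The `wordCount` transformation above splits on a single space, which counts empty tokens when the text contains consecutive spaces and reports 1 for an empty string. If an exact count matters, a whitespace-aware variant along these lines may be preferable; `wordCount` here is a hypothetical standalone helper, shown outside the DSL so it can be run on its own.

```kotlin
// Counts whitespace-separated words; returns 0 for null, non-String, or blank input.
fun wordCount(text: Any?): Int {
    val value = (text as? String)?.trim().orEmpty()
    return if (value.isEmpty()) 0 else value.split(Regex("\\s+")).size
}

fun main() {
    println("a  b".split(" ").size) // 3: the double space produces an empty token
    println(wordCount("a  b"))      // 2
    println(wordCount("  "))        // 0
    println(wordCount(null))        // 0
}
```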
Error Handling ✅
Workflows can handle errors in several ways:
Retry Configuration ✅
step(agent) {
    id = "unreliable-step"
    name = "Unreliable Step"
    description = "A step that might fail"
    // Configure retries
    config = StepConfig(
        retryConfig = RetryConfig(
            maxRetries = 3,
            retryDelay = 1000, // 1 second
            backoffMultiplier = 2.0 // Exponential backoff
        )
    )
}
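To get a feel for what these three values imply, the sketch below computes the wait before each retry under one common reading of exponential backoff: the first retry waits `retryDelay` milliseconds and each later retry multiplies the previous wait by `backoffMultiplier`. This formula is an assumption for illustration; the exact schedule Kastrax applies is not specified here, and `retryDelaysMs` is a hypothetical helper.

```kotlin
// Wait before each retry, assuming delay_n = retryDelay * backoffMultiplier^(n - 1).
// Illustrative only; not taken from the Kastrax implementation.
fun retryDelaysMs(maxRetries: Int, retryDelayMs: Long, backoffMultiplier: Double): List<Long> {
    var delay = retryDelayMs.toDouble()
    return List(maxRetries) {
        val current = delay.toLong()
        delay *= backoffMultiplier
        current
    }
}

fun main() {
    val delays = retryDelaysMs(maxRetries = 3, retryDelayMs = 1000, backoffMultiplier = 2.0)
    println(delays)       // [1000, 2000, 4000]
    println(delays.sum()) // 7000
}
```

Under this reading, the configuration above adds up to seven seconds of waiting before the step is reported as failed, which is worth weighing against any workflow-level timeout.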
Error Handlers ✅
// Create an error handling workflow engine
val workflowEngine = ErrorHandlingWorkflowEngine(
    errorHandler = object : ErrorHandler {
        override suspend fun handleStepError(
            workflowId: String,
            runId: String,
            stepId: String,
            error: Throwable
        ): ErrorHandlingAction {
            return when {
                error is TimeoutException -> ErrorHandlingAction.Retry(maxRetries = 3)
                stepId == "optional-step" -> ErrorHandlingAction.Skip
                else -> ErrorHandlingAction.Fail
            }
        }
    }
)
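Because the handler above uses a `when` without a subject, its branches are tried in order and the first match wins. The self-contained sketch below reproduces only that decision logic so the ordering can be checked in isolation. `Action` and `decide` are hypothetical stand-ins for the Kastrax types, and `TimeoutException` is assumed to be `java.util.concurrent.TimeoutException`.

```kotlin
import java.util.concurrent.TimeoutException

// Hypothetical stand-in for the action type used above; the real type may differ.
sealed class Action {
    data class Retry(val maxRetries: Int) : Action()
    object Skip : Action()
    object Fail : Action()
}

// Same branch order as the handler above: timeouts, then optional steps, then fail.
fun decide(stepId: String, error: Throwable): Action = when {
    error is TimeoutException -> Action.Retry(maxRetries = 3)
    stepId == "optional-step" -> Action.Skip
    else -> Action.Fail
}

fun main() {
    // A timeout in the optional step is retried, not skipped, because that branch comes first.
    println(decide("optional-step", TimeoutException("slow")))                // Retry(maxRetries=3)
    println(decide("optional-step", IllegalStateException()) == Action.Skip)  // true
    println(decide("research", IllegalStateException()) == Action.Fail)       // true
}
```

If optional steps should be skipped even on timeouts, the two branches would need to be swapped.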
Workflow Execution ✅
Workflows can be executed in different ways:
Synchronous Execution ✅
val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
)
println("Workflow completed with output: ${result.output}")
Streaming Execution ✅
workflowEngine.streamExecuteWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value")
).collect { update ->
    when (update) {
        is WorkflowStatusUpdate.Started -> println("Workflow started")
        is WorkflowStatusUpdate.StepStarted -> println("Step started: ${update.stepId}")
        is WorkflowStatusUpdate.StepCompleted -> println("Step completed: ${update.stepId}")
        is WorkflowStatusUpdate.StepFailed -> println("Step failed: ${update.stepId}, error: ${update.error}")
        is WorkflowStatusUpdate.Completed -> println("Workflow completed")
        is WorkflowStatusUpdate.Failed -> println("Workflow failed: ${update.error}")
    }
}
Execution Options ✅
val options = WorkflowExecuteOptions(
    maxSteps = 20, // Maximum number of steps to execute
    timeout = 120000, // Timeout in milliseconds (2 minutes)
    onStepFinish = { stepResult ->
        println("Step ${stepResult.stepId} finished with output: ${stepResult.output}")
    },
    onStepError = { stepId, error ->
        println("Step $stepId failed with error: ${error.message}")
    },
    threadId = "conversation-123" // Optional thread ID for memory integration
)
val result = workflowEngine.executeWorkflow(
    workflowId = "my-workflow",
    input = mapOf("key" to "value"),
    options = options
)
Workflow State Management ✅
Workflows maintain state during and after execution:
State Storage ✅
// In-memory state storage (default)
val inMemoryStorage = InMemoryWorkflowStateStorage()
// File-based state storage
val fileStorage = FileWorkflowStateStorage("workflows/state")
// Database state storage
val dbStorage = DatabaseWorkflowStateStorage(dataSource)
// Create a workflow engine with custom state storage
val workflowEngine = WorkflowEngine(stateStorage = fileStorage)
State Queries ✅
// Get workflow state
val state = workflowEngine.getWorkflowState("my-workflow", "run-123")
// Get all runs for a workflow
val runs = workflowEngine.getWorkflowRuns("my-workflow")
// Get active runs
val activeRuns = workflowEngine.getActiveWorkflowRuns()
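The three queries above suggest that state is addressed by workflow ID and run ID and that each run carries a status. As a rough mental model only, an in-memory store supporting the same lookups could look like the sketch below. `RunState` and `InMemoryRunStore` are hypothetical names with invented fields; they do not describe the `WorkflowStateStorage` interface, and the class is not thread-safe.

```kotlin
// Hypothetical run record and store, for illustration only.
data class RunState(val workflowId: String, val runId: String, val status: String)

class InMemoryRunStore {
    // Keyed by (workflowId, runId); insertion order is preserved.
    private val runs = LinkedHashMap<Pair<String, String>, RunState>()

    fun save(state: RunState) {
        runs[state.workflowId to state.runId] = state
    }

    fun get(workflowId: String, runId: String): RunState? = runs[workflowId to runId]

    fun runsFor(workflowId: String): List<RunState> =
        runs.values.filter { it.workflowId == workflowId }

    fun activeRuns(): List<RunState> = runs.values.filter { it.status == "RUNNING" }
}

fun main() {
    val store = InMemoryRunStore()
    store.save(RunState("my-workflow", "run-123", "RUNNING"))
    store.save(RunState("my-workflow", "run-124", "COMPLETED"))
    println(store.get("my-workflow", "run-123")?.status) // RUNNING
    println(store.runsFor("my-workflow").size)           // 2
    println(store.activeRuns().map { it.runId })         // [run-123]
}
```

A store like this loses everything when the process exits, which is the trade-off to weigh when choosing between the in-memory, file-based, and database options.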
Advanced Workflow Features ✅
Parallel Execution ✅
val parallelWorkflow = workflow("parallel-workflow", "Execute steps in parallel") {
    // These steps have no dependencies, so they can run in parallel
    step(agent1) {
        id = "step1"
        name = "Step 1"
        description = "First parallel step"
    }
    step(agent2) {
        id = "step2"
        name = "Step 2"
        description = "Second parallel step"
    }
    // This step depends on both parallel steps
    step(agent3) {
        id = "step3"
        name = "Step 3"
        description = "Final step after parallel execution"
        after("step1", "step2")
        variables = mutableMapOf(
            "result1" to variable("$.steps.step1.output.result"),
            "result2" to variable("$.steps.step2.output.result")
        )
    }
}
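One way to reason about which steps can overlap is to group them into "waves", where every step in a wave depends only on steps from earlier waves. The sketch below derives those waves from the `after` relationships. It is an illustration of the idea, not a description of how the Kastrax engine schedules work, and `executionWaves` is a hypothetical helper.

```kotlin
// Groups step ids into waves: each step depends only on steps from earlier waves,
// so the steps inside one wave are candidates for parallel execution.
fun executionWaves(dependencies: Map<String, List<String>>): List<List<String>> {
    val done = mutableSetOf<String>()
    val waves = mutableListOf<List<String>>()
    while (done.size < dependencies.size) {
        val wave = dependencies
            .filter { (id, after) -> id !in done && done.containsAll(after) }
            .keys
            .sorted()
        require(wave.isNotEmpty()) { "Cycle or unknown dependency among: ${dependencies.keys - done}" }
        waves += wave
        done += wave
    }
    return waves
}

fun main() {
    // Same shape as the workflow above: step3 waits for step1 and step2.
    val deps = mapOf(
        "step1" to emptyList<String>(),
        "step2" to emptyList<String>(),
        "step3" to listOf("step1", "step2")
    )
    println(executionWaves(deps)) // [[step1, step2], [step3]]
}
```

The `require` call also surfaces a common authoring mistake: an `after` entry that names a step that does not exist, or two steps that wait on each other.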
Workflow Composition ✅
// Create a workflow composer
val composer = WorkflowComposer("my-composer", workflowEngine)
// Compose workflows sequentially
val sequentialWorkflow = composer.composeSequential(
    workflowName = "sequential-workflow",
    description = "Execute workflows in sequence",
    workflows = listOf(
        "workflow1" to { input -> input }, // Input mapping for workflow1
        "workflow2" to { input, prevOutput -> prevOutput } // Use previous output as input
    )
)
// Compose workflows in parallel
val parallelWorkflow = composer.composeParallel(
    workflowName = "parallel-workflow",
    description = "Execute workflows in parallel",
    workflows = mapOf(
        "branch1" to "workflow1",
        "branch2" to "workflow2"
    ),
    inputMapping = { branch, input ->
        // Map input for each branch
        when (branch) {
            "branch1" -> mapOf("key1" to input["value"])
            "branch2" -> mapOf("key2" to input["value"])
            else -> emptyMap()
        }
    },
    mergeStep = MergeResultsStep(
        id = "merge",
        name = "Merge Results",
        description = "Merge results from parallel workflows"
    )
)
State Machines ✅
For complex workflows with multiple possible paths, you can use state machines:
// Define states
enum class OrderState {
    NEW, PROCESSING, SHIPPED, DELIVERED, CANCELED
}
// Define events
sealed class OrderEvent {
    data class PlaceOrder(val items: List<String>) : OrderEvent()
    data class ProcessOrder(val paymentId: String) : OrderEvent()
    data class ShipOrder(val trackingNumber: String) : OrderEvent()
    data class DeliverOrder(val deliveryDate: String) : OrderEvent()
    object CancelOrder : OrderEvent()
}
// Create a state machine
val orderStateMachine = BasicStateMachine<OrderState, OrderEvent, Map<String, Any>>(
    initialState = OrderState.NEW,
    initialContext = emptyMap(),
    transitioner = object : StateTransitioner<OrderState, OrderEvent, Map<String, Any>> {
        override suspend fun transition(
            currentState: OrderState,
            event: OrderEvent,
            context: Map<String, Any>
        ): TransitionResult<OrderState, Map<String, Any>> {
            // Define state transitions. A subject-less `when` lets each branch
            // check the state and smart-cast the event in a single condition.
            val (nextState, updatedContext) = when {
                currentState == OrderState.NEW && event is OrderEvent.PlaceOrder ->
                    OrderState.PROCESSING to (context + ("items" to event.items))
                currentState == OrderState.PROCESSING && event is OrderEvent.ProcessOrder ->
                    OrderState.SHIPPED to (context + ("paymentId" to event.paymentId))
                currentState == OrderState.SHIPPED && event is OrderEvent.ShipOrder ->
                    OrderState.DELIVERED to (context + ("trackingNumber" to event.trackingNumber))
                currentState == OrderState.SHIPPED && event is OrderEvent.DeliverOrder ->
                    OrderState.DELIVERED to (context + ("deliveryDate" to event.deliveryDate))
                // Cancellation is accepted from any state
                event is OrderEvent.CancelOrder -> OrderState.CANCELED to context
                // Ignore events that are not valid in the current state
                else -> currentState to context
            }
            return TransitionResult(
                nextState = nextState,
                updatedContext = updatedContext,
                sideEffects = emptyList()
            )
        }
    }
)
// Use the state machine
val nextState = orderStateMachine.sendEvent(OrderEvent.PlaceOrder(listOf("item1", "item2")))
println("Next state: $nextState")
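The transition logic can also be exercised without the state machine classes, which makes it easy to test. The sketch below is a reduced, self-contained version that uses only the Kotlin standard library. `State`, `Event`, and `transition` are hypothetical stand-ins that cover a subset of the order states above, not Kastrax types.

```kotlin
enum class State { NEW, PROCESSING, SHIPPED, CANCELED }

sealed class Event {
    data class PlaceOrder(val items: List<String>) : Event()
    data class ProcessOrder(val paymentId: String) : Event()
    object CancelOrder : Event()
}

// Pure transition function: returns the next state together with the updated context.
fun transition(state: State, event: Event, context: Map<String, Any>): Pair<State, Map<String, Any>> =
    when {
        state == State.NEW && event is Event.PlaceOrder ->
            State.PROCESSING to (context + ("items" to event.items))
        state == State.PROCESSING && event is Event.ProcessOrder ->
            State.SHIPPED to (context + ("paymentId" to event.paymentId))
        event is Event.CancelOrder -> State.CANCELED to context
        // Events that are not valid in the current state leave everything unchanged.
        else -> state to context
    }

fun main() {
    var current = State.NEW to emptyMap<String, Any>()
    val events = listOf(Event.PlaceOrder(listOf("item1")), Event.ProcessOrder("pay-1"))
    for (event in events) {
        current = transition(current.first, event, current.second)
        println(current.first) // PROCESSING, then SHIPPED
    }
    println(current.second.keys) // [items, paymentId]
}
```

Keeping the transition function free of side effects means invalid sequences, such as processing an order that was never placed, can be checked with plain assertions.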
Complete Example ✅
Here's a complete example that combines sequential, parallel, and conditional steps with output mapping:
import ai.kastrax.core.workflow.builder.workflow
import ai.kastrax.core.workflow.engine.WorkflowEngine
import ai.kastrax.core.agent.agent
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Create agents
    val researchAgent = agent {
        name("ResearchAgent")
        description("An agent that researches topics")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    val writingAgent = agent {
        name("WritingAgent")
        description("An agent that writes content based on research")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    val editingAgent = agent {
        name("EditingAgent")
        description("An agent that edits and improves content")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    val factCheckingAgent = agent {
        name("FactCheckingAgent")
        description("An agent that verifies facts in content")
        model = deepSeek {
            apiKey("your-deepseek-api-key")
            model(DeepSeekModel.DEEPSEEK_CHAT)
            temperature(0.7)
        }
    }
    // Create a content creation workflow
    val contentWorkflow = workflow("content-creation", "Create researched and edited content") {
        // Research step
        step(researchAgent) {
            id = "research"
            name = "Research"
            description = "Research the topic thoroughly"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic"),
                "depth" to variable("$.input.researchDepth")
            )
        }
        // Writing step
        step(writingAgent) {
            id = "writing"
            name = "Writing"
            description = "Write content based on research"
            after("research")
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text"),
                "style" to variable("$.input.contentStyle"),
                "length" to variable("$.input.contentLength")
            )
        }
        // Parallel fact checking and editing: both steps depend only on "writing"
        step(factCheckingAgent) {
            id = "fact-checking"
            name = "Fact Checking"
            description = "Verify facts in the content"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "research" to variable("$.steps.research.output.text")
            )
        }
        step(editingAgent) {
            id = "editing"
            name = "Editing"
            description = "Edit and improve the content"
            after("writing")
            variables = mutableMapOf(
                "content" to variable("$.steps.writing.output.text"),
                "style" to variable("$.input.contentStyle")
            )
        }
        // Final revision step
        step(editingAgent) {
            id = "final-revision"
            name = "Final Revision"
            description = "Create the final version incorporating edits and fact checks"
            after("fact-checking", "editing")
            variables = mutableMapOf(
                "original" to variable("$.steps.writing.output.text"),
                "edits" to variable("$.steps.editing.output.text"),
                "factChecks" to variable("$.steps.fact-checking.output.text")
            )
        }
        // Conditional quality check step
        step(editingAgent) {
            id = "quality-check"
            name = "Quality Check"
            description = "Perform a quality check if requested"
            after("final-revision")
            condition { context ->
                val performQualityCheck = context.resolveReference(variable("$.input.performQualityCheck"))
                (performQualityCheck as? Boolean) ?: false
            }
            variables = mutableMapOf(
                "content" to variable("$.steps.final-revision.output.text")
            )
        }
        // Define output mapping
        output {
            "research" from "$.steps.research.output.text"
            "draft" from "$.steps.writing.output.text"
            "factChecks" from "$.steps.fact-checking.output.text"
            "edits" from "$.steps.editing.output.text"
            "finalContent" from "$.steps.final-revision.output.text"
            "qualityScore" from {
                "$.steps.quality-check.output.score" to { score ->
                    score ?: "No quality check performed"
                }
            }
            "metadata" from {
                "$.steps.writing.output.wordCount" to { wordCount ->
                    "$.steps.final-revision.output.wordCount" to { finalWordCount ->
                        mapOf(
                            "initialWordCount" to wordCount,
                            "finalWordCount" to finalWordCount,
                            "changePercentage" to calculatePercentage(wordCount, finalWordCount)
                        )
                    }
                }
            }
        }
    }
    // Create workflow engine
    val workflowEngine = WorkflowEngine()
    workflowEngine.registerWorkflow("content-creation", contentWorkflow)
    // Execute the workflow
    val result = workflowEngine.executeWorkflow(
        workflowId = "content-creation",
        input = mapOf(
            "topic" to "Artificial Intelligence Ethics",
            "researchDepth" to "comprehensive",
            "contentStyle" to "academic",
            "contentLength" to "2000 words",
            "performQualityCheck" to true
        )
    )
    // Print the results
    println("Workflow completed with status: ${if (result.success) "SUCCESS" else "FAILURE"}")
    println("Final content: ${result.output["finalContent"]}")
    println("Quality score: ${result.output["qualityScore"]}")
    println("Metadata: ${result.output["metadata"]}")
}

// Helper function to calculate percentage change.
// Returns 0.0 when either value is not a number or the initial value is zero.
fun calculatePercentage(initial: Any?, final: Any?): Double {
    val initialValue = (initial as? Number)?.toDouble() ?: return 0.0
    val finalValue = (final as? Number)?.toDouble() ?: return 0.0
    if (initialValue == 0.0) return 0.0
    return ((finalValue - initialValue) / initialValue) * 100.0
}
Best Practices ✅
- Step Granularity: Size each step deliberately. Steps that are too small add orchestration overhead, while steps that are too large reduce flexibility
- Error Handling: Implement robust error handling with retries for unreliable operations
- Data Flow: Be explicit about data flow between steps using variable references
- Conditional Logic: Use conditions to create dynamic workflows that adapt to intermediate results
- Monitoring: Set up proper monitoring and logging for workflow execution
- State Management: Choose appropriate state storage based on your reliability requirements
- Testing: Test workflows with different inputs and edge cases
- Composition: Compose complex workflows from simpler, reusable workflows
Next Steps ✅
Now that you understand the workflow system, you can:
- Learn about workflow definition
- Explore workflow execution
- Implement agent integration with workflows
- Set up workflow state management