
Kastrax AI Workflow System

The Kastrax Workflow System is a framework for orchestrating multi-step AI agent operations. Built on Kotlin’s concurrency model, it lets you define workflows with branching, parallel execution, suspension and resumption, and error recovery. It is designed for the multi-step processes that are common in AI applications.

When to Use Workflows

Workflows are essential when your AI application requires orchestrating multiple operations in a structured manner. Consider using workflows when:

  • Complex Multi-step Processes: Your application needs to execute a sequence of AI operations with dependencies between steps
  • Conditional Logic: You need to implement branching paths based on intermediate results
  • Parallel Processing: Multiple operations need to be executed concurrently
  • User Interaction: Workflows need to pause and wait for user input before continuing
  • Error Handling: You need sophisticated error recovery mechanisms for AI operations
  • Observability: You want detailed tracking and monitoring of each step in your AI process

Kastrax’s workflow system provides:

  • A type-safe, Kotlin-based DSL for defining workflows and their steps
  • Support for both simple (linear) and advanced (branching, parallel) execution paths
  • Variable passing between steps with powerful reference resolution
  • Suspension and resumption capabilities for long-running or user-interactive workflows
  • Comprehensive error handling and recovery mechanisms
  • Detailed observability and debugging features to track each workflow run

Workflow Architecture

The Kastrax Workflow System is built on a modular architecture with several key components:

  1. Workflow Interface: Defines the core operations for workflow execution
  2. Workflow Steps: Individual units of work that can be combined into a workflow
  3. Workflow Context: Maintains state and data during workflow execution
  4. Variable References: Mechanism for passing data between steps
  5. Workflow Engine: Orchestrates the execution of steps according to defined dependencies

This architecture enables flexible, maintainable, and observable workflow definitions.
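To make the relationship between steps, context, and engine concrete, here is a minimal sketch in plain Kotlin. It is not the Kastrax implementation: `StepDef`, `executionOrder`, and `runWorkflow` are hypothetical names introduced for illustration, and the ordering shown is a standard topological sort over `after`-style dependencies.

```kotlin
// Hypothetical stand-ins for the components listed above; not the Kastrax API.
data class StepDef(
    val id: String,
    val after: List<String> = emptyList(),
    val run: (Map<String, Any?>) -> Any?
)

// Returns step ids in an order that respects every `after` dependency (Kahn's algorithm).
fun executionOrder(steps: List<StepDef>): List<String> {
    val ids = steps.map { it.id }.toSet()
    val remaining = steps.associate { step ->
        require(step.after.all { it in ids }) { "Unknown dependency in step ${step.id}" }
        step.id to step.after.toMutableSet()
    }.toMutableMap()
    val order = mutableListOf<String>()
    while (remaining.isNotEmpty()) {
        // A step is ready once all of its dependencies have been scheduled.
        val ready = remaining.filterValues { it.isEmpty() }.keys.toList()
        check(ready.isNotEmpty()) { "Dependency cycle among: ${remaining.keys}" }
        for (id in ready) {
            order += id
            remaining.remove(id)
        }
        remaining.values.forEach { it.removeAll(ready.toSet()) }
    }
    return order
}

// Runs the steps in dependency order, storing each output in a shared context map.
fun runWorkflow(steps: List<StepDef>, input: Map<String, Any?>): Map<String, Any?> {
    val byId = steps.associateBy { it.id }
    val context = mutableMapOf<String, Any?>("input" to input)
    for (id in executionOrder(steps)) {
        context[id] = byId.getValue(id).run(context)
    }
    return context
}

fun main() {
    val steps = listOf(
        StepDef("editing", after = listOf("writing")) { ctx -> "edited(${ctx["writing"]})" },
        StepDef("research") { ctx -> "notes on ${(ctx["input"] as Map<*, *>)["topic"]}" },
        StepDef("writing", after = listOf("research")) { ctx -> "draft from ${ctx["research"]}" }
    )
    println(executionOrder(steps))                              // [research, writing, editing]
    println(runWorkflow(steps, mapOf("topic" to "AI"))["editing"]) // edited(draft from notes on AI)
}
```

The declaration order of the steps does not matter in this sketch; only the `after` lists determine when a step runs.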

Example Workflow

The example in this section is a simple content creation workflow with three steps: research, writing, and editing.

Creating a Basic Workflow

Here’s how to define a workflow using Kastrax’s Kotlin DSL:

ContentCreationWorkflow.kt
    import ai.kastrax.core.agent.Agent
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Assume we have these agents defined elsewhere
    val researchAgent: Agent = /* ... */
    val writingAgent: Agent = /* ... */
    val editingAgent: Agent = /* ... */

    // Create the workflow
    val contentWorkflow = workflow {
        name = "content-creation"
        description = "Create content about a topic"

        // Research step
        step(researchAgent) {
            id = "research"
            name = "Research"
            description = "Research the topic"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic")
            )
        }

        // Writing step (depends on research)
        step(writingAgent) {
            id = "writing"
            name = "Writing"
            description = "Write an article based on research"
            after("research") // This step runs after the research step
            variables = mutableMapOf(
                "research" to variable("$.steps.research.output.text")
            )
        }

        // Editing step (depends on writing)
        step(editingAgent) {
            id = "editing"
            name = "Editing"
            description = "Edit the article"
            after("writing") // This step runs after the writing step
            variables = mutableMapOf(
                "draft" to variable("$.steps.writing.output.text")
            )
        }

        // Define output mapping
        output {
            "researchSummary" from "$.steps.research.output.text"
            "finalArticle" from "$.steps.editing.output.text"
            "wordCount" from {
                "$.steps.editing.output.text" to { text ->
                    (text as? String)?.split(" ")?.size ?: 0
                }
            }
        }
    }

This workflow defines three steps (research, writing, and editing) with dependencies between them, and maps the final outputs.
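One detail worth noting in the `wordCount` mapping: splitting on a single space counts empty strings when the text contains consecutive spaces, and it reports one word for an empty string. The sketch below shows a whitespace-tolerant alternative as a standalone function. `countWords` is a hypothetical helper, not part of Kastrax, and how it would plug into the `output` block is left as an assumption.

```kotlin
// Counts words by splitting on any run of whitespace; returns 0 for blank or non-string input.
fun countWords(value: Any?): Int {
    val text = (value as? String)?.trim().orEmpty()
    if (text.isEmpty()) return 0
    return text.split(Regex("\\s+")).size
}

fun main() {
    val text = "AI  in healthcare"      // note the double space
    println(text.split(" ").size)       // 4, because the double space yields an empty element
    println(countWords(text))           // 3
    println(countWords(""))             // 0
}
```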

Registering the Workflow

Register your workflow with the Kastrax system to enable execution, logging, and telemetry:

KastraxSetup.kt
    import ai.kastrax.core.KastraxSystem

    // Create a Kastrax system instance
    val kastraxSystem = KastraxSystem()

    // Register the workflow
    kastraxSystem.registerWorkflow(contentWorkflow)

    // You can also register multiple workflows
    kastraxSystem.registerWorkflows(
        contentWorkflow,
        otherWorkflow,
        anotherWorkflow
    )

You can also create workflows dynamically at runtime using a DynamicWorkflowGenerator:

DynamicWorkflowCreation.kt
    import ai.kastrax.core.workflow.dynamic.DynamicWorkflowGenerator

    // Create a dynamic workflow generator
    val workflowGenerator = DynamicWorkflowGenerator(kastraxSystem)

    // Create a workflow dynamically
    val dynamicWorkflow = workflowGenerator.createWorkflow(
        workflowName = "dynamic-workflow",
        description = "Dynamically created workflow"
    ) {
        // Define steps and flow here
        step(someAgent) {
            id = "dynamic-step"
            // ... step configuration
        }
        // ... more steps and configuration
    }

    // Register the dynamic workflow
    kastraxSystem.registerWorkflow(dynamicWorkflow)

Executing the Workflow

Execute your workflow programmatically:

ExecuteWorkflow.kt
    import ai.kastrax.core.workflow.WorkflowExecuteOptions
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        // Get the workflow engine
        val workflowEngine = kastraxSystem.workflowEngine

        // Execute the workflow
        val result = workflowEngine.executeWorkflow(
            workflowId = "content-creation",
            input = mapOf("topic" to "Artificial Intelligence in Healthcare"),
            options = WorkflowExecuteOptions(
                maxSteps = 10,   // Maximum number of steps to execute
                timeout = 60000, // Timeout in milliseconds
                onStepFinish = { stepResult ->
                    println("Step ${stepResult.stepId} completed")
                },
                onStepError = { stepId, error ->
                    println("Error in step $stepId: ${error.message}")
                }
            )
        )

        // Process the result
        if (result.success) {
            println("Workflow completed successfully")
            println("Final article: ${result.output["finalArticle"]}")
            println("Word count: ${result.output["wordCount"]}")
        } else {
            println("Workflow failed: ${result.error}")
        }
    }

You can also execute workflows via the Kastrax HTTP API (when running a Kastrax server):

    curl --location 'http://localhost:8080/api/workflows/content-creation/execute' \
      --header 'Content-Type: application/json' \
      --data '{
        "topic": "Artificial Intelligence in Healthcare"
      }'

This example demonstrates the essentials: define your workflow with steps, register it with the Kastrax system, then execute it with input data.

Workflow Step Types

Kastrax supports various types of workflow steps to handle different scenarios:

Agent Steps

Agent steps use AI agents to perform tasks:

    step(agent) {
        id = "agent-step"
        name = "Agent Processing"
        description = "Use an agent to process data"
        variables = mutableMapOf(
            "input" to variable("$.input.data")
        )
    }

Tool Steps

Tool steps execute specific tools or functions:

    toolStep {
        id = "data-processing"
        name = "Process Data"
        description = "Process data using a specific tool"
        tool = DataProcessingTool()
        variables = mutableMapOf(
            "data" to variable("$.input.rawData")
        )
    }

Conditional Steps

Conditional steps execute based on conditions:

    conditionalStep {
        id = "quality-check"
        name = "Quality Check"
        description = "Check if the content meets quality standards"
        condition { context ->
            val wordCount = context.getVariable("$.steps.writing.output.wordCount") as? Int ?: 0
            wordCount > 500
        }
        onTrue {
            step(qualityAgent) {
                id = "quality-review"
                // ... configuration
            }
        }
        onFalse {
            step(revisionAgent) {
                id = "content-revision"
                // ... configuration
            }
        }
    }

Human-in-the-Loop Steps

Steps that require human input or approval:

    humanStep {
        id = "human-review"
        name = "Human Review"
        description = "Get human approval for the content"
        prompt { context ->
            val content = context.getVariable("$.steps.editing.output.text") as? String ?: ""
            "Please review the following content:\n\n$content\n\nApprove or suggest changes."
        }
        timeoutMs = 86400000 // 24 hours in milliseconds
        onTimeout {
            // Handle timeout case
            step(notificationAgent) {
                id = "timeout-notification"
                // ... configuration
            }
        }
    }
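The wait-with-timeout behaviour of a human step can be illustrated without Kastrax. The sketch below assumes the reviewer's answer arrives through a `CompletableFuture` from the Java standard library. `awaitReview` is a hypothetical helper, and the way Kastrax actually suspends and resumes a workflow may differ.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Waits up to `timeoutMs` for a reviewer's answer; returns `onTimeout()` if none arrives in time.
fun awaitReview(
    pending: CompletableFuture<String>,
    timeoutMs: Long,
    onTimeout: () -> String
): String =
    try {
        pending.get(timeoutMs, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        onTimeout()
    }

fun main() {
    // The reviewer has already answered.
    val answered = CompletableFuture.completedFuture("approved")
    println(awaitReview(answered, 1_000) { "escalated" })   // approved

    // Nobody answers within 50 ms, so the timeout branch runs.
    val unanswered = CompletableFuture<String>()
    println(awaitReview(unanswered, 50) { "escalated" })    // escalated
}
```

A blocking wait is used here only to keep the example short; a long-running review such as the 24-hour timeout above would normally be persisted and resumed rather than held on a thread.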

Advanced Workflow Features

Parallel Execution

Execute multiple steps concurrently:

    parallelSteps {
        step(researchAgent1) {
            id = "research-source1"
            // ... configuration
        }
        step(researchAgent2) {
            id = "research-source2"
            // ... configuration
        }
        step(researchAgent3) {
            id = "research-source3"
            // ... configuration
        }
    }

    // Continue with a step that uses results from all parallel steps
    step(synthesisAgent) {
        id = "research-synthesis"
        after("research-source1", "research-source2", "research-source3")
        variables = mutableMapOf(
            "source1" to variable("$.steps.research-source1.output.text"),
            "source2" to variable("$.steps.research-source2.output.text"),
            "source3" to variable("$.steps.research-source3.output.text")
        )
    }
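The fan-out and fan-in pattern behind this example can be sketched with the Java standard library alone. `runParallel` is a hypothetical helper for illustration; Kastrax's own scheduling is not shown here and may work differently.

```kotlin
import java.util.concurrent.CompletableFuture

// Starts every task concurrently, then waits for all of them and returns results keyed by step id.
fun runParallel(tasks: Map<String, () -> String>): Map<String, String> {
    val futures = tasks.mapValues { (_, task) -> CompletableFuture.supplyAsync { task() } }
    return futures.mapValues { (_, future) -> future.join() }
}

fun main() {
    val results = runParallel(
        mapOf(
            "research-source1" to { "findings from source 1" },
            "research-source2" to { "findings from source 2" },
            "research-source3" to { "findings from source 3" }
        )
    )
    // Fan-in: a synthesis step would receive all three outputs at once.
    println(results.values.joinToString("; "))
    // findings from source 1; findings from source 2; findings from source 3
}
```

All futures are created before any `join()` call, so the tasks overlap in time; the result map keeps the original key order regardless of which task finishes first.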

Error Handling

Implement sophisticated error handling:

    step(riskAgent) {
        id = "risk-analysis"
        // ... configuration
        onError { error ->
            // Log the error
            println("Error in risk analysis: ${error.message}")

            // Execute fallback step
            step(fallbackAgent) {
                id = "risk-analysis-fallback"
                variables = mutableMapOf(
                    "error" to variable(error.message)
                )
            }
        }
    }
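Stripped of the DSL, the fallback pattern above amounts to running a primary action and, if it throws, handing the error to a substitute. A minimal sketch follows, with `withFallback` as a hypothetical helper that is not part of Kastrax:

```kotlin
// Runs `primary`; if it throws, passes the exception to `fallback` and returns that result instead.
fun <T> withFallback(primary: () -> T, fallback: (Exception) -> T): T =
    try {
        primary()
    } catch (e: Exception) {
        fallback(e)
    }

fun main() {
    val result = withFallback<String>(
        primary = { error("risk model unavailable") }, // always fails in this example
        fallback = { e -> "fallback analysis (cause: ${e.message})" }
    )
    println(result) // fallback analysis (cause: risk model unavailable)
}
```

Catching `Exception` rather than `Throwable` lets fatal conditions such as `OutOfMemoryError` propagate instead of being routed to the fallback.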

Workflow Variables

Workflow variables provide a powerful mechanism for passing data between steps:

    // Define a workflow with variables
    val workflowWithVariables = workflow {
        name = "data-processing-workflow"

        // Define input variables
        input {
            variable("dataSource", String::class)
            variable("processingLevel", Int::class, defaultValue = 1)
        }

        // Use variables in steps
        step(dataAgent) {
            id = "data-extraction"
            variables = mutableMapOf(
                "source" to variable("$.input.dataSource"),
                "level" to variable("$.input.processingLevel")
            )
        }

        // Use JSONPath expressions to access nested properties
        step(analysisAgent) {
            id = "data-analysis"
            after("data-extraction")
            variables = mutableMapOf(
                "rawData" to variable("$.steps.data-extraction.output.data"),
                "metadata" to variable("$.steps.data-extraction.output.metadata.timestamp")
            )
        }
    }
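As a rough illustration of how a reference such as `$.steps.data-extraction.output.metadata.timestamp` could be resolved, the sketch below walks a nested map one key at a time. It handles only dotted key segments, not full JSONPath syntax; `resolve` and the sample context values are hypothetical and do not reflect Kastrax's actual resolver.

```kotlin
// Resolves a dotted reference such as "$.input.dataSource" against nested maps.
// Returns null when any segment along the path is missing.
fun resolve(path: String, root: Map<String, Any?>): Any? {
    require(path.startsWith("$.")) { "Unsupported reference: $path" }
    var current: Any? = root
    for (key in path.removePrefix("$.").split(".")) {
        current = (current as? Map<*, *>)?.get(key) ?: return null
    }
    return current
}

fun main() {
    // Sample context, made up for this example.
    val context = mapOf(
        "input" to mapOf("dataSource" to "sales.csv"),
        "steps" to mapOf(
            "data-extraction" to mapOf(
                "output" to mapOf("metadata" to mapOf("timestamp" to "2024-01-01T00:00:00Z"))
            )
        )
    )
    println(resolve("$.input.dataSource", context))                                // sales.csv
    println(resolve("$.steps.data-extraction.output.metadata.timestamp", context)) // 2024-01-01T00:00:00Z
    println(resolve("$.steps.missing.output", context))                            // null
}
```

Because the path is split on `.`, step ids that contain hyphens (such as `data-extraction`) work in this sketch, but ids containing dots would not.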

Observability and Monitoring

Kastrax workflows provide comprehensive observability features:

    // Configure workflow monitoring
    val monitoredWorkflow = workflow {
        name = "monitored-workflow"
        // ... workflow configuration

        monitoring {
            onStart { workflowId, input ->
                println("Workflow $workflowId started with input: $input")
            }
            onStepStart { workflowId, stepId ->
                println("Step $stepId in workflow $workflowId started")
            }
            onStepComplete { workflowId, stepId, output ->
                println("Step $stepId in workflow $workflowId completed with output: $output")
            }
            onComplete { workflowId, output ->
                println("Workflow $workflowId completed with output: $output")
            }
            onError { workflowId, error ->
                println("Workflow $workflowId failed with error: ${error.message}")
            }
        }
    }

Integration with Actor Model

Kastrax integrates the workflow system with the actor model, which enables distributed workflow execution:

    import ai.kastrax.actor.ActorSystem
    import ai.kastrax.actor.Props
    import ai.kastrax.workflow.WorkflowActor
    import ai.kastrax.workflow.messages.*
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        // Create an actor system
        val system = ActorSystem("workflow-system")

        // Create a workflow actor
        val workflowActor = system.actorOf(
            Props.create(WorkflowActor::class.java, contentWorkflow),
            "workflow-actor"
        )

        // Send a workflow execution message
        val result = system.ask<WorkflowResult>(
            workflowActor,
            ExecuteWorkflowMessage(input = mapOf("topic" to "AI in Healthcare"))
        )

        // Process the result
        when (result) {
            is WorkflowResult.Success -> {
                println("Workflow executed successfully")
                println("Final article: ${result.output["finalArticle"]}")
            }
            is WorkflowResult.Error -> {
                println("Workflow execution failed: ${result.error}")
            }
        }

        // Shutdown the actor system when done
        system.terminate()
    }

This integration enables building sophisticated multi-agent systems where workflows can be distributed across different nodes and executed concurrently.

Real-World Use Cases

Kastrax workflows are ideal for a variety of AI agent applications:

Content Creation Pipeline

Orchestrate a complete content creation process:

  • Research step gathers information from multiple sources
  • Planning step creates an outline based on research
  • Writing step generates the initial content
  • Editing step refines and improves the content
  • Review step (human-in-the-loop) gets final approval

Customer Support Automation

Handle complex customer inquiries:

  • Classification step determines the type of inquiry
  • Information retrieval step gathers relevant knowledge base articles
  • Response generation step creates a personalized response
  • Escalation step (conditional) routes complex issues to human agents

Data Analysis Workflow

Process and analyze large datasets:

  • Data collection step gathers information from multiple sources
  • Preprocessing step cleans and normalizes the data
  • Analysis step extracts insights from the data
  • Visualization step creates charts and graphs
  • Reporting step generates a comprehensive report
