Error Handling in Kastrax AI Workflows ✅
Robust error handling is essential for production AI workflows, especially when orchestrating multiple agents and external services. Kastrax provides a comprehensive error handling system that allows your workflows to recover from failures, implement fallback strategies, and gracefully degrade when necessary.
Error Handling Architecture ✅
Kastrax implements a multi-layered approach to error handling in workflows:
- Step-Level Error Handling: handle errors within individual steps
- Workflow-Level Recovery: implement alternative paths and fallback strategies
- Retry Mechanisms: automatically retry failed operations with configurable policies
- Error Propagation: control how errors flow through the workflow
- Monitoring and Observability: track and respond to errors across the workflow
- Graceful Degradation: continue workflow execution with reduced functionality
This layered approach ensures that errors can be handled at the most appropriate level, from localized recovery to workflow-wide strategies.
Step-Level Error Handling ✅
The most direct way to handle errors in Kastrax is at the step level, where you can catch and process exceptions, implement retry logic, and provide fallback results.
Try-Catch Pattern
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with error handling in steps
val robustWorkflow = workflow {
name = "robust-workflow"
description = "Workflow with comprehensive error handling"
// Step with try-catch error handling
step(dataProcessingAgent) {
id = "process-data"
name = "Process Data"
description = "Process data with error handling"
// Custom execution with error handling
execute { context ->
try {
// Attempt the primary operation
val data = context.getVariable("data") as? Map<String, Any> ?: emptyMap()
val result = processData(data) // This might throw exceptions
// Return successful result
mapOf(
"success" to true,
"result" to result,
"error" to null
)
} catch (e: Exception) {
// Log the error
println("Data processing error: ${e.message}")
// Return a graceful fallback result
mapOf(
"success" to false,
"result" to null,
"error" to e.message,
"fallbackData" to "Default value"
)
}
}
}
// Subsequent step that handles the error condition
conditionalStep {
id = "handle-result"
name = "Handle Processing Result"
description = "Process the result or handle the error"
after("process-data")
// Check if previous step succeeded
condition { context ->
val success = context.getVariable("$.steps.process-data.output.success") as? Boolean ?: false
success
}
// Handle successful case
onTrue {
step(successAgent) {
id = "handle-success"
name = "Handle Success"
description = "Process successful result"
variables = mutableMapOf(
"result" to variable("$.steps.process-data.output.result")
)
}
}
// Handle error case
onFalse {
step(errorHandlingAgent) {
id = "handle-error"
name = "Handle Error"
description = "Process error and fallback data"
variables = mutableMapOf(
"error" to variable("$.steps.process-data.output.error"),
"fallbackData" to variable("$.steps.process-data.output.fallbackData")
)
}
}
}
}
// Example data processing function that might throw exceptions
fun processData(data: Map<String, Any>): Map<String, Any> {
// Simulate potential errors
if (data.isEmpty()) {
throw IllegalArgumentException("Empty data cannot be processed")
}
// Process the data
return mapOf(
"processedData" to "Processed: ${data}",
"timestamp" to System.currentTimeMillis()
)
}
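The try/catch shape above tends to repeat in every step that needs a fallback, so it can be factored into a small helper. The sketch below is plain Kotlin and does not use any Kastrax API; the name `safeStepResult` and its parameters are hypothetical, chosen only to mirror the `success`, `result`, `error`, and `fallbackData` keys returned by the "process-data" step.

```kotlin
// Hypothetical helper (not part of Kastrax): runs an operation and converts
// the outcome into the same map shape the "process-data" step returns.
fun <T> safeStepResult(fallback: Any? = null, operation: () -> T): Map<String, Any?> =
    try {
        mapOf("success" to true, "result" to operation(), "error" to null)
    } catch (e: Exception) {
        mapOf(
            "success" to false,
            "result" to null,
            // Fall back to the exception class name when there is no message
            "error" to (e.message ?: e.javaClass.simpleName),
            "fallbackData" to fallback
        )
    }
```

Inside an `execute` block this could be called as `safeStepResult(fallback = "Default value") { processData(data) }`, which keeps the success and failure outputs structurally identical for the conditional step that follows.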
Retry Mechanisms ✅
Kastrax provides built-in retry mechanisms for steps that fail due to transient errors. This is particularly useful for steps that interact with external services or resources that might experience temporary unavailability.
Configuring Retries
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.workflow.retry.RetryConfig
import ai.kastrax.core.workflow.retry.ExponentialBackoff
// Create a workflow with retry configuration
val apiWorkflow = workflow {
name = "api-workflow"
description = "Workflow that interacts with external APIs"
// Workflow-level retry configuration (default for all steps)
defaultRetryConfig = RetryConfig(
maxAttempts = 3, // Number of retry attempts
backoff = ExponentialBackoff(
initialDelayMs = 1000, // Initial delay in milliseconds
factor = 2.0, // Exponential factor
maxDelayMs = 10000 // Maximum delay in milliseconds
)
)
// Step with custom retry configuration
step(apiCallAgent) {
id = "call-api"
name = "Call External API"
description = "Make an API call with retry logic"
// Step-level retry configuration (overrides workflow-level)
retry = RetryConfig(
maxAttempts = 5, // This step will retry up to 5 times
backoff = ExponentialBackoff(
initialDelayMs = 2000, // Start with 2 seconds
factor = 1.5, // Slower growth factor
maxDelayMs = 30000 // Up to 30 seconds
),
retryableExceptions = listOf( // Only retry these exceptions
java.net.ConnectException::class,
java.net.SocketTimeoutException::class,
io.ktor.client.network.sockets.ConnectTimeoutException::class
)
)
variables = mutableMapOf(
"endpoint" to variable("$.input.endpoint"),
"payload" to variable("$.input.payload")
)
}
// Another step with conditional retry logic
step(dataFetchAgent) {
id = "fetch-data"
name = "Fetch Data"
description = "Fetch data with conditional retry"
after("call-api")
// Custom retry condition
retry = RetryConfig(
maxAttempts = 3,
backoff = ExponentialBackoff(initialDelayMs = 1000),
retryCondition = { exception, attempt ->
// Only retry rate limit errors, and only up to 3 times
exception is RateLimitException && attempt <= 3
}
)
variables = mutableMapOf(
"apiResult" to variable("$.steps.call-api.output.result")
)
}
}
// Custom exception for rate limiting
class RateLimitException(message: String) : Exception(message)
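To get a feel for what the backoff parameters above mean in practice, it helps to compute the resulting wait times. The sketch below assumes the common convention that the delay before retry n is `initialDelayMs * factor^(n - 1)`, capped at `maxDelayMs`; whether Kastrax uses exactly this formula (or adds jitter) is an assumption, and `backoffDelayMs` and `backoffSchedule` are hypothetical helper names.

```kotlin
import kotlin.math.min
import kotlin.math.pow

// Hypothetical stand-in for an exponential backoff policy. The formula is a
// common convention, not a confirmed description of Kastrax internals.
fun backoffDelayMs(retry: Int, initialDelayMs: Long, factor: Double, maxDelayMs: Long): Long {
    require(retry >= 1) { "retry is 1-based" }
    val uncapped = initialDelayMs * factor.pow(retry - 1)
    return min(uncapped, maxDelayMs.toDouble()).toLong()
}

// Delays (in ms) before each of the first `retries` retries.
fun backoffSchedule(retries: Int, initialDelayMs: Long, factor: Double, maxDelayMs: Long): List<Long> =
    (1..retries).map { backoffDelayMs(it, initialDelayMs, factor, maxDelayMs) }
```

Under this assumption, the workflow-level defaults (1000 ms, factor 2.0, cap 10000 ms) give waits of 1000, 2000, 4000, 8000 and 10000 ms for five consecutive retries, with the last value held down by the cap. If `maxAttempts` counts the first try as well, a step with `maxAttempts = 5` would wait at most four times.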
## Workflow-Level Recovery Strategies ✅
Kastrax provides several mechanisms for implementing workflow-level recovery strategies, including conditional branching, fallback paths, and error recovery steps.
### Conditional Branching
```kotlin filename="ConditionalErrorHandling.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
// Create a workflow with conditional branching for error handling
val dataWorkflow = workflow {
name = "data-workflow"
description = "Process data with error handling branches"
// Initial data fetching step
step(dataFetchAgent) {
id = "fetch-data"
name = "Fetch Data"
description = "Retrieve data from external source"
variables = mutableMapOf(
"source" to variable("$.input.dataSource"),
"credentials" to variable("$.input.credentials")
)
}
// Conditional branching based on fetch result
conditionalStep {
id = "check-fetch-result"
name = "Check Fetch Result"
description = "Determine next steps based on fetch result"
after("fetch-data")
// Check if data fetch was successful
condition { context ->
val success = context.getVariable("$.steps.fetch-data.output.success") as? Boolean ?: false
val dataSize = context.getVariable("$.steps.fetch-data.output.data")?.let {
when (it) {
is List<*> -> it.size
is Map<*, *> -> it.size
is String -> it.length
else -> 0
}
} ?: 0
success && dataSize > 0
}
// Success path - process the data
onTrue {
step(dataProcessingAgent) {
id = "process-data"
name = "Process Data"
description = "Process the fetched data"
variables = mutableMapOf(
"data" to variable("$.steps.fetch-data.output.data")
)
}
}
// Error path - handle the failure
onFalse {
// Check what type of error occurred
conditionalStep {
id = "diagnose-error"
name = "Diagnose Error"
description = "Determine the type of error"
condition { context ->
val errorType = context.getVariable("$.steps.fetch-data.output.errorType") as? String
errorType == "auth_failure"
}
// Authentication error path
onTrue {
step(authRepairAgent) {
id = "repair-auth"
name = "Repair Authentication"
description = "Attempt to fix authentication issues"
variables = mutableMapOf(
"credentials" to variable("$.input.credentials"),
"error" to variable("$.steps.fetch-data.output.error")
)
}
}
// Other error types
onFalse {
step(fallbackDataAgent) {
id = "use-fallback-data"
name = "Use Fallback Data"
description = "Retrieve data from fallback source"
variables = mutableMapOf(
"originalSource" to variable("$.input.dataSource"),
"fallbackSource" to variable("$.input.fallbackSource", defaultValue = "cache")
)
}
}
}
}
}
// Final step that merges results from all possible paths
step(resultMergingAgent) {
id = "merge-results"
name = "Merge Results"
description = "Combine results from all possible paths"
after("check-fetch-result")
// Runs once the conditional step has finished, whichever branch was taken
execute { context ->
// Check which path was taken and get appropriate data
val processedData = context.getVariable("$.steps.process-data.output.result")
val fallbackData = context.getVariable("$.steps.use-fallback-data.output.data")
val repairedAuth = context.getVariable("$.steps.repair-auth.output.success")
// Determine the final result based on which path succeeded
when {
processedData != null -> {
mapOf(
"source" to "primary",
"data" to processedData,
"complete" to true
)
}
fallbackData != null -> {
mapOf(
"source" to "fallback",
"data" to fallbackData,
"complete" to true
)
}
repairedAuth == true -> {
mapOf(
"source" to "repaired",
"message" to "Authentication repaired, please retry operation",
"complete" to false
)
}
else -> {
mapOf(
"source" to "none",
"error" to "All recovery paths failed",
"complete" to false
)
}
}
}
}
}
```
Monitoring and Observability ✅
Kastrax provides comprehensive monitoring capabilities for tracking workflow execution and detecting errors:
import ai.kastrax.core.KastraxSystem
import ai.kastrax.core.workflow.WorkflowExecuteOptions
import ai.kastrax.core.workflow.monitoring.WorkflowMonitor
import ai.kastrax.core.workflow.monitoring.StepStatus
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val kastraxSystem = KastraxSystem()
// Create a workflow monitor
val workflowMonitor = WorkflowMonitor(kastraxSystem)
// Execute a workflow with monitoring
val result = kastraxSystem.workflowEngine.executeWorkflow(
workflowId = "data-workflow",
input = mapOf(
"dataSource" to "api.example.com/data",
"credentials" to mapOf("apiKey" to "your-api-key")
),
options = WorkflowExecuteOptions(
// Monitor step execution
onStepStart = { stepId, input ->
println("Starting step: $stepId with input: $input")
},
onStepFinish = { stepResult ->
println("Step ${stepResult.stepId} completed with status: ${stepResult.status}")
// Check for errors
if (stepResult.status == StepStatus.ERROR) {
println("Error in step ${stepResult.stepId}: ${stepResult.error}")
// Log detailed error information
logStepError(
stepId = stepResult.stepId,
error = stepResult.error,
context = stepResult.context
)
}
},
// Monitor workflow completion
onComplete = { workflowResult ->
println("Workflow completed with status: ${workflowResult.status}")
// Check for workflow-level errors
if (!workflowResult.success) {
println("Workflow failed: ${workflowResult.error}")
// Send alerts for critical workflows
sendErrorAlert(
workflowId = "data-workflow",
error = workflowResult.error,
steps = workflowResult.steps
)
}
}
)
)
// Analyze workflow execution
val executionAnalysis = workflowMonitor.analyzeExecution(result.executionId)
println("Execution analysis: $executionAnalysis")
// Check for specific error patterns
val errorPatterns = workflowMonitor.detectErrorPatterns(result.executionId)
if (errorPatterns.isNotEmpty()) {
println("Detected error patterns: $errorPatterns")
// Take remedial action based on error patterns
handleErrorPatterns(errorPatterns)
}
}
// Helper functions for error handling
fun logStepError(stepId: String, error: Throwable?, context: Map<String, Any?>) {
// Log detailed error information to monitoring system
// In a real implementation, this would send data to a logging service
println("DETAILED ERROR LOG - Step: $stepId, Error: ${error?.message}")
println("Context: $context")
}
fun sendErrorAlert(workflowId: String, error: Throwable?, steps: Map<String, Any>) {
// Send alerts to appropriate channels
// In a real implementation, this might send emails, Slack messages, etc.
println("ALERT: Workflow $workflowId failed with error: ${error?.message}")
}
fun handleErrorPatterns(patterns: List<String>) {
// Take action based on recognized error patterns
// This might involve automatic remediation, escalation, etc.
patterns.forEach { pattern ->
println("Handling error pattern: $pattern")
}
}
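The example above does not show what counts as an "error pattern". One simple interpretation is a step that keeps failing with the same kind of error. The sketch below illustrates that idea in plain Kotlin; `StepFailure` and `detectRepeatedFailures` are hypothetical names, and nothing here reflects how `WorkflowMonitor.detectErrorPatterns` is actually implemented.

```kotlin
// Hypothetical record of one failed step execution (not a Kastrax type).
data class StepFailure(val stepId: String, val errorType: String)

// Flags every (step, error type) pair that failed at least `threshold` times.
fun detectRepeatedFailures(failures: List<StepFailure>, threshold: Int = 3): List<String> =
    failures.groupingBy { it }
        .eachCount()
        .filterValues { it >= threshold }
        .keys
        .map { "${it.stepId}: repeated ${it.errorType}" }
```

The returned strings have the same `List<String>` shape that `handleErrorPatterns` accepts, so a detector like this could feed the remediation hook directly.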
Error Propagation and Handling ✅
Kastrax provides fine-grained control over how errors propagate through workflows and how they’re handled at different levels.
Error Propagation Control
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.workflow.error.ErrorHandlingMode
// Create a workflow with controlled error propagation
val criticalWorkflow = workflow {
name = "critical-workflow"
description = "Workflow with controlled error propagation"
// Set default error handling mode for the workflow
errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR
// First step - critical operation
step(criticalOperationAgent) {
id = "critical-operation"
name = "Critical Operation"
description = "Perform a critical operation that must succeed"
// Override workflow-level error handling - this step must succeed
errorHandlingMode = ErrorHandlingMode.FAIL_WORKFLOW
variables = mutableMapOf(
"input" to variable("$.input.criticalData")
)
}
// Second step - important but not critical
step(importantOperationAgent) {
id = "important-operation"
name = "Important Operation"
description = "Perform an important but non-critical operation"
after("critical-operation")
// This step will be attempted, but failure won't stop the workflow
errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR
variables = mutableMapOf(
"criticalResult" to variable("$.steps.critical-operation.output.result")
)
}
// Third step - optional enhancement
step(enhancementAgent) {
id = "enhancement"
name = "Enhancement"
description = "Perform an optional enhancement"
after("important-operation")
// This step is optional - errors are logged but ignored
errorHandlingMode = ErrorHandlingMode.IGNORE_ERROR
variables = mutableMapOf(
"baseData" to variable("$.steps.critical-operation.output.result"),
"additionalData" to variable("$.steps.important-operation.output.result", defaultValue = null)
)
}
// Final step - result aggregation
step(resultAggregationAgent) {
id = "aggregate-results"
name = "Aggregate Results"
description = "Combine results from all steps"
after("enhancement")
execute { context ->
// Get results from previous steps, handling potential failures
val criticalResult = context.getVariable("$.steps.critical-operation.output.result")
// For steps that might have failed, check status and handle accordingly
val importantStepStatus = context.getStepStatus("important-operation")
val importantResult = if (importantStepStatus == "success") {
context.getVariable("$.steps.important-operation.output.result")
} else {
null
}
val enhancementStepStatus = context.getStepStatus("enhancement")
val enhancementResult = if (enhancementStepStatus == "success") {
context.getVariable("$.steps.enhancement.output.result")
} else {
null
}
// Aggregate results based on what succeeded
mapOf(
"criticalResult" to criticalResult,
"importantResult" to importantResult,
"enhancementResult" to enhancementResult,
"completionLevel" to when {
enhancementResult != null -> "full"
importantResult != null -> "standard"
else -> "minimal"
},
"errors" to mapOf(
"importantOperation" to (importantStepStatus != "success"),
"enhancement" to (enhancementStepStatus != "success")
)
)
}
}
}
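The difference between the three modes is easiest to see in a toy runner. The sketch below is a plain-Kotlin model, not Kastrax code: the enum names mirror `ErrorHandlingMode`, but the semantics (abort on `FAIL_WORKFLOW`, record and continue on `CONTINUE_ON_ERROR`, log and drop on `IGNORE_ERROR`) are inferred from the comments in the example above, and `StepSpec`, `RunReport`, and `runSequentially` are hypothetical.

```kotlin
// Hypothetical model of the three modes; semantics are inferred, not confirmed.
enum class Mode { FAIL_WORKFLOW, CONTINUE_ON_ERROR, IGNORE_ERROR }

data class StepSpec(val id: String, val mode: Mode, val body: () -> Any?)

data class RunReport(
    val outputs: Map<String, Any?>,          // outputs of steps that succeeded
    val recordedErrors: Map<String, String>, // failures surfaced to the caller
    val aborted: Boolean                     // true if a FAIL_WORKFLOW step failed
)

fun runSequentially(steps: List<StepSpec>): RunReport {
    val outputs = mutableMapOf<String, Any?>()
    val errors = mutableMapOf<String, String>()
    for (step in steps) {
        try {
            outputs[step.id] = step.body()
        } catch (e: Exception) {
            when (step.mode) {
                Mode.FAIL_WORKFLOW -> {
                    // Critical step: record the error and stop immediately
                    errors[step.id] = e.message ?: "unknown error"
                    return RunReport(outputs, errors, aborted = true)
                }
                Mode.CONTINUE_ON_ERROR -> {
                    // Non-critical step: record the error, keep going
                    errors[step.id] = e.message ?: "unknown error"
                }
                Mode.IGNORE_ERROR -> {
                    // Optional step: log only, nothing is surfaced
                    println("Ignored error in ${step.id}: ${e.message}")
                }
            }
        }
    }
    return RunReport(outputs, errors, aborted = false)
}
```

In this model a failing "important-operation" leaves an entry in `recordedErrors` while a failing "enhancement" leaves no trace beyond the log line, which is why the aggregation step above checks step status before reading outputs.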
Custom Error Handlers
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable
import ai.kastrax.core.workflow.error.ErrorHandler
// Create a workflow with custom error handlers
val resilientWorkflow = workflow {
name = "resilient-workflow"
description = "Workflow with custom error handlers"
// Define a global error handler for the workflow
errorHandler = ErrorHandler { stepId, error, context ->
println("Error in step $stepId: ${error.message}")
// Log the error to a monitoring system
logError(stepId, error, context)
// Return a default result for the failed step
mapOf(
"error" to error.message,
"errorType" to error.javaClass.simpleName,
"recovered" to true,
"defaultResult" to "Error recovery value"
)
}
// First step with a custom error handler
step(dataProcessingAgent) {
id = "process-data"
name = "Process Data"
description = "Process data with custom error handling"
// Step-specific error handler
errorHandler = ErrorHandler { _, error, context ->
// Check the specific type of error
when (error) {
is IllegalArgumentException -> {
// Handle validation errors
mapOf(
"error" to "Validation error: ${error.message}",
"errorType" to "validation",
"recovered" to true,
"validationErrors" to listOf(error.message)
)
}
is java.io.IOException -> {
// Handle I/O errors
mapOf(
"error" to "I/O error: ${error.message}",
"errorType" to "io",
"recovered" to false
)
}
else -> {
// Handle other errors
mapOf(
"error" to "Unknown error: ${error.message}",
"errorType" to "unknown",
"recovered" to false
)
}
}
}
variables = mutableMapOf(
"data" to variable("$.input.data")
)
}
// Additional steps...
}
// Helper function for error logging
fun logError(stepId: String, error: Throwable, context: Map<String, Any?>) {
// In a real implementation, this would log to a monitoring system
println("LOGGING ERROR - Step: $stepId, Error: ${error.message}, Context: $context")
}
Best Practices for Error Handling ✅
1. Design for Failure
Assume that any step in your workflow might fail and design accordingly:
// Good: Design steps with error handling in mind
step(apiCallAgent) {
id = "call-api"
execute { context ->
try {
val result = callExternalApi(context.getVariable("endpoint") as String)
mapOf("result" to result, "success" to true)
} catch (e: Exception) {
mapOf("error" to e.message, "success" to false)
}
}
}
// Avoid: Assuming steps will always succeed
// step(apiCallAgent) {
// id = "call-api"
// execute { context ->
// val result = callExternalApi(context.getVariable("endpoint") as String)
// mapOf("result" to result)
// }
// }
2. Use Appropriate Error Handling Modes
Choose the right error handling mode for each step based on its importance:
// Critical steps should fail the workflow if they fail
step(paymentProcessingAgent) {
id = "process-payment"
errorHandlingMode = ErrorHandlingMode.FAIL_WORKFLOW
}
// Non-critical steps can continue with errors
step(analyticsAgent) {
id = "record-analytics"
errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR
}
// Optional steps can ignore errors entirely
step(notificationAgent) {
id = "send-notification"
errorHandlingMode = ErrorHandlingMode.IGNORE_ERROR
}
3. Implement Comprehensive Retry Strategies
Match the retry strategy to the type of operation:
// Network operations might need exponential backoff
step(networkAgent) {
id = "network-operation"
retry = RetryConfig(
maxAttempts = 5,
backoff = ExponentialBackoff(initialDelayMs = 1000, factor = 2.0)
)
}
// Database operations might need different retry logic
step(databaseAgent) {
id = "database-operation"
retry = RetryConfig(
maxAttempts = 3,
backoff = LinearBackoff(delayMs = 2000), // Consistent delay
retryableExceptions = listOf(DatabaseTimeoutException::class)
)
}
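What the two configurations above have in common is a loop that retries only selected exception types and waits between attempts. The sketch below shows that loop in plain Kotlin so the moving parts are visible. It is not the Kastrax `RetryConfig` implementation; `retrying` and its parameters are hypothetical, and `maxAttempts` is assumed to include the first try.

```kotlin
import kotlin.reflect.KClass

// Hypothetical retry helper: retries only exceptions whose class is listed,
// waiting delayFor(attempt) milliseconds between attempts.
fun <T> retrying(
    maxAttempts: Int,
    retryableExceptions: List<KClass<out Exception>>,
    delayFor: (attempt: Int) -> Long,
    sleep: (Long) -> Unit = { Thread.sleep(it) }, // injectable for tests
    operation: (attempt: Int) -> T
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastError: Exception? = null
    for (attempt in 1..maxAttempts) {
        try {
            return operation(attempt)
        } catch (e: Exception) {
            // Non-retryable errors propagate immediately
            if (retryableExceptions.none { it.java.isInstance(e) }) throw e
            lastError = e
            // No point waiting after the final attempt
            if (attempt < maxAttempts) sleep(delayFor(attempt))
        }
    }
    throw lastError ?: IllegalStateException("no attempts were made")
}
```

A constant `delayFor = { 2000L }` corresponds to the linear (fixed-delay) case, while a growing function such as `{ attempt -> 1000L shl (attempt - 1) }` gives exponential backoff with factor 2.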
4. Provide Detailed Error Context
Include comprehensive information in error outputs:
// Return detailed error information
execute { context ->
try {
// Operation that might fail
} catch (e: Exception) {
mapOf(
"error" to e.message,
"errorType" to e.javaClass.simpleName,
"timestamp" to System.currentTimeMillis(),
"context" to mapOf(
"input" to context.getVariable("input"),
"userId" to context.getVariable("$.input.userId"),
"operation" to "data-processing"
),
"stackTrace" to e.stackTraceToString()
)
}
}
Advanced Error Handling ✅
For more complex error handling scenarios, Kastrax provides advanced mechanisms:
Circuit Breakers
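Implement circuit breakers to prevent cascading failures. After maxFailures failures the breaker opens and rejects calls immediately; once resetTimeoutMs has elapsed it lets a trial call through: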
// Circuit breaker pattern implementation
// Note: not thread-safe; guard execute() with a lock if steps run concurrently
class CircuitBreaker(
    private val maxFailures: Int = 3,
    private val resetTimeoutMs: Long = 60000
) {
    private var failures = 0
    private var lastFailureTime = 0L
    private var state = State.CLOSED

    enum class State { CLOSED, OPEN, HALF_OPEN }

    // The value type is Any? because e.message is nullable
    fun execute(operation: () -> Map<String, Any?>): Map<String, Any?> {
        // Check if circuit is open
        if (state == State.OPEN) {
            val now = System.currentTimeMillis()
            if (now - lastFailureTime > resetTimeoutMs) {
                // Try to reset after timeout
                state = State.HALF_OPEN
            } else {
                // Circuit is open, return fast failure
                return mapOf(
                    "error" to "Circuit breaker open",
                    "success" to false,
                    "circuitState" to state
                )
            }
        }
        return try {
            val result = operation()
            // Any success closes the circuit and clears the failure count,
            // so only consecutive failures can open it
            state = State.CLOSED
            failures = 0
            result
        } catch (e: Exception) {
            failures++
            lastFailureTime = System.currentTimeMillis()
            // Open circuit if too many failures
            if (failures >= maxFailures) {
                state = State.OPEN
            }
            mapOf(
                "error" to e.message,
                "success" to false,
                "circuitState" to state
            )
        }
    }
}
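A breaker that reads the system clock directly is awkward to test, because the OPEN to HALF_OPEN transition only happens after the reset timeout. The variant below is a sketch that injects the clock as a function and returns a Kotlin `Result` instead of a map. Both choices, along with the name `TestableCircuitBreaker`, are assumptions made for illustration; it also clears the failure count on every success, so only consecutive failures open the circuit.

```kotlin
// Variant of the breaker pattern with an injectable clock (hypothetical,
// added for testability); thresholds and state names follow the text above.
class TestableCircuitBreaker(
    private val maxFailures: Int = 3,
    private val resetTimeoutMs: Long = 60_000,
    private val now: () -> Long = { System.currentTimeMillis() }
) {
    enum class State { CLOSED, OPEN, HALF_OPEN }

    var state = State.CLOSED
        private set
    private var failures = 0
    private var lastFailureTime = 0L

    fun <T> call(operation: () -> T): Result<T> {
        if (state == State.OPEN) {
            if (now() - lastFailureTime <= resetTimeoutMs) {
                // Still inside the timeout window: fail fast without calling
                return Result.failure(IllegalStateException("Circuit breaker open"))
            }
            state = State.HALF_OPEN // timeout elapsed: allow one trial call
        }
        return try {
            val value = operation()
            state = State.CLOSED
            failures = 0
            Result.success(value)
        } catch (e: Exception) {
            failures++
            lastFailureTime = now()
            // A failed trial call, or too many failures, (re)opens the circuit
            if (state == State.HALF_OPEN || failures >= maxFailures) state = State.OPEN
            Result.failure(e)
        }
    }
}
```

In a test, passing `now = { fakeClock }` and advancing `fakeClock` past the timeout exercises the full CLOSED, OPEN, HALF_OPEN, CLOSED cycle without sleeping.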
Integration with Actor Model
Kastrax’s error handling system integrates seamlessly with the actor model for distributed error handling:
import ai.kastrax.actor.ActorSystem
import ai.kastrax.actor.Props
import ai.kastrax.workflow.WorkflowActor
import ai.kastrax.workflow.messages.*
import ai.kastrax.workflow.error.ErrorHandlingActor
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// Create an actor system
val system = ActorSystem("workflow-system")
// Create an error handling actor
val errorHandlingActor = system.actorOf(
Props.create(ErrorHandlingActor::class.java),
"error-handler"
)
// Create a workflow actor with error handling
val workflowActor = system.actorOf(
Props.create(WorkflowActor::class.java, resilientWorkflow, errorHandlingActor),
"workflow-actor"
)
// Execute the workflow
val result = system.ask<WorkflowResult>(
workflowActor,
ExecuteWorkflowMessage(input = mapOf("data" to "test-data"))
)
// Check for errors
if (result is WorkflowResult.Error) {
println("Workflow execution failed: ${result.error}")
// Send error recovery message
system.tell(
errorHandlingActor,
RecoverWorkflowMessage(
workflowId = resilientWorkflow.id,
executionId = result.executionId,
recoveryStrategy = "retry"
)
)
}
// Shutdown the actor system when done
system.terminate()
}
Conclusion ✅
Robust error handling is essential for production AI workflows. Kastrax provides a comprehensive suite of error handling capabilities that enable you to create resilient, fault-tolerant workflows that can recover from failures and continue operating even in challenging conditions.
By combining step-level error handling, workflow-level recovery strategies, sophisticated retry mechanisms, and comprehensive monitoring, you can build AI agent workflows that gracefully handle errors and provide reliable service to your users.
Remember that effective error handling is not just about preventing crashes—it’s about designing systems that can adapt to failures, recover automatically when possible, and provide clear, actionable information when human intervention is required.