Skip to Content
DocsWorkflowsError Handling

Error Handling in Kastrax AI Workflows ✅

Robust error handling is essential for production AI workflows, especially when orchestrating multiple agents and external services. Kastrax provides a comprehensive error handling system that allows your workflows to recover from failures, implement fallback strategies, and gracefully degrade when necessary.

Error Handling Architecture ✅

Kastrax implements a multi-layered approach to error handling in workflows:

  1. Step-Level Error Handling - Handle errors within individual steps
  2. Workflow-Level Recovery - Implement alternative paths and fallback strategies
  3. Retry Mechanisms - Automatically retry failed operations with configurable policies
  4. Error Propagation - Control how errors flow through the workflow
  5. Monitoring and Observability - Track and respond to errors across the workflow
  6. Graceful Degradation - Continue workflow execution with reduced functionality

This layered approach ensures that errors can be handled at the most appropriate level, from localized recovery to workflow-wide strategies.

Step-Level Error Handling ✅

The most direct way to handle errors in Kastrax is at the step level, where you can catch and process exceptions, implement retry logic, and provide fallback results.

Try-Catch Pattern

StepErrorHandling.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable // Create a workflow with error handling in steps val robustWorkflow = workflow { name = "robust-workflow" description = "Workflow with comprehensive error handling" // Step with try-catch error handling step(dataProcessingAgent) { id = "process-data" name = "Process Data" description = "Process data with error handling" // Custom execution with error handling execute { context -> try { // Attempt the primary operation val data = context.getVariable("data") as? Map<String, Any> ?: emptyMap() val result = processData(data) // This might throw exceptions // Return successful result mapOf( "success" to true, "result" to result, "error" to null ) } catch (e: Exception) { // Log the error println("Data processing error: ${e.message}") // Return a graceful fallback result mapOf( "success" to false, "result" to null, "error" to e.message, "fallbackData" to "Default value" ) } } } // Subsequent step that handles the error condition conditionalStep { id = "handle-result" name = "Handle Processing Result" description = "Process the result or handle the error" after("process-data") // Check if previous step succeeded condition { context -> val success = context.getVariable("$.steps.process-data.output.success") as? Boolean ?: false success } // Handle successful case onTrue { step(successAgent) { id = "handle-success" name = "Handle Success" description = "Process successful result" variables = mutableMapOf( "result" to variable("$.steps.process-data.output.result") ) } } // Handle error case onFalse { step(errorHandlingAgent) { id = "handle-error" name = "Handle Error" description = "Process error and fallback data" variables = mutableMapOf( "error" to variable("$.steps.process-data.output.error"), "fallbackData" to variable("$.steps.process-data.output.fallbackData") ) } } } } // Example data processing function that might throw exceptions fun processData(data: Map<String, Any>): Map<String, Any> { // Simulate potential errors if (data.isEmpty()) { throw IllegalArgumentException("Empty data cannot be processed") } // Process the data return mapOf( "processedData" to "Processed: ${data}", "timestamp" to System.currentTimeMillis() ) }

Retry Mechanisms ✅

Kastrax provides built-in retry mechanisms for steps that fail due to transient errors. This is particularly useful for steps that interact with external services or resources that might experience temporary unavailability.

Configuring Retries

RetryConfiguration.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.retry.RetryConfig import ai.kastrax.core.workflow.retry.ExponentialBackoff // Create a workflow with retry configuration val apiWorkflow = workflow { name = "api-workflow" description = "Workflow that interacts with external APIs" // Workflow-level retry configuration (default for all steps) defaultRetryConfig = RetryConfig( maxAttempts = 3, // Number of retry attempts backoff = ExponentialBackoff( initialDelayMs = 1000, // Initial delay in milliseconds factor = 2.0, // Exponential factor maxDelayMs = 10000 // Maximum delay in milliseconds ) ) // Step with custom retry configuration step(apiCallAgent) { id = "call-api" name = "Call External API" description = "Make an API call with retry logic" // Step-level retry configuration (overrides workflow-level) retry = RetryConfig( maxAttempts = 5, // This step will retry up to 5 times backoff = ExponentialBackoff( initialDelayMs = 2000, // Start with 2 seconds factor = 1.5, // Slower growth factor maxDelayMs = 30000 // Up to 30 seconds ), retryableExceptions = listOf( // Only retry these exceptions java.net.ConnectException::class, java.net.SocketTimeoutException::class, io.ktor.client.network.sockets.ConnectTimeoutException::class ) ) variables = mutableMapOf( "endpoint" to variable("$.input.endpoint"), "payload" to variable("$.input.payload") ) } // Another step with conditional retry logic step(dataFetchAgent) { id = "fetch-data" name = "Fetch Data" description = "Fetch data with conditional retry" after("call-api") // Custom retry condition retry = RetryConfig( maxAttempts = 3, backoff = ExponentialBackoff(initialDelayMs = 1000), retryCondition = { exception, attempt -> // Only retry rate limit errors, and only up to 3 times exception is RateLimitException && attempt <= 3 } ) variables = mutableMapOf( "apiResult" to variable("$.steps.call-api.output.result") ) } } // Custom exception for rate limiting class RateLimitException(message: String) : Exception(message) ## Workflow-Level Recovery Strategies ✅ Kastrax provides several mechanisms for implementing workflow-level recovery strategies, including conditional branching, fallback paths, and error recovery steps. ### Conditional Branching ```kotlin filename="ConditionalErrorHandling.kt" import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable // Create a workflow with conditional branching for error handling val dataWorkflow = workflow { name = "data-workflow" description = "Process data with error handling branches" // Initial data fetching step step(dataFetchAgent) { id = "fetch-data" name = "Fetch Data" description = "Retrieve data from external source" variables = mutableMapOf( "source" to variable("$.input.dataSource"), "credentials" to variable("$.input.credentials") ) } // Conditional branching based on fetch result conditionalStep { id = "check-fetch-result" name = "Check Fetch Result" description = "Determine next steps based on fetch result" after("fetch-data") // Check if data fetch was successful condition { context -> val success = context.getVariable("$.steps.fetch-data.output.success") as? Boolean ?: false val dataSize = context.getVariable("$.steps.fetch-data.output.data")?.let { when (it) { is List<*> -> it.size is Map<*, *> -> it.size is String -> it.length else -> 0 } } ?: 0 success && dataSize > 0 } // Success path - process the data onTrue { step(dataProcessingAgent) { id = "process-data" name = "Process Data" description = "Process the fetched data" variables = mutableMapOf( "data" to variable("$.steps.fetch-data.output.data") ) } } // Error path - handle the failure onFalse { // Check what type of error occurred conditionalStep { id = "diagnose-error" name = "Diagnose Error" description = "Determine the type of error" condition { context -> val errorType = context.getVariable("$.steps.fetch-data.output.errorType") as? String errorType == "auth_failure" } // Authentication error path onTrue { step(authRepairAgent) { id = "repair-auth" name = "Repair Authentication" description = "Attempt to fix authentication issues" variables = mutableMapOf( "credentials" to variable("$.input.credentials"), "error" to variable("$.steps.fetch-data.output.error") ) } } // Other error types onFalse { step(fallbackDataAgent) { id = "use-fallback-data" name = "Use Fallback Data" description = "Retrieve data from fallback source" variables = mutableMapOf( "originalSource" to variable("$.input.dataSource"), "fallbackSource" to variable("$.input.fallbackSource", defaultValue = "cache") ) } } } } } // Final step that merges results from all possible paths step(resultMergingAgent) { id = "merge-results" name = "Merge Results" description = "Combine results from all possible paths" // This step will run after any of the possible paths execute { context -> // Check which path was taken and get appropriate data val processedData = context.getVariable("$.steps.process-data.output.result") val fallbackData = context.getVariable("$.steps.use-fallback-data.output.data") val repairedAuth = context.getVariable("$.steps.repair-auth.output.success") // Determine the final result based on which path succeeded when { processedData != null -> { mapOf( "source" to "primary", "data" to processedData, "complete" to true ) } fallbackData != null -> { mapOf( "source" to "fallback", "data" to fallbackData, "complete" to true ) } repairedAuth == true -> { mapOf( "source" to "repaired", "message" to "Authentication repaired, please retry operation", "complete" to false ) } else -> { mapOf( "source" to "none", "error" to "All recovery paths failed", "complete" to false ) } } } } }

Monitoring and Observability ✅

Kastrax provides comprehensive monitoring capabilities for tracking workflow execution and detecting errors:

WorkflowMonitoring.kt
import ai.kastrax.core.KastraxSystem import ai.kastrax.core.workflow.WorkflowExecuteOptions import ai.kastrax.core.workflow.monitoring.WorkflowMonitor import ai.kastrax.core.workflow.monitoring.StepStatus import kotlinx.coroutines.runBlocking fun main() = runBlocking { val kastraxSystem = KastraxSystem() // Create a workflow monitor val workflowMonitor = WorkflowMonitor(kastraxSystem) // Execute a workflow with monitoring val result = kastraxSystem.workflowEngine.executeWorkflow( workflowId = "data-workflow", input = mapOf( "dataSource" to "api.example.com/data", "credentials" to mapOf("apiKey" to "your-api-key") ), options = WorkflowExecuteOptions( // Monitor step execution onStepStart = { stepId, input -> println("Starting step: $stepId with input: $input") }, onStepFinish = { stepResult -> println("Step ${stepResult.stepId} completed with status: ${stepResult.status}") // Check for errors if (stepResult.status == StepStatus.ERROR) { println("Error in step ${stepResult.stepId}: ${stepResult.error}") // Log detailed error information logStepError( stepId = stepResult.stepId, error = stepResult.error, context = stepResult.context ) } }, // Monitor workflow completion onComplete = { workflowResult -> println("Workflow completed with status: ${workflowResult.status}") // Check for workflow-level errors if (!workflowResult.success) { println("Workflow failed: ${workflowResult.error}") // Send alerts for critical workflows sendErrorAlert( workflowId = "data-workflow", error = workflowResult.error, steps = workflowResult.steps ) } } ) ) // Analyze workflow execution val executionAnalysis = workflowMonitor.analyzeExecution(result.executionId) println("Execution analysis: $executionAnalysis") // Check for specific error patterns val errorPatterns = workflowMonitor.detectErrorPatterns(result.executionId) if (errorPatterns.isNotEmpty()) { println("Detected error patterns: $errorPatterns") // Take remedial action based on error patterns handleErrorPatterns(errorPatterns) } } // Helper functions for error handling fun logStepError(stepId: String, error: Throwable?, context: Map<String, Any?>) { // Log detailed error information to monitoring system // In a real implementation, this would send data to a logging service println("DETAILED ERROR LOG - Step: $stepId, Error: ${error?.message}") println("Context: $context") } fun sendErrorAlert(workflowId: String, error: Throwable?, steps: Map<String, Any>) { // Send alerts to appropriate channels // In a real implementation, this might send emails, Slack messages, etc. println("ALERT: Workflow $workflowId failed with error: ${error?.message}") } fun handleErrorPatterns(patterns: List<String>) { // Take action based on recognized error patterns // This might involve automatic remediation, escalation, etc. patterns.forEach { pattern -> println("Handling error pattern: $pattern") } }

Error Propagation and Handling ✅

Kastrax provides fine-grained control over how errors propagate through workflows and how they’re handled at different levels.

Error Propagation Control

ErrorPropagation.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable import ai.kastrax.core.workflow.error.ErrorHandlingMode // Create a workflow with controlled error propagation val criticalWorkflow = workflow { name = "critical-workflow" description = "Workflow with controlled error propagation" // Set default error handling mode for the workflow errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR // First step - critical operation step(criticalOperationAgent) { id = "critical-operation" name = "Critical Operation" description = "Perform a critical operation that must succeed" // Override workflow-level error handling - this step must succeed errorHandlingMode = ErrorHandlingMode.FAIL_WORKFLOW variables = mutableMapOf( "input" to variable("$.input.criticalData") ) } // Second step - important but not critical step(importantOperationAgent) { id = "important-operation" name = "Important Operation" description = "Perform an important but non-critical operation" after("critical-operation") // This step will be attempted, but failure won't stop the workflow errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR variables = mutableMapOf( "criticalResult" to variable("$.steps.critical-operation.output.result") ) } // Third step - optional enhancement step(enhancementAgent) { id = "enhancement" name = "Enhancement" description = "Perform an optional enhancement" after("important-operation") // This step is optional - errors are logged but ignored errorHandlingMode = ErrorHandlingMode.IGNORE_ERROR variables = mutableMapOf( "baseData" to variable("$.steps.critical-operation.output.result"), "additionalData" to variable("$.steps.important-operation.output.result", defaultValue = null) ) } // Final step - result aggregation step(resultAggregationAgent) { id = "aggregate-results" name = "Aggregate Results" description = "Combine results from all steps" after("enhancement") execute { context -> // Get results from previous steps, handling potential failures val criticalResult = context.getVariable("$.steps.critical-operation.output.result") // For steps that might have failed, check status and handle accordingly val importantStepStatus = context.getStepStatus("important-operation") val importantResult = if (importantStepStatus == "success") { context.getVariable("$.steps.important-operation.output.result") } else { null } val enhancementStepStatus = context.getStepStatus("enhancement") val enhancementResult = if (enhancementStepStatus == "success") { context.getVariable("$.steps.enhancement.output.result") } else { null } // Aggregate results based on what succeeded mapOf( "criticalResult" to criticalResult, "importantResult" to importantResult, "enhancementResult" to enhancementResult, "completionLevel" to when { enhancementResult != null -> "full" importantResult != null -> "standard" else -> "minimal" }, "errors" to mapOf( "importantOperation" to (importantStepStatus != "success"), "enhancement" to (enhancementStepStatus != "success") ) ) } } }

Custom Error Handlers

CustomErrorHandlers.kt
import ai.kastrax.core.workflow.workflow import ai.kastrax.core.workflow.variable import ai.kastrax.core.workflow.error.ErrorHandler // Create a workflow with custom error handlers val resilientWorkflow = workflow { name = "resilient-workflow" description = "Workflow with custom error handlers" // Define a global error handler for the workflow errorHandler = ErrorHandler { stepId, error, context -> println("Error in step $stepId: ${error.message}") // Log the error to a monitoring system logError(stepId, error, context) // Return a default result for the failed step mapOf( "error" to error.message, "errorType" to error.javaClass.simpleName, "recovered" to true, "defaultResult" to "Error recovery value" ) } // First step with a custom error handler step(dataProcessingAgent) { id = "process-data" name = "Process Data" description = "Process data with custom error handling" // Step-specific error handler errorHandler = ErrorHandler { _, error, context -> // Check the specific type of error when (error) { is IllegalArgumentException -> { // Handle validation errors mapOf( "error" to "Validation error: ${error.message}", "errorType" to "validation", "recovered" to true, "validationErrors" to listOf(error.message) ) } is java.io.IOException -> { // Handle I/O errors mapOf( "error" to "I/O error: ${error.message}", "errorType" to "io", "recovered" to false ) } else -> { // Handle other errors mapOf( "error" to "Unknown error: ${error.message}", "errorType" to "unknown", "recovered" to false ) } } } variables = mutableMapOf( "data" to variable("$.input.data") ) } // Additional steps... } // Helper function for error logging fun logError(stepId: String, error: Throwable, context: Map<String, Any?>) { // In a real implementation, this would log to a monitoring system println("LOGGING ERROR - Step: $stepId, Error: ${error.message}, Context: $context") }

Best Practices for Error Handling ✅

1. Design for Failure

Assume that any step in your workflow might fail and design accordingly:

// Good: Design steps with error handling in mind step(apiCallAgent) { id = "call-api" execute { context -> try { val result = callExternalApi(context.getVariable("endpoint") as String) mapOf("result" to result, "success" to true) } catch (e: Exception) { mapOf("error" to e.message, "success" to false) } } } // Avoid: Assuming steps will always succeed // step(apiCallAgent) { // id = "call-api" // execute { context -> // val result = callExternalApi(context.getVariable("endpoint") as String) // mapOf("result" to result) // } // }

2. Use Appropriate Error Handling Modes

Choose the right error handling mode for each step based on its importance:

// Critical steps should fail the workflow if they fail step(paymentProcessingAgent) { id = "process-payment" errorHandlingMode = ErrorHandlingMode.FAIL_WORKFLOW } // Non-critical steps can continue with errors step(analyticsAgent) { id = "record-analytics" errorHandlingMode = ErrorHandlingMode.CONTINUE_ON_ERROR } // Optional steps can ignore errors entirely step(notificationAgent) { id = "send-notification" errorHandlingMode = ErrorHandlingMode.IGNORE_ERROR }

3. Implement Comprehensive Retry Strategies

Use sophisticated retry strategies for different types of operations:

// Network operations might need exponential backoff step(networkAgent) { id = "network-operation" retry = RetryConfig( maxAttempts = 5, backoff = ExponentialBackoff(initialDelayMs = 1000, factor = 2.0) ) } // Database operations might need different retry logic step(databaseAgent) { id = "database-operation" retry = RetryConfig( maxAttempts = 3, backoff = LinearBackoff(delayMs = 2000), // Consistent delay retryableExceptions = listOf(DatabaseTimeoutException::class) ) }

4. Provide Detailed Error Context

Include comprehensive information in error outputs:

// Return detailed error information execute { context -> try { // Operation that might fail } catch (e: Exception) { mapOf( "error" to e.message, "errorType" to e.javaClass.simpleName, "timestamp" to System.currentTimeMillis(), "context" to mapOf( "input" to context.getVariable("input"), "userId" to context.getVariable("$.input.userId"), "operation" to "data-processing" ), "stackTrace" to e.stackTraceToString() ) } }

Advanced Error Handling ✅

For more complex error handling scenarios, Kastrax provides advanced mechanisms:

Circuit Breakers

Implement circuit breakers to prevent cascading failures:

// Circuit breaker pattern implementation class CircuitBreaker( private val maxFailures: Int = 3, private val resetTimeoutMs: Long = 60000 ) { private var failures = 0 private var lastFailureTime = 0L private var state = State.CLOSED enum class State { CLOSED, OPEN, HALF_OPEN } fun execute(operation: () -> Map<String, Any>): Map<String, Any> { // Check if circuit is open if (state == State.OPEN) { val now = System.currentTimeMillis() if (now - lastFailureTime > resetTimeoutMs) { // Try to reset after timeout state = State.HALF_OPEN } else { // Circuit is open, return fast failure return mapOf( "error" to "Circuit breaker open", "success" to false, "circuitState" to state ) } } return try { val result = operation() // If successful in half-open state, reset the circuit if (state == State.HALF_OPEN) { state = State.CLOSED failures = 0 } result } catch (e: Exception) { failures++ lastFailureTime = System.currentTimeMillis() // Open circuit if too many failures if (failures >= maxFailures) { state = State.OPEN } mapOf( "error" to e.message, "success" to false, "circuitState" to state ) } } }

Integration with Actor Model

Kastrax’s error handling system integrates seamlessly with the actor model for distributed error handling:

ActorErrorHandling.kt
import ai.kastrax.actor.ActorSystem import ai.kastrax.actor.Props import ai.kastrax.workflow.WorkflowActor import ai.kastrax.workflow.messages.* import ai.kastrax.workflow.error.ErrorHandlingActor import kotlinx.coroutines.runBlocking fun main() = runBlocking { // Create an actor system val system = ActorSystem("workflow-system") // Create an error handling actor val errorHandlingActor = system.actorOf( Props.create(ErrorHandlingActor::class.java), "error-handler" ) // Create a workflow actor with error handling val workflowActor = system.actorOf( Props.create(WorkflowActor::class.java, resilientWorkflow, errorHandlingActor), "workflow-actor" ) // Execute the workflow val result = system.ask<WorkflowResult>( workflowActor, ExecuteWorkflowMessage(input = mapOf("data" to "test-data")) ) // Check for errors if (result is WorkflowResult.Error) { println("Workflow execution failed: ${result.error}") // Send error recovery message system.tell( errorHandlingActor, RecoverWorkflowMessage( workflowId = resilientWorkflow.id, executionId = result.executionId, recoveryStrategy = "retry" ) ) } // Shutdown the actor system when done system.terminate() }

Conclusion ✅

Robust error handling is essential for production AI workflows. Kastrax provides a comprehensive suite of error handling capabilities that enable you to create resilient, fault-tolerant workflows that can recover from failures and continue operating even in challenging conditions.

By combining step-level error handling, workflow-level recovery strategies, sophisticated retry mechanisms, and comprehensive monitoring, you can build AI agent workflows that gracefully handle errors and provide reliable service to your users.

Remember that effective error handling is not just about preventing crashes—it’s about designing systems that can adapt to failures, recover automatically when possible, and provide clear, actionable information when human intervention is required.

Last updated on