Suspend and Resume in Kastrax AI Workflows ✅

Complex AI workflows often need to pause execution while waiting for external input, human feedback, or resource availability. Kastrax provides a suspend and resume system built on Kotlin’s coroutines, so pause points can be written directly inside a step’s logic.

Kastrax’s suspend and resume features allow you to:

  • Pause workflow execution at any step
  • Persist the complete workflow state to durable storage
  • Resume execution from the exact suspension point when ready
  • Inject new data when resuming to influence subsequent steps
  • Create human-in-the-loop processes with minimal code

This entire process is automatically managed by the Kastrax workflow engine, with the workflow state preserved across application restarts, deployments, and server instances. This persistence is crucial for workflows that might remain suspended for minutes, hours, or even days while waiting for external input or resources.

When to Use Suspend/Resume ✅

Kastrax’s suspension and resumption capabilities are particularly valuable in these scenarios:

  • Human-in-the-Loop Processes: Pause for human review, approval, or input
  • Asynchronous Operations: Wait for long-running external processes to complete
  • Resource Management: Pause until required resources or API quotas become available
  • Multi-Stage Approvals: Implement complex approval workflows with multiple decision points
  • Scheduled Operations: Suspend until a specific time or condition is met
  • Error Recovery: Pause on error for manual intervention before continuing
  • Distributed Decision Making: Coordinate decisions across multiple agents or systems

Suspension Mechanisms in Kastrax ✅

Kastrax provides several ways to suspend workflow execution:

1. Step-Level Suspension

The most direct approach is to suspend within a step’s execution logic:

StepSuspension.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Create a workflow with a suspending step
    val approvalWorkflow = workflow {
        name = "approval-workflow"
        description = "Process content with human approval"

        // Generate content step
        step(contentGenerationAgent) {
            id = "generate-content"
            name = "Generate Content"
            description = "Create initial content"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic"),
                "style" to variable("$.input.style")
            )
        }

        // Review step with suspension
        step(reviewAgent) {
            id = "review-content"
            name = "Review Content"
            description = "Review and potentially suspend for human approval"
            after("generate-content")
            variables = mutableMapOf(
                "content" to variable("$.steps.generate-content.output.content"),
                "qualityThreshold" to variable("$.input.qualityThreshold", defaultValue = 8.0)
            )

            // Custom execution logic with suspension
            execute { context, suspend ->
                val content = context.getVariable("content") as? String ?: ""
                val threshold = context.getVariable("qualityThreshold") as? Double ?: 8.0

                // Evaluate content quality
                val quality = evaluateContentQuality(content)

                // If quality is below threshold, suspend for human review
                if (quality < threshold) {
                    // Suspend with metadata about why we're suspending
                    val suspensionId = suspend(mapOf(
                        "reason" to "Content quality below threshold",
                        "quality" to quality,
                        "threshold" to threshold,
                        "contentPreview" to content.take(100)
                    ))

                    // This code will execute when the workflow is resumed
                    // We can access the resume data provided when resuming
                    val humanApproved = context.getResumeData("approved") as? Boolean ?: false
                    val humanFeedback = context.getResumeData("feedback") as? String ?: ""

                    return@execute mapOf(
                        "quality" to quality,
                        "approved" to humanApproved,
                        "feedback" to humanFeedback,
                        "suspensionId" to suspensionId
                    )
                }

                // If quality is good, no need for human review
                return@execute mapOf(
                    "quality" to quality,
                    "approved" to true,
                    "feedback" to "Automatically approved - quality above threshold"
                )
            }
        }

        // Final processing based on review
        step(finalizationAgent) {
            id = "finalize-content"
            name = "Finalize Content"
            description = "Process content based on review results"
            after("review-content")
            variables = mutableMapOf(
                "content" to variable("$.steps.generate-content.output.content"),
                "approved" to variable("$.steps.review-content.output.approved"),
                "feedback" to variable("$.steps.review-content.output.feedback")
            )
        }
    }

    // Helper function to evaluate content quality
    fun evaluateContentQuality(content: String): Double {
        // In a real implementation, this would use an AI model or heuristics
        // to evaluate content quality
        return 7.5 // Example score that will trigger suspension
    }

In this example, the workflow suspends when content quality is below a threshold, waiting for human review before continuing.
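The resumed branch above reads each resume field with an untyped cast and a default. A minimal sketch of gathering those casts into one typed value is shown here. `ReviewDecision` and `fromResumeData` are hypothetical names introduced for illustration and are not part of Kastrax; the defaults mirror the ones used in the step above.

```kotlin
// Hypothetical typed view over the untyped resume payload; not a Kastrax type.
data class ReviewDecision(val approved: Boolean, val feedback: String) {
    companion object {
        // Missing or wrongly typed fields fall back to the same defaults
        // the step above uses: not approved, empty feedback.
        fun fromResumeData(data: Map<String, Any?>): ReviewDecision =
            ReviewDecision(
                approved = data["approved"] as? Boolean ?: false,
                feedback = data["feedback"] as? String ?: "",
            )
    }
}

fun main() {
    val decision = ReviewDecision.fromResumeData(
        mapOf("approved" to true, "feedback" to "Add more examples.")
    )
    println(decision)

    // A payload with a wrongly typed field degrades to the safe default.
    println(ReviewDecision.fromResumeData(mapOf("approved" to "yes")))
}
```

Keeping the parsing in one place means a malformed resume payload is handled the same way wherever the decision is consumed.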

2. Human-in-the-Loop Steps

Kastrax provides dedicated human-in-the-loop steps for common approval patterns:

HumanInTheLoopStep.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable

    // Create a workflow with a human-in-the-loop step
    val reviewWorkflow = workflow {
        name = "human-review-workflow"
        description = "Process content with explicit human review step"

        // Generate content step
        step(contentGenerationAgent) {
            id = "generate-content"
            name = "Generate Content"
            description = "Create initial content"
            variables = mutableMapOf(
                "topic" to variable("$.input.topic")
            )
        }

        // Human review step - automatically suspends the workflow
        humanStep {
            id = "human-review"
            name = "Human Review"
            description = "Get human approval for the content"
            after("generate-content")

            // Define the prompt to show to the human reviewer
            prompt { context ->
                val content = context.getVariable("$.steps.generate-content.output.content") as? String ?: ""
                val topic = context.getVariable("$.input.topic") as? String ?: "unknown topic"

                """
                Please review the following content about "$topic":

                $content

                Approve or suggest changes.
                """.trimIndent()
            }

            // Set timeout for human response (24 hours)
            timeoutMs = 86400000

            // Define what happens on timeout
            onTimeout {
                step(notificationAgent) {
                    id = "timeout-notification"
                    name = "Timeout Notification"
                    description = "Notify about review timeout"
                    variables = mutableMapOf(
                        "content" to variable("$.steps.generate-content.output.content"),
                        "recipient" to variable("$.input.notificationEmail")
                    )
                }
            }
        }

        // Process the human feedback
        step(processFeedbackAgent) {
            id = "process-feedback"
            name = "Process Human Feedback"
            description = "Process the feedback from human review"
            after("human-review")
            variables = mutableMapOf(
                "originalContent" to variable("$.steps.generate-content.output.content"),
                "approved" to variable("$.steps.human-review.output.approved"),
                "feedback" to variable("$.steps.human-review.output.feedback")
            )
        }
    }

3. Event-Based Suspension

Kastrax supports event-based suspension for waiting on external events:

EventBasedSuspension.kt
    import ai.kastrax.core.workflow.workflow
    import ai.kastrax.core.workflow.variable
    import ai.kastrax.core.workflow.event.EventDefinition
    import kotlinx.serialization.Serializable

    // Create a workflow with event-based suspension
    val paymentWorkflow = workflow {
        name = "payment-processing"
        description = "Process payment with external confirmation"

        // Define events that the workflow can wait for
        events {
            event("payment_confirmed") {
                description = "Payment confirmation received from payment processor"
                schema = PaymentConfirmation::class
            }
            event("payment_rejected") {
                description = "Payment rejection received from payment processor"
                schema = PaymentRejection::class
            }
        }

        // Initial payment processing step
        step(paymentInitiationAgent) {
            id = "initiate-payment"
            name = "Initiate Payment"
            description = "Start the payment process"
            variables = mutableMapOf(
                "amount" to variable("$.input.amount"),
                "paymentMethod" to variable("$.input.paymentMethod")
            )
        }

        // Wait for payment confirmation event
        waitForEvent {
            id = "wait-for-confirmation"
            name = "Wait for Payment Confirmation"
            description = "Suspend workflow until payment is confirmed or rejected"
            after("initiate-payment")

            // Specify which events to wait for
            events = listOf("payment_confirmed", "payment_rejected")

            // Optional timeout
            timeoutMs = 300000 // 5 minutes

            // Handle timeout
            onTimeout {
                step(timeoutHandlingAgent) {
                    id = "handle-timeout"
                    name = "Handle Payment Timeout"
                    description = "Process payment timeout"
                    variables = mutableMapOf(
                        "paymentId" to variable("$.steps.initiate-payment.output.paymentId")
                    )
                }
            }
        }

        // Conditional processing based on which event was received
        conditionalStep {
            id = "process-payment-result"
            name = "Process Payment Result"
            description = "Handle payment confirmation or rejection"
            after("wait-for-confirmation")

            // Check which event was received
            condition { context ->
                val eventType = context.getVariable("$.steps.wait-for-confirmation.output.eventType") as? String
                eventType == "payment_confirmed"
            }

            // Handle payment confirmation
            onTrue {
                step(confirmationAgent) {
                    id = "process-confirmation"
                    name = "Process Confirmation"
                    description = "Handle successful payment"
                    variables = mutableMapOf(
                        "confirmation" to variable("$.steps.wait-for-confirmation.output.eventData")
                    )
                }
            }

            // Handle payment rejection
            onFalse {
                step(rejectionAgent) {
                    id = "process-rejection"
                    name = "Process Rejection"
                    description = "Handle payment rejection"
                    variables = mutableMapOf(
                        "rejection" to variable("$.steps.wait-for-confirmation.output.eventData")
                    )
                }
            }
        }
    }

    // Data classes for event schemas
    @Serializable
    data class PaymentConfirmation(
        val transactionId: String,
        val amount: Double,
        val timestamp: Long,
        val confirmationCode: String
    )

    @Serializable
    data class PaymentRejection(
        val transactionId: String,
        val reason: String,
        val timestamp: Long
    )

Workflow Suspension Lifecycle ✅

The suspension and resumption process in Kastrax follows a well-defined lifecycle:

  1. Suspension Trigger: A step calls suspend() or a dedicated suspension step is reached
  2. State Persistence: The complete workflow state is serialized and stored
  3. Suspension Notification: The workflow engine returns a suspended status with metadata
  4. External Processing: External systems or humans process the suspended workflow
  5. Resumption Request: The workflow is resumed with new data via the API
  6. State Restoration: The workflow state is restored from storage
  7. Execution Continuation: Execution continues from the exact suspension point

This lifecycle ensures reliable long-running workflows that can span minutes to days or even weeks.
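The lifecycle above can be modelled in a few lines of plain Kotlin. This is only a sketch under stated assumptions: `InMemoryEngine`, `Suspended`, and `Completed` are hypothetical names, an in-memory map stands in for durable storage, and "continuing execution" is reduced to merging the resume data into the restored state.

```kotlin
// All names here are hypothetical and only model the lifecycle;
// they are not Kastrax types.
sealed interface RunState
data class Suspended(val suspensionId: String, val metadata: Map<String, Any?>) : RunState
data class Completed(val output: Map<String, Any?>) : RunState

class InMemoryEngine {
    // Stands in for durable storage (step 2 of the lifecycle).
    private val store = mutableMapOf<String, Map<String, Any?>>()
    private var nextId = 0

    // Steps 1-3: a trigger persists the state and reports a suspended status.
    fun suspendRun(state: Map<String, Any?>, metadata: Map<String, Any?>): Suspended {
        val id = "susp-${++nextId}"
        store[id] = state
        return Suspended(id, metadata)
    }

    // Steps 5-7: restore the state, merge the resume data, and continue.
    fun resumeRun(suspensionId: String, resumeData: Map<String, Any?>): Completed {
        val state = store.remove(suspensionId)
            ?: error("Unknown or already resumed suspension: $suspensionId")
        return Completed(state + resumeData)
    }
}

fun main() {
    val engine = InMemoryEngine()
    val suspended = engine.suspendRun(
        state = mapOf("content" to "draft text", "quality" to 7.5),
        metadata = mapOf("reason" to "Content quality below threshold"),
    )
    // Step 4 happens outside the engine: a human reviews the draft.
    val done = engine.resumeRun(suspended.suspensionId, mapOf("approved" to true))
    println(done.output)
}
```

Removing the entry on resume makes a second resume with the same ID fail loudly, which is one simple way to surface duplicate resumption requests.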

Executing Suspendable Workflows ✅

Kastrax provides a comprehensive API for executing, monitoring, and resuming suspended workflows:

WorkflowExecution.kt
    import ai.kastrax.core.workflow.WorkflowEngine
    import ai.kastrax.core.workflow.WorkflowExecuteOptions
    import ai.kastrax.core.workflow.WorkflowResumeOptions
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        // Get the workflow engine
        val workflowEngine = kastraxSystem.workflowEngine

        // Execute the approval workflow
        val result = workflowEngine.executeWorkflow(
            workflowId = "approval-workflow",
            input = mapOf(
                "topic" to "Artificial Intelligence Ethics",
                "style" to "academic",
                "qualityThreshold" to 8.5,
                "notificationEmail" to "reviewer@example.com"
            ),
            options = WorkflowExecuteOptions(
                // Set execution options
                maxSteps = 20,
                timeout = 60000, // 60 seconds timeout for the initial execution

                // Add step completion listener
                onStepFinish = { stepResult ->
                    println("Step ${stepResult.stepId} completed with status: ${stepResult.status}")

                    // Check if a step was suspended
                    if (stepResult.status == "suspended") {
                        println("Workflow suspended at step: ${stepResult.stepId}")
                        println("Suspension metadata: ${stepResult.suspensionMetadata}")

                        // In a real application, you would notify external systems
                        // or create a task for human review here
                    }
                }
            )
        )

        // Check if the workflow is suspended
        if (result.status == "suspended") {
            val suspendedStepId = result.suspendedStepId
            val suspensionId = result.suspensionId

            println("Workflow suspended at step: $suspendedStepId")
            println("Suspension ID: $suspensionId")

            // In a real application, you would wait for human input or an external event
            // For this example, we'll simulate receiving human input
            val humanInput = simulateHumanReview(result)

            // Resume the workflow with the human input
            val resumeResult = workflowEngine.resumeWorkflow(
                suspensionId = suspensionId,
                resumeData = mapOf(
                    "approved" to humanInput.approved,
                    "feedback" to humanInput.feedback
                ),
                options = WorkflowResumeOptions(
                    timeout = 30000 // 30 seconds timeout for the resumed execution
                )
            )

            println("Workflow resumed and completed with status: ${resumeResult.status}")
            println("Final output: ${resumeResult.output}")
        } else {
            println("Workflow completed without suspension with status: ${result.status}")
            println("Output: ${result.output}")
        }
    }

    // Simulate human review (in a real application, this would be a UI or API interaction)
    data class HumanReviewInput(val approved: Boolean, val feedback: String)

    fun simulateHumanReview(workflowResult: WorkflowResult): HumanReviewInput {
        // In a real application, this would wait for actual human input
        // For this example, we're simulating an approval with feedback
        return HumanReviewInput(
            approved = true,
            feedback = "The content looks good, but please add more examples."
        )
    }

Resuming Event-Based Workflows ✅

For event-based workflows, Kastrax provides specialized methods to resume execution when events occur:

EventBasedResumption.kt
    import ai.kastrax.core.workflow.WorkflowEngine
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        // Get the workflow engine
        val workflowEngine = kastraxSystem.workflowEngine

        // Execute the payment workflow
        val result = workflowEngine.executeWorkflow(
            workflowId = "payment-processing",
            input = mapOf(
                "amount" to 99.99,
                "paymentMethod" to "credit_card"
            )
        )

        // Check if the workflow is waiting for an event
        if (result.status == "waiting_for_event") {
            val waitingForEvents = result.waitingForEvents
            val suspensionId = result.suspensionId

            println("Workflow waiting for events: $waitingForEvents")
            println("Suspension ID: $suspensionId")

            // In a real application, this would be triggered by an external system
            // For this example, we'll simulate receiving a payment confirmation event
            val paymentConfirmation = PaymentConfirmation(
                transactionId = "tx-12345",
                amount = 99.99,
                timestamp = System.currentTimeMillis(),
                confirmationCode = "CONF-ABC123"
            )

            // Resume the workflow with the event
            val resumeResult = workflowEngine.resumeWorkflowWithEvent(
                suspensionId = suspensionId,
                eventName = "payment_confirmed",
                eventData = paymentConfirmation
            )

            println("Workflow resumed with event and completed with status: ${resumeResult.status}")
            println("Final output: ${resumeResult.output}")
        }
    }

Integration with External Systems ✅

External systems can start, inspect, and resume suspended workflows through the workflow engine API. The examples below expose that API over HTTP with Ktor:

REST API Integration

RestApiIntegration.kt
    import io.ktor.server.application.*
    import io.ktor.server.engine.*
    import io.ktor.server.netty.*
    import io.ktor.server.routing.*
    import io.ktor.server.request.*
    import io.ktor.server.response.*

    // Create a REST API for workflow interaction
    // Note: call.receive<T>() and call.respond(obj) for non-text types require
    // Ktor's ContentNegotiation plugin with a serializer to be installed.
    fun main() {
        embeddedServer(Netty, port = 8080) {
            // Get the workflow engine
            val workflowEngine = kastraxSystem.workflowEngine

            routing {
                // Start a new workflow
                post("/workflows/{workflowId}/execute") {
                    val workflowId = call.parameters["workflowId"]!!
                    val input = call.receive<Map<String, Any>>()

                    val result = workflowEngine.executeWorkflow(workflowId, input)
                    call.respond(result)
                }

                // Resume a suspended workflow
                post("/workflows/resume/{suspensionId}") {
                    val suspensionId = call.parameters["suspensionId"]!!
                    val resumeData = call.receive<Map<String, Any>>()

                    val result = workflowEngine.resumeWorkflow(suspensionId, resumeData)
                    call.respond(result)
                }

                // Send an event to a waiting workflow
                post("/workflows/events/{suspensionId}/{eventName}") {
                    val suspensionId = call.parameters["suspensionId"]!!
                    val eventName = call.parameters["eventName"]!!
                    val eventData = call.receive<Any>()

                    val result = workflowEngine.resumeWorkflowWithEvent(suspensionId, eventName, eventData)
                    call.respond(result)
                }

                // Get suspended workflows
                get("/workflows/suspended") {
                    val suspendedWorkflows = workflowEngine.getSuspendedWorkflows()
                    call.respond(suspendedWorkflows)
                }
            }
        }.start(wait = true)
    }
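A client of the resume route above could be sketched with the JDK's built-in HTTP client types. The helper name `buildResumeRequest`, the base URL, and the JSON body are assumptions made for illustration; only the `/workflows/resume/{suspensionId}` path and port 8080 come from the server example. The request is built but not sent, so the sketch runs without a server.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds (but does not send) a POST to the resume route defined above.
// The base URL and JSON body are illustrative assumptions.
fun buildResumeRequest(baseUrl: String, suspensionId: String, jsonBody: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/workflows/resume/$suspensionId"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build()

fun main() {
    val request = buildResumeRequest(
        baseUrl = "http://localhost:8080",
        suspensionId = "susp-123",
        jsonBody = """{"approved": true, "feedback": "Looks good"}""",
    )
    println("${request.method()} ${request.uri()}")

    // To actually send it against a running server:
    // java.net.http.HttpClient.newHttpClient()
    //     .send(request, java.net.http.HttpResponse.BodyHandlers.ofString())
}
```

If suspension IDs can contain characters that are not URL-safe, they would need to be encoded before being placed in the path.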

Web UI Integration

For human-in-the-loop workflows, you can create a web UI that displays suspended workflows and allows humans to provide input:

WebUiIntegration.kt
    import io.ktor.server.application.*
    import io.ktor.server.engine.*
    import io.ktor.server.netty.*
    import io.ktor.server.routing.*
    import io.ktor.server.html.*
    import io.ktor.server.request.*   // receiveParameters
    import io.ktor.server.response.*  // respondRedirect
    import kotlinx.html.*

    // Create a web UI for human review
    fun main() {
        embeddedServer(Netty, port = 8080) {
            // Get the workflow engine
            val workflowEngine = kastraxSystem.workflowEngine

            routing {
                // Display a list of workflows waiting for human input
                get("/human-tasks") {
                    val suspendedWorkflows = workflowEngine.getSuspendedWorkflows()
                        .filter { it.suspensionReason == "human_review" }

                    call.respondHtml {
                        head {
                            title("Human Review Tasks")
                        }
                        body {
                            h1 { +"Tasks Waiting for Human Review" }

                            suspendedWorkflows.forEach { workflow ->
                                div {
                                    h2 { +"Task: ${workflow.workflowName}" }
                                    p { +"Suspended at step: ${workflow.suspendedStepId}" }
                                    p { +"Suspension ID: ${workflow.suspensionId}" }

                                    // Display the prompt for human review
                                    val prompt = workflow.suspensionMetadata["prompt"] as? String ?: ""
                                    div {
                                        h3 { +"Review Request:" }
                                        pre { +prompt }
                                    }

                                    // Form for submitting feedback
                                    form {
                                        action = "/human-tasks/${workflow.suspensionId}/submit"
                                        method = FormMethod.post

                                        div {
                                            label {
                                                htmlFor = "approved"
                                                +"Approve: "
                                            }
                                            select {
                                                name = "approved"
                                                option { value = "true"; +"Yes" }
                                                option { value = "false"; +"No" }
                                            }
                                        }

                                        div {
                                            label {
                                                htmlFor = "feedback"
                                                +"Feedback: "
                                            }
                                            textArea {
                                                name = "feedback"
                                                rows = "4"
                                                cols = "50"
                                            }
                                        }

                                        button {
                                            type = ButtonType.submit
                                            +"Submit Review"
                                        }
                                    }
                                }
                                hr {}
                            }
                        }
                    }
                }

                // Handle form submission
                post("/human-tasks/{suspensionId}/submit") {
                    val suspensionId = call.parameters["suspensionId"]!!
                    val parameters = call.receiveParameters()

                    val approved = parameters["approved"] == "true"
                    val feedback = parameters["feedback"] ?: ""

                    // Resume the workflow with the human input
                    workflowEngine.resumeWorkflow(
                        suspensionId = suspensionId,
                        resumeData = mapOf(
                            "approved" to approved,
                            "feedback" to feedback
                        )
                    )

                    call.respondRedirect("/human-tasks")
                }
            }
        }.start(wait = true)
    }

Key Points About Event-Based Workflows

  • The suspend() function can optionally take a payload object that will be stored with the suspended state

  • Code after the await suspend() call will not execute until the step is resumed

  • When a step is suspended, its status becomes 'suspended' in the workflow results

  • When resumed, the step’s status changes from 'suspended' to 'success' once completed

  • The resume() method requires the stepId to identify which suspended step to resume

  • You can provide new context data when resuming that will be merged with existing step results

  • Events must be defined in the workflow configuration with a schema

  • The afterEvent method creates a special suspended step that waits for the event

  • The event step is automatically named __eventName_event (e.g., __approvalReceived_event)

  • Use resumeWithEvent to provide event data and continue the workflow

  • Event data is validated against the schema defined for that event

  • The event data is available in the context as inputData.resumedEvent
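The point about validating event data against a schema can be illustrated with a small stand-alone check. This is a simplified sketch, not Kastrax's validation logic: `FieldType`, `EventSchema`, and `validateEvent` are hypothetical names, and the field list is borrowed from the `PaymentConfirmation` class shown earlier.

```kotlin
// Hypothetical, simplified schema model; not Kastrax's actual event schema type.
enum class FieldType { STRING, NUMBER }

data class EventSchema(val eventName: String, val fields: Map<String, FieldType>)

fun FieldType.accepts(value: Any?): Boolean = when (this) {
    FieldType.STRING -> value is String
    FieldType.NUMBER -> value is Number
}

// Returns a list of problems; an empty list means the payload is valid.
fun validateEvent(schema: EventSchema, payload: Map<String, Any?>): List<String> =
    schema.fields.mapNotNull { (field, type) ->
        val value = payload[field]
        when {
            value == null -> "missing field '$field'"
            !type.accepts(value) -> "field '$field' must be $type"
            else -> null
        }
    }

fun main() {
    val paymentConfirmed = EventSchema(
        eventName = "payment_confirmed",
        fields = mapOf(
            "transactionId" to FieldType.STRING,
            "amount" to FieldType.NUMBER,
            "timestamp" to FieldType.NUMBER,
            "confirmationCode" to FieldType.STRING,
        ),
    )
    // An incomplete payload with one wrongly typed field.
    val problems = validateEvent(
        paymentConfirmed,
        mapOf("transactionId" to "tx-12345", "amount" to "99.99"),
    )
    problems.forEach(::println)
}
```

Rejecting a malformed event before resuming keeps the workflow suspended, so the sender can retry with a corrected payload.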

Storage for Suspend and Resume ✅

When a workflow is suspended using await suspend(), Kastrax automatically persists the entire workflow state to storage. This is essential for workflows that might remain suspended for extended periods, as it ensures the state is preserved across application restarts or server instances.

Default Storage: LibSQL

By default, Kastrax uses LibSQL as its storage engine:

    import { Kastrax } from "@kastrax/core/kastrax";
    import { LibSQLStore } from "@kastrax/libsql";

    const kastrax = new Kastrax({
      storage: new LibSQLStore({
        url: "file:./storage.db", // Local file-based database for development
        // For production, use a persistent URL:
        // url: process.env.DATABASE_URL,
        // authToken: process.env.DATABASE_AUTH_TOKEN, // Optional for authenticated connections
      }),
    });

The LibSQL storage can be configured in different modes:

  • In-memory database (testing): :memory:
  • File-based database (development): file:storage.db
  • Remote database (production): URLs like libsql://your-database.turso.io

Alternative Storage Options

Upstash (Redis-Compatible)

For serverless applications or environments where Redis is preferred:

npm install @kastrax/upstash@latest

    import { Kastrax } from "@kastrax/core/kastrax";
    import { UpstashStore } from "@kastrax/upstash";

    const kastrax = new Kastrax({
      storage: new UpstashStore({
        url: process.env.UPSTASH_URL,
        token: process.env.UPSTASH_TOKEN,
      }),
    });

Storage Considerations

  • All storage options support suspend and resume functionality identically
  • The workflow state is automatically serialized and saved when suspended
  • No additional configuration is needed for suspend/resume to work with storage
  • Choose your storage option based on your infrastructure, scaling needs, and existing technology stack

Watching and Resuming ✅

To handle suspended workflows, use the watch method to monitor workflow status per run and resume to continue execution:

    import { kastrax } from "./index";

    // Get the workflow
    const myWorkflow = kastrax.getWorkflow("myWorkflow");
    const { start, watch, resume } = myWorkflow.createRun();

    // Start watching the workflow before executing it
    watch(async ({ activePaths }) => {
      const isStepTwoSuspended = activePaths.get("stepTwo")?.status === "suspended";
      if (isStepTwoSuspended) {
        console.log("Workflow suspended, resuming with new value");

        // Resume the workflow with new context
        await resume({
          stepId: "stepTwo",
          context: { secondValue: 100 },
        });
      }
    });

    // Start the workflow execution
    await start({ triggerData: { inputValue: 45 } });

Watching and Resuming Event-Based Workflows

You can use the same watching pattern with event-based workflows:

    const { start, watch, resumeWithEvent } = workflow.createRun();

    // Watch for suspended event steps
    watch(async ({ activePaths }) => {
      const isApprovalReceivedSuspended =
        activePaths.get("__approvalReceived_event")?.status === "suspended";
      if (isApprovalReceivedSuspended) {
        console.log("Workflow waiting for approval event");

        // In a real scenario, you would wait for the actual event to occur
        // For example, this could be triggered by a webhook or user interaction
        setTimeout(async () => {
          await resumeWithEvent("approvalReceived", {
            approved: true,
            approverName: "Auto Approver",
          });
        }, 5000); // Simulate event after 5 seconds
      }
    });

    // Start the workflow
    await start({ triggerData: { requestId: "auto-123" } });

Best Practices for Suspendable Workflows ✅

1. Design for Clarity and Maintainability

    // Good: Clear suspension points with descriptive metadata
    step(reviewAgent) {
        id = "content-review"
        execute { context, suspend ->
            if (needsHumanReview(context)) {
                suspend(mapOf(
                    "reason" to "Content requires human review",
                    "contentType" to "blog post",
                    "priority" to "high"
                ))
            }
            // Rest of execution logic
        }
    }

    // Avoid: Unclear suspension with no metadata
    // step(reviewAgent) {
    //     id = "review"
    //     execute { context, suspend ->
    //         if (someCondition) {
    //             suspend() // No metadata about why we're suspending
    //         }
    //     }
    // }

2. Implement Proper Error Handling

    // Suspend for manual intervention when processing fails
    step(processingAgent) {
        id = "process-data"
        execute { context, suspend ->
            try {
                // Attempt to process data
                val result = processData(context)
                return@execute result
            } catch (e: Exception) {
                // Suspend for human intervention on error
                suspend(mapOf(
                    "reason" to "Error during processing",
                    "error" to (e.message ?: "unknown error"), // message can be null
                    "errorType" to e.javaClass.simpleName
                ))

                // When resumed, try to recover
                val manualFix = context.getResumeData("manualFix") as? String
                return@execute mapOf("result" to manualFix)
            }
        }
    }

3. Use Timeouts Appropriately

    // Set appropriate timeouts for different types of human tasks
    humanStep {
        id = "urgent-review"
        description = "Urgent review needed"
        prompt { /* ... */ }
        timeoutMs = 3600000 // 1 hour for urgent reviews
    }

    humanStep {
        id = "standard-review"
        description = "Standard content review"
        prompt { /* ... */ }
        timeoutMs = 86400000 // 24 hours for standard reviews
    }
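The millisecond literals above are easy to mistype. One option, sketched here with the standard `kotlin.time` API, is to derive them from named durations. `toTimeoutMs` is a hypothetical helper, and whether `timeoutMs` accepts a `Long` or needs an `Int` is an assumption to check against the actual property type.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.minutes

// Hypothetical helper: convert a readable Duration into the millisecond
// value that a timeoutMs-style property expects.
fun Duration.toTimeoutMs(): Long = inWholeMilliseconds

// The same three timeouts used in this guide, written as durations.
val urgentReviewTimeoutMs = 1.hours.toTimeoutMs()
val standardReviewTimeoutMs = 24.hours.toTimeoutMs()
val paymentEventTimeoutMs = 5.minutes.toTimeoutMs()

fun main() {
    println("urgent=$urgentReviewTimeoutMs standard=$standardReviewTimeoutMs payment=$paymentEventTimeoutMs")
}
```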

4. Implement Idempotent Resumption

    // Design steps to handle being resumed multiple times
    step(paymentProcessingAgent) {
        id = "process-payment"
        execute { context, suspend ->
            val paymentId = context.getVariable("paymentId") as? String

            // Check if payment was already processed (idempotency check)
            if (isPaymentAlreadyProcessed(paymentId)) {
                return@execute mapOf(
                    "status" to "already_processed",
                    "paymentId" to paymentId
                )
            }

            // Process payment logic...
        }
    }
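The same idea can be applied on the caller's side, so that a retried webhook or a double-clicked form does not resume a workflow twice. The sketch below is an assumption-laden illustration: `IdempotentResumer` is a hypothetical class, and the set of handled IDs is kept in memory here, whereas a real deployment would keep it in durable storage alongside the workflow state.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical guard around a resume call; not part of Kastrax.
class IdempotentResumer(private val action: (String, Map<String, Any?>) -> Unit) {
    // Thread-safe set of suspension IDs that have already been resumed.
    private val handled = ConcurrentHashMap.newKeySet<String>()

    // Runs the action at most once per suspension ID.
    // Returns true if this call did the work, false if it was a duplicate.
    fun resumeOnce(suspensionId: String, resumeData: Map<String, Any?>): Boolean {
        if (!handled.add(suspensionId)) return false
        action(suspensionId, resumeData)
        return true
    }
}

fun main() {
    val resumer = IdempotentResumer { id, data -> println("Resuming $id with $data") }
    println(resumer.resumeOnce("susp-1", mapOf("approved" to true)))
    println(resumer.resumeOnce("susp-1", mapOf("approved" to true)))
}
```

Note that this sketch marks an ID as handled before the action runs, so a failed action is not retried; whether that is the right trade-off depends on the workflow.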

5. Monitor Suspended Workflows

    // Implement monitoring for suspended workflows
    class WorkflowMonitor(private val workflowEngine: WorkflowEngine) {
        fun monitorSuspendedWorkflows() {
            val suspendedWorkflows = workflowEngine.getSuspendedWorkflows()

            // Check for stale workflows
            val staleWorkflows = suspendedWorkflows.filter { workflow ->
                val suspendedAt = workflow.suspendedAt
                val now = System.currentTimeMillis()
                val ageInHours = (now - suspendedAt) / (1000 * 60 * 60)

                ageInHours > 48 // Workflows suspended for more than 48 hours
            }

            // Send notifications for stale workflows
            staleWorkflows.forEach { workflow ->
                sendNotification(
                    "Stale workflow detected",
                    "Workflow ${workflow.workflowId} has been suspended for more than 48 hours"
                )
            }
        }
    }
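The staleness check in the monitor reads the system clock directly, which makes it awkward to test. A possible variation, sketched below, passes the current time in as a parameter. `SuspendedInfo` and `findStale` are hypothetical names, and the use of `java.time.Instant` for `suspendedAt` is an assumption (the monitor above treats it as epoch milliseconds).

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical minimal view of a suspended workflow; the field names mirror
// the monitor above but are assumptions about the real type.
data class SuspendedInfo(val workflowId: String, val suspendedAt: Instant)

// Pure function: the clock is passed in, so tests can pin "now".
fun findStale(
    workflows: List<SuspendedInfo>,
    now: Instant,
    maxAge: Duration = Duration.ofHours(48),
): List<SuspendedInfo> =
    workflows.filter { Duration.between(it.suspendedAt, now) > maxAge }

fun main() {
    val now = Instant.now()
    val workflows = listOf(
        SuspendedInfo("recent", now.minus(Duration.ofHours(2))),
        SuspendedInfo("old", now.minus(Duration.ofHours(72))),
    )
    findStale(workflows, now).forEach { println("Stale: ${it.workflowId}") }
}
```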

Limitations and Considerations ✅

When working with suspendable workflows in Kastrax, be aware of these limitations and considerations:

  1. Storage Requirements: Suspended workflows consume storage resources, so implement cleanup strategies for abandoned workflows

  2. Serialization Constraints: All data in suspended workflows must be serializable for persistence

  3. Long-Running Transactions: Consider the impact of long-running database transactions when workflows are suspended for extended periods

  4. State Evolution: Plan for how to handle suspended workflows when your code or data models evolve

  5. Security Considerations: Ensure that sensitive data in suspended workflows is properly protected

  6. Monitoring Overhead: Implement appropriate monitoring to track and manage suspended workflows

  7. Testing Complexity: Thoroughly test suspension and resumption paths, including timeout scenarios
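For the state evolution point, one common approach is to store a schema version with each persisted snapshot and upgrade old snapshots when they are loaded. The sketch below assumes a hypothetical `Snapshot` format and an invented history in which version 1 stored a `score` field that version 2 renamed to `quality`; none of this reflects how Kastrax actually stores state.

```kotlin
// Hypothetical snapshot format: a schema version stored next to the data.
data class Snapshot(val version: Int, val data: Map<String, Any?>)

const val CURRENT_VERSION = 2

// Upgrades one version at a time so each migration stays small.
// Assumed history for illustration: v1 stored "score", v2 renamed it to "quality".
fun migrate(snapshot: Snapshot): Snapshot {
    require(snapshot.version <= CURRENT_VERSION) {
        "Snapshot version ${snapshot.version} is newer than this code supports"
    }
    var current = snapshot
    while (current.version < CURRENT_VERSION) {
        current = when (current.version) {
            1 -> Snapshot(
                version = 2,
                data = (current.data - "score") + ("quality" to current.data["score"]),
            )
            else -> error("No migration from version ${current.version}")
        }
    }
    return current
}

fun main() {
    val old = Snapshot(version = 1, data = mapOf("score" to 7.5, "content" to "draft"))
    println(migrate(old))
}
```

Running the migration on resume means workflows suspended before a deployment can still continue after the data model changes.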

Further Reading ✅

For a deeper understanding of how suspend and resume works in Kastrax:
