Workflows (vNext) Overview ✅
Getting Started ✅
To use vNext workflows, first import the necessary functions from the vNext module:
import ai.kastrax.core.workflow.vnext.createWorkflow
import ai.kastrax.core.workflow.vnext.createStep
Key Concepts ✅
vNext workflows consist of:
- Schemas: Type definitions for inputs and outputs
- Steps: Individual units of work with defined inputs and outputs
- Workflow: A collection of steps with defined execution flow
- Runtime: The execution environment for workflows
Creating a Simple Workflow ✅
Here’s a basic example of creating a workflow:
import ai.kastrax.core.workflow.vnext.createWorkflow
import ai.kastrax.core.workflow.vnext.createStep
import ai.kastrax.integrations.deepseek.deepSeek
import ai.kastrax.integrations.deepseek.DeepSeekModel
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// Define a step
val generateStep = createStep {
name("generate")
description("Generate a response to the user's query")
// Define input schema
input {
field("query", String::class, "The user's query")
}
// Define output schema
output {
field("response", String::class, "The generated response")
}
// Define the step's execution logic
execute { input ->
val query = input.get<String>("query")
// Use DeepSeek to generate a response
val llm = deepSeek {
apiKey("your-deepseek-api-key")
model(DeepSeekModel.DEEPSEEK_CHAT)
temperature(0.7)
}
val response = llm.generate(query)
// Return the output
mapOf("response" to response.text)
}
}
// Create a workflow
val simpleWorkflow = createWorkflow {
name("SimpleWorkflow")
description("A simple workflow that generates a response to a query")
// Define workflow input schema
input {
field("query", String::class, "The user's query")
}
// Define workflow output schema
output {
field("response", String::class, "The generated response")
}
// Add the step to the workflow
addStep(generateStep)
// Define the workflow's execution flow
flow {
// Execute the generate step
val result = executeStep(generateStep, mapOf("query" to input.get<String>("query")))
// Return the workflow output
mapOf("response" to result.get<String>("response"))
}
}
// Execute the workflow
val result = simpleWorkflow.execute(mapOf("query" to "Tell me about quantum computing"))
println("Response: ${result.get<String>("response")}")
}
Advanced Features ✅
vNext workflows support advanced features like:
Conditional Branching ✅
flow {
val sentiment = executeStep(analyzeSentimentStep, mapOf("text" to input.get<String>("text")))
if (sentiment.get<String>("sentiment") == "positive") {
executeStep(positiveResponseStep, mapOf("text" to input.get<String>("text")))
} else {
executeStep(negativeResponseStep, mapOf("text" to input.get<String>("text")))
}
}
Parallel Execution ✅
flow {
val results = executeParallel(
Pair(searchWebStep, mapOf("query" to input.get<String>("query"))),
Pair(searchDatabaseStep, mapOf("query" to input.get<String>("query")))
)
val webResults = results[0].get<List<String>>("results")
val dbResults = results[1].get<List<String>>("results")
// Combine results
val combinedResults = webResults + dbResults
mapOf("results" to combinedResults)
}
Error Handling ✅
flow {
try {
val result = executeStep(riskyStep, mapOf("input" to input.get<String>("input")))
mapOf("result" to result.get<String>("output"))
} catch (e: Exception) {
executeStep(fallbackStep, mapOf("error" to e.message))
mapOf("result" to "Used fallback due to error: ${e.message}")
}
}
Resource Suspension ✅
flow {
// Start a long-running process
val processId = executeStep(startProcessStep, mapOf("input" to input.get<String>("input")))
.get<String>("processId")
// Suspend the workflow until the process completes
suspend("process-completion", mapOf("processId" to processId))
// When resumed, continue with the result
val result = input.get<String>("result")
mapOf("result" to result)
}
Integrating with Agents and Tools ✅
vNext workflows can integrate with Kastrax agents and tools:
// Create an agent step
val agentStep = createStep {
name("agent")
description("Use an agent to generate a response")
input {
field("query", String::class, "The user's query")
}
output {
field("response", String::class, "The agent's response")
}
execute { input ->
val query = input.get<String>("query")
// Create an agent
val agent = agent {
name("WorkflowAgent")
description("An agent used in a workflow")
model = deepSeek {
apiKey("your-deepseek-api-key")
model(DeepSeekModel.DEEPSEEK_CHAT)
temperature(0.7)
}
}
// Generate a response
val response = agent.generate(query)
mapOf("response" to response.text)
}
}
// Create a tool step
val toolStep = createStep {
name("tool")
description("Use a tool to perform an action")
input {
field("input", String::class, "The tool input")
}
output {
field("result", String::class, "The tool result")
}
execute { input ->
val toolInput = input.get<String>("input")
// Create a tool
val tool = tool("calculator") {
description("Calculate a mathematical expression")
parameters {
parameter("expression", "Mathematical expression", String::class)
}
execute { params ->
val expression = params["expression"] as String
// Simple expression evaluator (for demonstration)
val result = when {
expression.contains("+") -> {
val parts = expression.split("+")
parts[0].trim().toDouble() + parts[1].trim().toDouble()
}
expression.contains("-") -> {
val parts = expression.split("-")
parts[0].trim().toDouble() - parts[1].trim().toDouble()
}
else -> throw IllegalArgumentException("Unsupported operation")
}
"Result: $result"
}
}
// Execute the tool
val result = tool.execute(mapOf("expression" to toolInput))
mapOf("result" to result)
}
}
Best Practices ✅
- Keep Steps Focused: Each step should do one thing well
- Handle Errors: Always include error handling in your workflows
- Use Schemas: Define clear input and output schemas for type safety
- Test Thoroughly: Test workflows with various inputs and edge cases
- Monitor Execution: Use logging and tracing to monitor workflow execution
Next Steps ✅
Now that you understand vNext workflows, you can:
- Learn about using workflows with agents and tools
- Explore conditional branching
- Implement parallel execution
Last updated on