
Parallel Execution with Steps ✅

When building AI applications, you often need to run several independent tasks at the same time to reduce overall latency. Workflows support this directly through the .parallel method.
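As a point of reference, the same "run independent tasks together, then continue" shape can be sketched in plain TypeScript with Promise.all. This is only an illustration of the concept, not how the workflow engine is implemented; `planOutdoor`, `planIndoor`, `planDay`, and the delays are hypothetical stand-ins.

```typescript
// Hypothetical stand-ins for two independent tasks; names and delays are illustrative only.
const delay = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms))

async function planOutdoor(city: string): Promise<string> {
  await delay(50)
  return `outdoor plan for ${city}`
}

async function planIndoor(city: string): Promise<string> {
  await delay(50)
  return `indoor plan for ${city}`
}

// Starts both tasks at once, waits for both, then runs the dependent step.
async function planDay(city: string): Promise<string> {
  const [outdoor, indoor] = await Promise.all([planOutdoor(city), planIndoor(city)])
  return `${outdoor} | ${indoor}`
}

planDay('Ibiza').then((report) => console.log(report))
```

Because the two tasks do not depend on each other, the total wait is roughly that of the slower task rather than the sum of both.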

Define Planning Agent ✅

Define a planning agent that uses an LLM call to plan activities for a given location and its weather conditions.

agents/planning-agent.ts
import { Agent } from '@kastrax/core/agent'
import { openai } from '@ai-sdk/openai'

const llm = openai('gpt-4o')

const planningAgent = new Agent({
  name: 'planningAgent',
  model: llm,
  instructions: `
    You are a local activities and travel expert who excels at weather-based planning. Analyze the weather data and provide practical activity recommendations.

    📅 [Day, Month Date, Year]
    ═══════════════════════════

    🌡️ WEATHER SUMMARY
    • Conditions: [brief description]
    • Temperature: [X°C/Y°F to A°C/B°F]
    • Precipitation: [X% chance]

    🌅 MORNING ACTIVITIES
    Outdoor:
    • [Activity Name] - [Brief description including specific location/route]
      Best timing: [specific time range]
      Note: [relevant weather consideration]

    🌞 AFTERNOON ACTIVITIES
    Outdoor:
    • [Activity Name] - [Brief description including specific location/route]
      Best timing: [specific time range]
      Note: [relevant weather consideration]

    🏠 INDOOR ALTERNATIVES
    • [Activity Name] - [Brief description including specific venue]
      Ideal for: [weather condition that would trigger this alternative]

    ⚠️ SPECIAL CONSIDERATIONS
    • [Any relevant weather warnings, UV index, wind conditions, etc.]

    Guidelines:
    - Suggest 2-3 time-specific outdoor activities per day
    - Include 1-2 indoor backup options
    - For precipitation >50%, lead with indoor activities
    - All activities must be specific to the location
    - Include specific venues, trails, or locations
    - Consider activity intensity based on temperature
    - Keep descriptions concise but informative

    Maintain this exact formatting for consistency, using the emoji and section headers as shown.
  `,
})

export { planningAgent }
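The agent receives the weather data as part of the user message. The sketch below shows one way such a message could be assembled from a forecast object, using the same fields as the forecast schema defined in the workflow later on this page. `buildPlanningPrompt` is a hypothetical helper and the sample values are made up for illustration.

```typescript
// Mirrors the fields of the forecast object used by the workflow on this page.
interface Forecast {
  date: string
  maxTemp: number
  minTemp: number
  precipitationChance: number
  condition: string
  location: string
}

// Hypothetical helper: builds the user message that is sent to the planning agent.
function buildPlanningPrompt(forecast: Forecast): string {
  const header = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:`
  return `${header}\n${JSON.stringify(forecast, null, 2)}`
}

// Illustrative values only, not real weather data.
const sampleForecast: Forecast = {
  date: '2025-01-01T00:00:00.000Z',
  maxTemp: 24,
  minTemp: 16,
  precipitationChance: 20,
  condition: 'Partly cloudy',
  location: 'Ibiza',
}

console.log(buildPlanningPrompt(sampleForecast))
```

Passing the forecast as pretty-printed JSON keeps every field visible to the model, so the formatting rules in the agent's instructions (for example, leading with indoor activities above 50% precipitation) have the numbers they need.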

Define Synthesize Agent ✅

Define a synthesize agent that takes the planned indoor and outdoor activities and produces a full report for the day.

agents/synthesize-agent.ts
import { Agent } from '@kastrax/core/agent'
import { openai } from '@ai-sdk/openai'

const llm = openai('gpt-4o')

const synthesizeAgent = new Agent({
  name: 'synthesizeAgent',
  model: llm,
  instructions: `
    You are given two different blocks of text, one about indoor activities and one about outdoor activities.
    Make this into a full report about the day and the possibilities depending on whether it rains or not.
  `,
})

export { synthesizeAgent }

Define Parallel Workflow ✅

Here, we’ll define a workflow that runs the two planning steps in parallel and then passes both results to the synthesize step.

workflows/parallel-workflow.ts
import { z } from 'zod'
import { createStep, createWorkflow } from '@kastrax/core/workflows/vNext'

const forecastSchema = z.object({
  date: z.string(),
  maxTemp: z.number(),
  minTemp: z.number(),
  precipitationChance: z.number(),
  condition: z.string(),
  location: z.string(),
})

// Resolves the city to coordinates, then fetches and summarizes its forecast.
const fetchWeather = createStep({
  id: 'fetch-weather',
  description: 'Fetches weather forecast for a given city',
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: forecastSchema,
  execute: async ({ inputData }) => {
    if (!inputData) {
      throw new Error('Input data not found')
    }

    const geocodingUrl = `https://geocoding-api.open-meteo.com/v1/search?name=${encodeURIComponent(inputData.city)}&count=1`
    const geocodingResponse = await fetch(geocodingUrl)
    const geocodingData = (await geocodingResponse.json()) as {
      results: { latitude: number; longitude: number; name: string }[]
    }

    if (!geocodingData.results?.[0]) {
      throw new Error(`Location '${inputData.city}' not found`)
    }

    const { latitude, longitude, name } = geocodingData.results[0]

    // Fixed: the original URL had a stray comma after `timezone=auto`.
    const weatherUrl = `https://api.open-meteo.com/v1/forecast?latitude=${latitude}&longitude=${longitude}&current=precipitation,weathercode&timezone=auto&hourly=precipitation_probability,temperature_2m`
    const response = await fetch(weatherUrl)
    const data = (await response.json()) as {
      current: {
        time: string
        precipitation: number
        weathercode: number
      }
      hourly: {
        precipitation_probability: number[]
        temperature_2m: number[]
      }
    }

    const forecast = {
      date: new Date().toISOString(),
      maxTemp: Math.max(...data.hourly.temperature_2m),
      minTemp: Math.min(...data.hourly.temperature_2m),
      condition: getWeatherCondition(data.current.weathercode),
      location: name,
      // Highest hourly precipitation probability in the forecast window.
      precipitationChance: data.hourly.precipitation_probability.reduce(
        (acc, curr) => Math.max(acc, curr),
        0
      ),
    }

    return forecast
  },
})

const planActivities = createStep({
  id: 'plan-activities',
  description: 'Suggests activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, kastrax }) => {
    console.log('planActivities', inputData)
    const forecast = inputData

    if (!forecast) {
      throw new Error('Forecast data not found')
    }

    const prompt = `Based on the following weather forecast for ${forecast.location}, suggest appropriate activities:
      ${JSON.stringify(forecast, null, 2)}
      `

    const agent = kastrax?.getAgent('planningAgent')
    if (!agent) {
      throw new Error('Planning agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    // Accumulate the streamed chunks into the full response text.
    let activitiesText = ''
    for await (const chunk of response.textStream) {
      process.stdout.write(chunk)
      activitiesText += chunk
    }

    console.log('planActivities', activitiesText)

    return {
      activities: activitiesText,
    }
  },
})

// Maps an Open-Meteo weather code to a human-readable description.
function getWeatherCondition(code: number): string {
  const conditions: Record<number, string> = {
    0: 'Clear sky',
    1: 'Mainly clear',
    2: 'Partly cloudy',
    3: 'Overcast',
    45: 'Foggy',
    48: 'Depositing rime fog',
    51: 'Light drizzle',
    53: 'Moderate drizzle',
    55: 'Dense drizzle',
    61: 'Slight rain',
    63: 'Moderate rain',
    65: 'Heavy rain',
    71: 'Slight snow fall',
    73: 'Moderate snow fall',
    75: 'Heavy snow fall',
    95: 'Thunderstorm',
  }
  return conditions[code] || 'Unknown'
}

const planIndoorActivities = createStep({
  id: 'plan-indoor-activities',
  description: 'Suggests indoor activities based on weather conditions',
  inputSchema: forecastSchema,
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, kastrax }) => {
    console.log('planIndoorActivities', inputData)
    const forecast = inputData

    if (!forecast) {
      throw new Error('Forecast data not found')
    }

    const prompt = `In case it rains, plan indoor activities for ${forecast.location} on ${forecast.date}`

    const agent = kastrax?.getAgent('planningAgent')
    if (!agent) {
      throw new Error('Planning agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    let activitiesText = ''
    for await (const chunk of response.textStream) {
      activitiesText += chunk
    }

    console.log('planIndoorActivities', activitiesText)

    return {
      activities: activitiesText,
    }
  },
})

const synthesizeStep = createStep({
  id: 'synthesize-step',
  description: 'Synthesizes the results of the indoor and outdoor activities',
  // The outputs of the parallel steps arrive keyed by step id.
  inputSchema: z.object({
    'plan-activities': z.object({
      activities: z.string(),
    }),
    'plan-indoor-activities': z.object({
      activities: z.string(),
    }),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  execute: async ({ inputData, kastrax }) => {
    console.log('synthesizeStep', inputData)
    const indoorActivities = inputData?.['plan-indoor-activities']
    const outdoorActivities = inputData?.['plan-activities']

    const prompt = `Indoor activities: ${indoorActivities?.activities}
      Outdoor activities: ${outdoorActivities?.activities}
      There is a chance of rain so be prepared to do indoor activities if needed.`

    const agent = kastrax?.getAgent('synthesizeAgent')
    if (!agent) {
      throw new Error('Synthesize agent not found')
    }

    const response = await agent.stream([
      {
        role: 'user',
        content: prompt,
      },
    ])

    let activitiesText = ''
    for await (const chunk of response.textStream) {
      process.stdout.write(chunk)
      activitiesText += chunk
    }

    return {
      activities: activitiesText,
    }
  },
})

const weatherWorkflow = createWorkflow({
  id: 'plan-both-workflow',
  // The run is started with a city name (see exec.ts), so the workflow takes the city
  // as input and `fetchWeather` produces the forecast that both planning steps consume.
  inputSchema: z.object({
    city: z.string(),
  }),
  outputSchema: z.object({
    activities: z.string(),
  }),
  steps: [fetchWeather, planActivities, planIndoorActivities, synthesizeStep],
})
  .then(fetchWeather)
  // run `planActivities` and `planIndoorActivities` in parallel
  .parallel([planActivities, planIndoorActivities])
  // `synthesizeStep` waits for both steps to be completed before executing.
  .then(synthesizeStep)
  .commit()

export { weatherWorkflow }
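The input schema of the synthesize step shows that the outputs of the parallel steps arrive as one object keyed by step id. The sketch below reproduces that shape in plain TypeScript so the data flow is easier to follow. It is not the library's implementation: `SketchStep`, `runParallel`, and the two stub steps are hypothetical and only mimic the ids used above.

```typescript
// Hypothetical minimal step shape; this is not the library's step type.
interface SketchStep<I, O> {
  id: string
  execute: (input: I) => Promise<O>
}

// Runs every step concurrently on the same input and returns the outputs keyed by step id.
async function runParallel<I, O>(
  steps: SketchStep<I, O>[],
  input: I
): Promise<Record<string, O>> {
  const pairs = await Promise.all(
    steps.map(async (step) => ({ id: step.id, output: await step.execute(input) }))
  )
  const byId: Record<string, O> = {}
  for (const pair of pairs) {
    byId[pair.id] = pair.output
  }
  return byId
}

type Activities = { activities: string }

// Stub steps that stand in for the two agent-backed planning steps.
const outdoorStep: SketchStep<string, Activities> = {
  id: 'plan-activities',
  execute: async (location) => ({ activities: `outdoor ideas for ${location}` }),
}

const indoorStep: SketchStep<string, Activities> = {
  id: 'plan-indoor-activities',
  execute: async (location) => ({ activities: `indoor ideas for ${location}` }),
}

// The follow-up step reads each branch's output under that branch's step id.
async function synthesize(location: string): Promise<string> {
  const outputs = await runParallel([outdoorStep, indoorStep], location)
  const indoor = outputs['plan-indoor-activities']?.activities
  const outdoor = outputs['plan-activities']?.activities
  return `${indoor}; ${outdoor}`
}

synthesize('Ibiza').then((text) => console.log(text))
```

Keying by step id rather than by position means the follow-up step does not depend on the order in which the parallel steps were listed.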

Register Agent and Workflow instances with Kastrax class ✅

Register the agents and the workflow with the Kastrax instance. This is required so that the workflow steps can look up the agents through the kastrax instance at run time.
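To see why registration matters, consider a stripped-down registry with a `getAgent` lookup. This is a hypothetical sketch of the pattern, not the Kastrax class itself: `AgentRegistry`, `NamedAgent`, and `requireAgent` are illustrative names.

```typescript
// Hypothetical agent shape; real agents carry a model and instructions as well.
interface NamedAgent {
  name: string
}

// Hypothetical registry that resolves agents by the key they were registered under.
class AgentRegistry {
  private readonly agents: Map<string, NamedAgent>

  constructor(agents: Record<string, NamedAgent>) {
    this.agents = new Map(Object.entries(agents))
  }

  getAgent(key: string): NamedAgent | undefined {
    return this.agents.get(key)
  }
}

// Same guard the workflow steps use: fail loudly when the lookup comes back empty.
function requireAgent(registry: AgentRegistry | undefined, key: string): NamedAgent {
  const agent = registry?.getAgent(key)
  if (!agent) {
    throw new Error(`Agent '${key}' not found`)
  }
  return agent
}

// Only the planning agent is registered here, so looking up the other one fails.
const sketchRegistry = new AgentRegistry({
  planningAgent: { name: 'planningAgent' },
})

console.log(requireAgent(sketchRegistry, 'planningAgent').name)
```

A step that asks for an agent that was never registered hits the error branch, which is the failure the registration step prevents.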

index.ts
import { Kastrax } from '@kastrax/core/kastrax'
import { createLogger } from '@kastrax/core/logger'
import { weatherWorkflow } from './workflows'
import { planningAgent, synthesizeAgent } from './agents'

const kastrax = new Kastrax({
  vnext_workflows: {
    weatherWorkflow,
  },
  agents: {
    planningAgent,
    synthesizeAgent,
  },
  logger: createLogger({
    name: 'Kastrax',
    level: 'info',
  }),
})

export { kastrax }

Execute the weather workflow ✅

Here, we’ll get the weather workflow from the kastrax instance, create a run, and start that run with the required inputData.

exec.ts
import { kastrax } from './'

const workflow = kastrax.vnext_getWorkflow('weatherWorkflow')
const run = workflow.createRun()

// Start the run with the city to plan for.
const result = await run.start({ inputData: { city: 'Ibiza' } })
console.dir(result, { depth: null })




