A2A Capabilities ✅
Capabilities are the core functional units of the A2A protocol, representing the functions that agents can expose to other agents. This guide explains how to define, implement, and use capabilities in Kastrax.
What Are Capabilities? ✅
In the A2A protocol, capabilities are well-defined functions that agents can expose to other agents. Each capability has:
- A unique identifier
- A human-readable name and description
- A set of parameters with types and descriptions
- A return type
- Optional examples
Defining Capabilities ✅
You can define capabilities using the Kastrax DSL:
import ai.kastrax.a2a.a2a
import ai.kastrax.a2a.agent.a2aAgent
import ai.kastrax.a2a.model.AuthType
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val a2aInstance = a2a {
a2aAgent {
id = "weather-agent"
name = "Weather Agent"
description = "Provides weather information"
// Define a capability
capability {
id = "get_weather"
name = "Get Weather"
description = "Gets the current weather for a location"
// Define parameters
parameter {
name = "location"
type = "string"
description = "The location to get weather for"
required = true
}
parameter {
name = "units"
type = "string"
description = "The units to use (metric or imperial)"
required = false
}
// Define return type
returnType = "json"
// Add examples
example {
input = mapOf(
"location" to "New York",
"units" to "metric"
)
output = mapOf(
"temperature" to "22",
"conditions" to "Sunny",
"humidity" to "65%"
)
description = "Get weather for New York in metric units"
}
}
// Configure authentication
authentication {
type = AuthType.API_KEY
}
}
}
}
Implementing Capability Handlers ✅
You need to implement handlers for your capabilities:
import ai.kastrax.a2a.a2a
import ai.kastrax.a2a.agent.a2aAgent
import ai.kastrax.a2a.model.AuthType
import ai.kastrax.a2a.model.InvokeRequest
import ai.kastrax.a2a.model.InvokeResponse
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
fun main() = runBlocking {
val a2aInstance = a2a {
a2aAgent {
id = "weather-agent"
name = "Weather Agent"
description = "Provides weather information"
// Define a capability
capability {
id = "get_weather"
name = "Get Weather"
description = "Gets the current weather for a location"
// Define parameters
parameter {
name = "location"
type = "string"
description = "The location to get weather for"
required = true
}
parameter {
name = "units"
type = "string"
description = "The units to use (metric or imperial)"
required = false
}
// Define return type
returnType = "json"
}
// Implement capability handler
handler("get_weather") { request: InvokeRequest ->
// Extract parameters
val location = request.parameters["location"]?.toString() ?: ""
val units = request.parameters["units"]?.toString() ?: "metric"
// Fetch weather data (simplified example)
val weatherData = fetchWeatherData(location, units)
// Create response
InvokeResponse(
id = request.id,
result = buildJsonObject {
put("temperature", JsonPrimitive(weatherData.temperature))
put("conditions", JsonPrimitive(weatherData.conditions))
put("humidity", JsonPrimitive(weatherData.humidity))
}
)
}
// Configure authentication
authentication {
type = AuthType.API_KEY
}
}
}
}
// Simplified weather data class
data class WeatherData(
val temperature: String,
val conditions: String,
val humidity: String
)
// Simplified weather data fetching function
fun fetchWeatherData(location: String, units: String): WeatherData {
// In a real implementation, this would call a weather API
return WeatherData(
temperature = "22",
conditions = "Sunny",
humidity = "65%"
)
}
Invoking Capabilities ✅
You can invoke capabilities from other agents:
import ai.kastrax.a2a.client.A2AClient
import ai.kastrax.a2a.client.A2AClientConfig
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.jsonObject
fun main() = runBlocking {
// Create an A2A client
val client = A2AClient(
A2AClientConfig(
serverUrl = "http://localhost:8080",
apiKey = "your-api-key"
)
)
// Invoke the get_weather capability
val response = client.invoke(
"get_weather",
mapOf(
"location" to "New York",
"units" to "metric"
)
)
// Process the response
val result = response.result.jsonObject
val temperature = result["temperature"]?.jsonPrimitive?.content
val conditions = result["conditions"]?.jsonPrimitive?.content
val humidity = result["humidity"]?.jsonPrimitive?.content
println("Weather in New York:")
println("Temperature: $temperature°C")
println("Conditions: $conditions")
println("Humidity: $humidity")
// Close the client
client.close()
}
Capability Discovery ✅
Agents can discover capabilities of other agents:
import ai.kastrax.a2a.client.A2AClient
import ai.kastrax.a2a.client.A2AClientConfig
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// Create an A2A client
val client = A2AClient(
A2AClientConfig(
serverUrl = "http://localhost:8080",
apiKey = "your-api-key"
)
)
// Get the agent card
val agentCard = client.getAgentCard()
// List all capabilities
println("Agent: ${agentCard.name}")
println("Capabilities:")
for (capability in agentCard.capabilities) {
println("- ${capability.name}: ${capability.description}")
println(" Parameters:")
for (parameter in capability.parameters) {
val requiredText = if (parameter.required) "required" else "optional"
println(" - ${parameter.name} (${parameter.type}, $requiredText): ${parameter.description}")
}
println(" Return type: ${capability.returnType}")
}
// Close the client
client.close()
}
Capability Validation ✅
The A2A protocol validates capability parameters:
import ai.kastrax.a2a.a2a
import ai.kastrax.a2a.agent.a2aAgent
import ai.kastrax.a2a.model.AuthType
import ai.kastrax.a2a.model.JsonObject
import ai.kastrax.a2a.validation.ParameterValidator
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val a2aInstance = a2a {
a2aAgent {
id = "data-validator-agent"
name = "Data Validator Agent"
description = "Validates data against schemas"
// Define a capability with schema validation
capability {
id = "validate_user"
name = "Validate User"
description = "Validates user data against a schema"
// Define a parameter with a JSON schema
parameter {
name = "user_data"
type = "object"
description = "User data to validate"
required = true
schema = JsonObject(
mapOf(
"type" to JsonPrimitive("object"),
"required" to JsonArray(listOf("name", "email")),
"properties" to JsonObject(
mapOf(
"name" to JsonObject(
mapOf(
"type" to JsonPrimitive("string"),
"minLength" to JsonPrimitive(1)
)
),
"email" to JsonObject(
mapOf(
"type" to JsonPrimitive("string"),
"format" to JsonPrimitive("email")
)
),
"age" to JsonObject(
mapOf(
"type" to JsonPrimitive("number"),
"minimum" to JsonPrimitive(18)
)
)
)
)
)
)
}
// Define return type
returnType = "json"
}
// Implement capability handler with validation
handler("validate_user") { request ->
// The parameters are automatically validated against the schema
// If validation fails, an error response is returned
// Process the validated data
val userData = request.parameters["user_data"]?.jsonObject
// Return validation result
InvokeResponse(
id = request.id,
result = buildJsonObject {
put("valid", JsonPrimitive(true))
put("message", JsonPrimitive("User data is valid"))
}
)
}
// Configure authentication
authentication {
type = AuthType.API_KEY
}
}
}
}
Streaming Capabilities ✅
You can implement streaming capabilities for long-running operations:
import ai.kastrax.a2a.a2a
import ai.kastrax.a2a.agent.a2aAgent
import ai.kastrax.a2a.model.AuthType
import ai.kastrax.a2a.model.StreamingInvokeRequest
import ai.kastrax.a2a.model.StreamingInvokeResponse
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
fun main() = runBlocking {
val a2aInstance = a2a {
a2aAgent {
id = "data-processor-agent"
name = "Data Processor Agent"
description = "Processes large datasets with streaming updates"
// Define a streaming capability
streamingCapability {
id = "process_dataset"
name = "Process Dataset"
description = "Processes a large dataset with streaming progress updates"
// Define parameters
parameter {
name = "dataset_url"
type = "string"
description = "URL of the dataset to process"
required = true
}
// Define return type
returnType = "json"
}
// Implement streaming capability handler
streamingHandler("process_dataset") { request: StreamingInvokeRequest ->
// Return a flow of streaming responses
flow {
val datasetUrl = request.parameters["dataset_url"]?.toString() ?: ""
// Simulate processing with progress updates
for (i in 1..10) {
val progress = i * 10
// Emit progress update
emit(
StreamingInvokeResponse(
id = request.id,
result = buildJsonObject {
put("progress", JsonPrimitive(progress))
put("status", JsonPrimitive("Processing"))
put("message", JsonPrimitive("Processed $progress% of the dataset"))
}
)
)
// Simulate processing time
delay(1000)
}
// Emit final result
emit(
StreamingInvokeResponse(
id = request.id,
result = buildJsonObject {
put("progress", JsonPrimitive(100))
put("status", JsonPrimitive("Completed"))
put("message", JsonPrimitive("Dataset processing completed"))
put("result_url", JsonPrimitive("https://example.com/results/123"))
}
)
)
}
}
// Configure authentication
authentication {
type = AuthType.API_KEY
}
}
}
}
Capability Composition ✅
You can compose capabilities from multiple agents:
import ai.kastrax.a2a.a2a
import ai.kastrax.a2a.agent.a2aAgent
import ai.kastrax.a2a.client.A2AClient
import ai.kastrax.a2a.client.A2AClientConfig
import ai.kastrax.a2a.model.AuthType
import ai.kastrax.a2a.model.InvokeRequest
import ai.kastrax.a2a.model.InvokeResponse
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.put
fun main() = runBlocking {
val a2aInstance = a2a {
a2aAgent {
id = "data-pipeline-agent"
name = "Data Pipeline Agent"
description = "Orchestrates data processing pipelines"
// Define a capability that composes other capabilities
capability {
id = "process_and_visualize"
name = "Process and Visualize"
description = "Processes a dataset and creates visualizations"
// Define parameters
parameter {
name = "dataset_url"
type = "string"
description = "URL of the dataset to process"
required = true
}
parameter {
name = "visualization_type"
type = "string"
description = "Type of visualization to create"
required = true
}
// Define return type
returnType = "json"
}
// Implement capability handler that composes other capabilities
handler("process_and_visualize") { request: InvokeRequest ->
// Extract parameters
val datasetUrl = request.parameters["dataset_url"]?.toString() ?: ""
val visualizationType = request.parameters["visualization_type"]?.toString() ?: ""
// Create clients for other agents
val processorClient = A2AClient(
A2AClientConfig(
serverUrl = "http://localhost:8081",
apiKey = "processor-api-key"
)
)
val visualizerClient = A2AClient(
A2AClientConfig(
serverUrl = "http://localhost:8082",
apiKey = "visualizer-api-key"
)
)
try {
// Step 1: Process the dataset
val processingResponse = processorClient.invoke(
"process_dataset",
mapOf(
"dataset_url" to datasetUrl
)
)
val processedDataUrl = processingResponse.result.jsonObject["result_url"]?.jsonPrimitive?.content
?: throw Exception("Processing failed")
// Step 2: Create visualizations
val visualizationResponse = visualizerClient.invoke(
"create_visualization",
mapOf(
"data_url" to processedDataUrl,
"visualization_type" to visualizationType
)
)
val visualizationUrl = visualizationResponse.result.jsonObject["visualization_url"]?.jsonPrimitive?.content
?: throw Exception("Visualization failed")
// Return combined result
InvokeResponse(
id = request.id,
result = buildJsonObject {
put("processed_data_url", JsonPrimitive(processedDataUrl))
put("visualization_url", JsonPrimitive(visualizationUrl))
put("status", JsonPrimitive("Completed"))
}
)
} finally {
// Close clients
processorClient.close()
visualizerClient.close()
}
}
// Configure authentication
authentication {
type = AuthType.API_KEY
}
}
}
}
Next Steps ✅
Now that you understand A2A capabilities, you can:
- Learn about A2A security
- Explore multi-agent systems
- Implement A2A workflows
Last updated on