Parts 3 and 4 built the server side: A2A-compliant agent servers in Node.js, Python, and C# that receive tasks, process them, and stream results back. Now we build the other side of the protocol: the client agent, also called the orchestrator.
The orchestrator is responsible for three things: discovering what remote agents can do by reading their Agent Cards, routing tasks to the right agent based on declared skills, and managing the full task lifecycle including concurrent execution, streaming updates, and multi-turn interactions. By the end of this post you will have a working orchestrator in Node.js that you can run directly against the servers built in Parts 3 and 4.
What the Orchestrator Does
flowchart TD
OA[Orchestrator Agent] -->|1 - Fetch Agent Cards| R[Agent Registry]
R -->|Agent Cards cached| OA
OA -->|2 - Match task to skill| SM[Skill Matcher]
SM -->|Returns best agent URL| OA
OA -->|3 - Delegate via A2A| IA[Inventory Agent]
OA -->|3 - Delegate via A2A| PA[Procurement Agent]
OA -->|3 - Delegate via A2A| SA[Supplier Agent]
IA -->|SSE stream| OA
PA -->|SSE stream| OA
SA -->|SSE stream| OA
OA -->|4 - Aggregate results| Result[Final Output]
The orchestrator never contains business logic about what the remote agents do. It only knows how to discover agents, match tasks to skills, and manage the communication. This separation is what makes the architecture composable and extensible.
Project Setup
mkdir a2a-orchestrator
cd a2a-orchestrator
npm init -y
npm install node-fetch eventsource dotenv uuid

Project structure:
a2a-orchestrator/
src/
agentRegistry.js # Agent Card fetching and caching
skillMatcher.js # Route tasks to the right agent
a2aClient.js # Low-level A2A HTTP client
orchestrator.js # High-level task delegation and lifecycle management
workflows/
supplyChain.js # Example multi-agent workflow
index.js # Entry point / demo runner
.env

# .env
INVENTORY_AGENT_URL=http://localhost:3000
PROCUREMENT_AGENT_URL=http://localhost:3001
API_TOKEN=dev-token-12345

The Agent Registry
The registry fetches and caches Agent Cards from remote agents. It also exposes a method to register agent URLs manually, which is useful when you are bootstrapping a system where agents cannot be auto-discovered.
// src/agentRegistry.js
import fetch from "node-fetch";
const CACHE_TTL_MS = 5 * 60 * 1000; // 5 minutes
export class AgentRegistry {
constructor() {
// Map of baseUrl -> { card, fetchedAt }
this._cache = new Map();
// Manually registered agent URLs
this._registeredUrls = new Set();
}
register(baseUrl) {
this._registeredUrls.add(baseUrl.replace(/\/$/, ""));
return this;
}
async fetchCard(baseUrl) {
const url = baseUrl.replace(/\/$/, "");
const cached = this._cache.get(url);
if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) {
return cached.card;
}
const response = await fetch(`${url}/.well-known/agent.json`, {
headers: { Accept: "application/json" },
signal: AbortSignal.timeout(5000), // node-fetch v3 has no timeout option; abort via signal instead
});
if (!response.ok) {
throw new Error(
`Failed to fetch Agent Card from ${url}: HTTP ${response.status}`
);
}
const card = await response.json();
this._cache.set(url, { card, fetchedAt: Date.now() });
console.log(`[Registry] Fetched Agent Card: ${card.name} (${url})`);
return card;
}
async loadAll() {
const results = await Promise.allSettled(
[...this._registeredUrls].map(async (url) => {
const card = await this.fetchCard(url);
return { url, card };
})
);
const agents = [];
for (const result of results) {
if (result.status === "fulfilled") {
agents.push(result.value);
} else {
console.warn(`[Registry] Failed to load agent: ${result.reason.message}`);
}
}
return agents;
}
getCached(baseUrl) {
const url = baseUrl.replace(/\/$/, "");
return this._cache.get(url)?.card || null;
}
invalidate(baseUrl) {
this._cache.delete(baseUrl.replace(/\/$/, ""));
}
}

The Skill Matcher
The skill matcher takes a natural language task description and a list of loaded Agent Cards, then returns the best matching agent. In production this could use an embedding model for semantic matching. This implementation uses keyword scoring, which is fast, deterministic, and transparent enough for most enterprise routing needs.
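For a sense of what the embedding route would look like, here is semantic matching in miniature: cosine similarity between a task vector and per-skill vectors. The `embed` step is omitted (in production it would call an embedding model), and the 3-dimensional vectors below are toy values chosen for illustration, not real embeddings.

```javascript
// Hypothetical alternative to keyword scoring: rank skills by cosine
// similarity of embedding vectors. The vectors are toy values; a real
// system would obtain them from an embedding model.
function cosine(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

const taskVec = [0.9, 0.1, 0.0]; // pretend embedding of "check stock levels"
const skillVecs = {
  "Check Stock Level": [0.8, 0.2, 0.1],
  "Create Purchase Order": [0.1, 0.9, 0.3],
};

// Pick the skill whose vector is closest to the task vector
const [bestSkill] = Object.entries(skillVecs)
  .sort((a, b) => cosine(taskVec, b[1]) - cosine(taskVec, a[1]))[0];
console.log(bestSkill); // "Check Stock Level"
```

The keyword version below trades some recall for zero dependencies and fully explainable scores.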
// src/skillMatcher.js
export class SkillMatcher {
/**
* Find the best agent for a given task description.
* @param {string} taskText - Natural language task description
* @param {Array} agents - Array of { url, card } objects from AgentRegistry
* @returns {{ url, card, skill, score } | null}
*/
findBestMatch(taskText, agents) {
const text = taskText.toLowerCase();
let best = null;
let bestScore = 0;
for (const { url, card } of agents) {
for (const skill of card.skills || []) {
const score = this._scoreSkill(text, skill);
if (score > bestScore) {
bestScore = score;
best = { url, card, skill, score };
}
}
}
if (!best) {
console.warn(`[SkillMatcher] No matching agent found for: "${taskText}"`);
} else {
console.log(
`[SkillMatcher] Matched "${taskText}" -> ${best.card.name} / ${best.skill.name} (score: ${best.score})`
);
}
return best;
}
/**
* Find all agents that can handle a given task (useful for parallel execution).
*/
findAllMatches(taskText, agents, minScore = 1) {
const text = taskText.toLowerCase();
const matches = [];
for (const { url, card } of agents) {
for (const skill of card.skills || []) {
const score = this._scoreSkill(text, skill);
if (score >= minScore) {
matches.push({ url, card, skill, score });
}
}
}
return matches.sort((a, b) => b.score - a.score);
}
_scoreSkill(text, skill) {
let score = 0;
const fields = [
skill.name?.toLowerCase() || "",
skill.description?.toLowerCase() || "",
(skill.tags || []).join(" ").toLowerCase(),
(skill.examples || []).join(" ").toLowerCase(),
];
for (const field of fields) {
const words = field.split(/\s+/);
for (const word of words) {
if (word.length > 3 && text.includes(word)) {
score += field === fields[0] ? 3 : field === fields[1] ? 2 : 1;
}
}
}
return score;
}
}

The A2A Client
The A2A client handles the low-level HTTP communication. It wraps both the synchronous tasks/send call and the streaming tasks/sendSubscribe call, and exposes clean async interfaces for the orchestrator to use.
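Both calls travel as JSON-RPC 2.0 requests over HTTP POST. The `_rpcBody` helper below constructs the envelope; for reference, a complete `tasks/send` request has this shape (the id values are illustrative placeholders):

```javascript
// Shape of a JSON-RPC 2.0 envelope for tasks/send.
// "req-001" and "task-001" are illustrative placeholder ids.
const envelope = {
  jsonrpc: "2.0",
  method: "tasks/send",
  id: "req-001", // JSON-RPC request id (distinct from the task id)
  params: {
    id: "task-001", // A2A task id
    message: {
      role: "user",
      parts: [{ type: "text", text: "Check stock for SKU-12345" }],
    },
  },
};
console.log(JSON.stringify(envelope, null, 2));
```

Note the two ids: the JSON-RPC `id` correlates request and response on the wire, while `params.id` identifies the long-lived task.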
// src/a2aClient.js
import fetch from "node-fetch";
import { v4 as uuidv4 } from "uuid";
export class A2AClient {
constructor(token) {
this._token = token;
}
_headers() {
return {
"Content-Type": "application/json",
Authorization: `Bearer ${this._token}`,
};
}
_rpcBody(method, params) {
return JSON.stringify({
jsonrpc: "2.0",
method,
id: uuidv4(),
params,
});
}
/**
* Send a task and wait for the final result (synchronous).
*/
async sendTask(agentUrl, taskId, message, pushNotification = null) {
const params = { id: taskId, message };
if (pushNotification) params.pushNotification = pushNotification;
const response = await fetch(agentUrl, {
method: "POST",
headers: this._headers(),
body: this._rpcBody("tasks/send", params),
});
if (!response.ok) {
// Surface the HTTP status so retry logic can match on codes like 503
throw new Error(`HTTP ${response.status} from ${agentUrl}`);
}
const data = await response.json();
if (data.error) {
throw new Error(`A2A error ${data.error.code}: ${data.error.message}`);
}
return data.result;
}
/**
* Send a task and receive streaming SSE updates.
* Returns an async generator that yields parsed SSE events.
*/
async *sendTaskStream(agentUrl, taskId, message) {
const body = this._rpcBody("tasks/sendSubscribe", { id: taskId, message });
// We use a manual fetch + readable stream approach for Node.js SSE
const response = await fetch(agentUrl, {
method: "POST",
headers: { ...this._headers(), Accept: "text/event-stream" },
body,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status} from ${agentUrl}`);
}
// Read the SSE stream line by line
let buffer = "";
for await (const chunk of response.body) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop(); // Keep incomplete last line
for (const line of lines) {
if (line.startsWith("data: ")) {
const raw = line.slice(6).trim();
if (!raw || raw === "[DONE]") continue;
try {
const event = JSON.parse(raw);
yield event;
} catch {
// Skip malformed events
}
}
}
}
}
/**
* Poll for a task's current state.
*/
async getTask(agentUrl, taskId) {
const response = await fetch(agentUrl, {
method: "POST",
headers: this._headers(),
body: this._rpcBody("tasks/get", { id: taskId }),
});
const data = await response.json();
if (data.error) throw new Error(`A2A error: ${data.error.message}`);
return data.result;
}
/**
* Cancel a task.
*/
async cancelTask(agentUrl, taskId) {
const response = await fetch(agentUrl, {
method: "POST",
headers: this._headers(),
body: this._rpcBody("tasks/cancel", { id: taskId }),
});
const data = await response.json();
if (data.error) throw new Error(`A2A error: ${data.error.message}`);
return data.result;
}
}

The Orchestrator
The orchestrator ties everything together. It exposes high-level methods for delegating single tasks, running tasks concurrently across multiple agents, and handling multi-turn interactions when an agent enters the input_required state.
// src/orchestrator.js
import { v4 as uuidv4 } from "uuid";
import { A2AClient } from "./a2aClient.js";
import { SkillMatcher } from "./skillMatcher.js";
const TERMINAL_STATES = new Set(["completed", "failed", "canceled"]);
export class Orchestrator {
constructor(registry, token) {
this._registry = registry;
this._client = new A2AClient(token);
this._matcher = new SkillMatcher();
this._agents = []; // loaded from registry
}
async initialize() {
this._agents = await this._registry.loadAll();
console.log(`[Orchestrator] Loaded ${this._agents.length} agent(s)`);
return this;
}
/**
* Delegate a single task to the best matching agent.
* Streams progress to the console and returns the final artifacts.
*/
async delegate(taskDescription, onEvent = null) {
const match = this._matcher.findBestMatch(taskDescription, this._agents);
if (!match) throw new Error(`No agent found for: "${taskDescription}"`);
const taskId = uuidv4();
const message = this._textMessage(taskDescription);
console.log(`\n[Orchestrator] Delegating to ${match.card.name}`);
console.log(`[Orchestrator] Task ID: ${taskId}`);
return await this._executeWithStreaming(
match.url + "/a2a",
taskId,
message,
match.card.name,
onEvent
);
}
/**
* Run multiple tasks concurrently, each routed to the best agent.
* Returns an array of results in the same order as the input tasks.
*/
async delegateConcurrent(tasks) {
console.log(`\n[Orchestrator] Running ${tasks.length} tasks concurrently`);
const promises = tasks.map(async ({ description, onEvent }) => {
try {
return await this.delegate(description, onEvent);
} catch (err) {
return { error: err.message };
}
});
return await Promise.all(promises);
}
/**
* Delegate a task with automatic multi-turn handling.
* When the agent enters input_required, calls the inputProvider callback
* to get the next message, then resumes the task.
*/
async delegateWithMultiTurn(taskDescription, inputProvider) {
const match = this._matcher.findBestMatch(taskDescription, this._agents);
if (!match) throw new Error(`No agent found for: "${taskDescription}"`);
const agentUrl = match.url + "/a2a";
const taskId = uuidv4();
console.log(`\n[Orchestrator] Multi-turn delegation to ${match.card.name}`);
let message = this._textMessage(taskDescription);
let artifacts = [];
let turnCount = 0;
const MAX_TURNS = 10;
while (turnCount < MAX_TURNS) {
turnCount++;
console.log(`[Orchestrator] Turn ${turnCount}`);
const result = await this._executeWithStreaming(
agentUrl,
taskId,
message,
match.card.name
);
const state = result.status?.state;
if (state === "completed") {
artifacts = result.artifacts || [];
console.log(`[Orchestrator] Task completed after ${turnCount} turn(s)`);
break;
}
if (state === "input_required") {
// Extract what the agent is asking for
const agentQuestion = this._extractLastAgentMessage(result);
console.log(`[Orchestrator] Agent needs input: ${agentQuestion}`);
// Ask the input provider for the next message
const userInput = await inputProvider(agentQuestion, result);
if (!userInput) {
console.log("[Orchestrator] No input provided, canceling task");
await this._client.cancelTask(agentUrl, taskId);
break;
}
message = this._textMessage(userInput);
continue;
}
if (state === "failed" || state === "canceled") {
console.error(`[Orchestrator] Task ended with state: ${state}`);
break;
}
}
return artifacts;
}
/**
* Execute a task with SSE streaming, printing progress and returning final result.
*/
async _executeWithStreaming(agentUrl, taskId, message, agentName, onEvent = null) {
let lastResult = null;
try {
for await (const event of this._client.sendTaskStream(agentUrl, taskId, message)) {
this._logEvent(event, agentName);
if (onEvent) onEvent(event);
lastResult = event;
// Stop consuming the stream once we hit a terminal state
const state = event.status?.state;
if (state && TERMINAL_STATES.has(state)) break;
// Also stop on input_required - caller handles the next turn
if (state === "input_required") break;
}
} catch (err) {
console.error(`[Orchestrator] Stream error for task ${taskId}: ${err.message}`);
// Fall back to polling
lastResult = await this._client.getTask(agentUrl, taskId);
}
// Fetch the full final task state (artifacts may arrive in multiple chunks)
return await this._client.getTask(agentUrl, taskId);
}
_logEvent(event, agentName) {
if (event.type === "taskStatusUpdate") {
const state = event.status?.state;
const msg = event.status?.message?.parts?.[0]?.text;
console.log(` [${agentName}] ${state}${msg ? `: ${msg}` : ""}`);
} else if (event.type === "taskArtifactUpdate") {
console.log(` [${agentName}] artifact received: ${event.artifact?.name}`);
}
}
_extractLastAgentMessage(result) {
const messages = result.messages || [];
for (let i = messages.length - 1; i >= 0; i--) {
if (messages[i].role === "agent") {
return messages[i].parts?.[0]?.text || "";
}
}
return "";
}
_textMessage(text) {
return { role: "user", parts: [{ type: "text", text }] };
}
}

A Real Multi-Agent Workflow
With the orchestrator in place, building a multi-agent workflow becomes straightforward. Here is a supply chain workflow that runs an inventory check and a procurement status check concurrently, then conditionally triggers a reorder based on the results:
// workflows/supplyChain.js
export async function runSupplyChainWorkflow(orchestrator) {
console.log("=== Supply Chain Workflow Starting ===\n");
// Step 1: Run inventory check and procurement status concurrently
const [inventoryResult, procurementResult] = await orchestrator.delegateConcurrent([
{
description: "Check stock for SKU-12345 and SKU-67890",
onEvent: (e) => {
if (e.type === "taskStatusUpdate" && e.status?.state === "working") {
process.stdout.write(".");
}
},
},
{
description: "Check stock for SKU-99999",
},
]);
console.log("\n\n--- Inventory Results ---");
// Extract alerts from inventory check
const inventoryData = inventoryResult?.artifacts?.[0]?.parts?.[0]?.data;
const alerts = inventoryData?.alerts || [];
if (alerts.length > 0) {
console.log("Alerts detected:");
alerts.forEach((a) => console.log(` - ${a}`));
// Step 2: Trigger reorder with multi-turn interaction
console.log("\n--- Starting Reorder Workflow ---");
const reorderArtifacts = await orchestrator.delegateWithMultiTurn(
"Trigger reorder for low stock items",
async (agentQuestion) => {
// In production, this would be a human approval step or another agent's decision.
// For demo, we auto-approve 250 units.
console.log(`\nAgent asks: ${agentQuestion}`);
console.log("Auto-approving: 250 units");
return "Proceed with 250 units.";
}
);
console.log("\n--- Purchase Order Generated ---");
if (reorderArtifacts.length > 0) {
const po = reorderArtifacts[0]?.parts?.[0]?.data;
console.log(`PO Number: ${po?.poNumber}`);
console.log(`SKU: ${po?.sku}`);
console.log(`Quantity: ${po?.quantity}`);
console.log(`Total Value: $${po?.totalValue}`);
console.log(`Status: ${po?.status}`);
}
} else {
console.log("All stock levels healthy. No reorder needed.");
}
console.log("\n=== Supply Chain Workflow Complete ===");
}

Entry Point
// index.js
import "dotenv/config";
import { AgentRegistry } from "./src/agentRegistry.js";
import { Orchestrator } from "./src/orchestrator.js";
import { runSupplyChainWorkflow } from "./workflows/supplyChain.js";
async function main() {
const token = process.env.API_TOKEN || "dev-token-12345";
const registry = new AgentRegistry()
.register(process.env.INVENTORY_AGENT_URL || "http://localhost:3000")
.register(process.env.PROCUREMENT_AGENT_URL || "http://localhost:3001");
const orchestrator = new Orchestrator(registry, token);
await orchestrator.initialize();
// Run a single task
console.log("--- Single Task Demo ---");
const result = await orchestrator.delegate("Check stock for SKU-12345");
console.log("Artifacts:", JSON.stringify(result.artifacts, null, 2));
// Run the full supply chain workflow
await runSupplyChainWorkflow(orchestrator);
}
main().catch(console.error);

Running the Full System
Start the agent servers from Parts 3 and 4 in separate terminals, then run the orchestrator:
# Terminal 1 - Inventory Agent (Node.js from Part 3)
cd a2a-inventory-agent
node server.js
# Terminal 2 - Second agent instance on port 3001 (or Python/C# from Part 4)
PORT=3001 node server.js
# Terminal 3 - Orchestrator
cd a2a-orchestrator
node index.js

You should see output like this:
[Registry] Fetched Agent Card: Inventory Management Agent (http://localhost:3000)
[Registry] Fetched Agent Card: Inventory Management Agent (http://localhost:3001)
[Orchestrator] Loaded 2 agent(s)
--- Single Task Demo ---
[SkillMatcher] Matched "Check stock for SKU-12345" -> Inventory Management Agent / Check Stock Level (score: 9)
[Orchestrator] Delegating to Inventory Management Agent
[Inventory Management Agent] submitted
[Inventory Management Agent] working: Parsing SKU identifiers from request...
[Inventory Management Agent] working: Querying inventory database for 1 SKU(s)...
[Inventory Management Agent] artifact received: stock-levels
[Inventory Management Agent] completed

Handling Agent Unavailability
Production orchestrators need to handle agents that are temporarily unavailable. Here is a retry wrapper you can add to the A2A client:
// Add to src/a2aClient.js
async sendTaskWithRetry(agentUrl, taskId, message, options = {}) {
const { maxRetries = 3, backoffMs = 1000, retryOn = [503, 504] } = options;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await this.sendTask(agentUrl, taskId, message);
} catch (err) {
const isRetryable = err.message.includes("retryable") ||
retryOn.some(code => err.message.includes(String(code)));
if (!isRetryable || attempt === maxRetries) throw err;
const delay = backoffMs * Math.pow(2, attempt - 1); // exponential backoff
console.warn(`[A2AClient] Retry ${attempt}/${maxRetries} in ${delay}ms: ${err.message}`);
await new Promise(r => setTimeout(r, delay));
}
}
}

Complete Orchestration Flow
sequenceDiagram
participant I as index.js
participant O as Orchestrator
participant R as AgentRegistry
participant SM as SkillMatcher
participant C as A2AClient
participant IA as Inventory Agent
I->>O: initialize()
O->>R: loadAll()
R->>IA: GET /.well-known/agent.json
IA-->>R: Agent Card
R-->>O: [{url, card}]
I->>O: delegate("Check stock SKU-12345")
O->>SM: findBestMatch(text, agents)
SM-->>O: {url, card, skill}
O->>C: sendTaskStream(agentUrl, taskId, message)
C->>IA: POST /a2a tasks/sendSubscribe
IA-->>C: SSE: submitted
IA-->>C: SSE: working
IA-->>C: SSE: artifact received
IA-->>C: SSE: completed
C-->>O: async generator events
O->>C: getTask(agentUrl, taskId)
C->>IA: POST /a2a tasks/get
IA-->>C: full task result
C-->>O: result
O-->>I: artifacts
Key Design Decisions
Agent Card caching. Fetching an Agent Card on every task would be wasteful and fragile. The 5-minute TTL cache means the orchestrator can survive short-lived network blips during a workflow without failing discovery. The invalidate() method lets you force a refresh when you know an agent has been updated.
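The freshness check itself is a one-line comparison. A standalone illustration (`isFresh` is a hypothetical helper mirroring the check inside `fetchCard`, with the timestamp injected so cache expiry can be shown without actually waiting five minutes):

```javascript
// isFresh mirrors the freshness comparison in AgentRegistry.fetchCard.
// The "now" timestamp is injected so expiry can be demonstrated instantly.
const CACHE_TTL_MS = 5 * 60 * 1000; // 5 minutes, same as the registry

function isFresh(entry, now = Date.now()) {
  return Boolean(entry) && now - entry.fetchedAt < CACHE_TTL_MS;
}

const entry = { card: { name: "Inventory Management Agent" }, fetchedAt: 0 };
console.log(isFresh(entry, 60_000));  // true  -> cache hit at 1 minute
console.log(isFresh(entry, 360_000)); // false -> refetch at 6 minutes
console.log(isFresh(undefined));      // false -> never fetched
```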
Skill matching over hard-coded routing. Hard-coding which agent handles which task creates a maintenance problem the moment you add or remove agents. Skill-based routing means adding a new agent with new skills is zero-config from the orchestrator’s perspective.
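To make the weighting concrete, here is the `_scoreSkill` logic extracted into a standalone function and run against a mock skill. The mock card values are assumptions for illustration; the weights (3 for name hits, 2 for description, 1 for tags and examples) match the implementation above.

```javascript
// Standalone copy of the _scoreSkill weighting for demonstration:
// name hits score 3, description hits 2, tag/example hits 1.
function scoreSkill(text, skill) {
  let score = 0;
  const fields = [
    skill.name?.toLowerCase() || "",
    skill.description?.toLowerCase() || "",
    (skill.tags || []).join(" ").toLowerCase(),
    (skill.examples || []).join(" ").toLowerCase(),
  ];
  for (const [i, field] of fields.entries()) {
    for (const word of field.split(/\s+/)) {
      if (word.length > 3 && text.includes(word)) {
        score += i === 0 ? 3 : i === 1 ? 2 : 1;
      }
    }
  }
  return score;
}

// Mock skill, loosely modeled on the inventory agent's card
const skill = {
  name: "Check Stock Level",
  description: "Look up current stock for one or more SKUs",
  tags: ["inventory", "stock", "sku"],
};

console.log(scoreSkill("check stock for sku-12345", skill)); // 9
console.log(scoreSkill("schedule a meeting", skill));        // 0
```

"check" and "stock" each hit the name (+3 each), "stock" hits the description (+2) and the tags (+1), giving the score of 9 seen in the sample output later in this post.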
Streaming with a polling fallback. SSE connections can drop in environments with aggressive load balancers or proxies. The _executeWithStreaming method catches stream errors and falls back to a tasks/get poll, so tasks always complete even if the stream breaks mid-way.
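The fallback idea generalizes to a reusable helper. A sketch, assuming any async `getTask` function that resolves to a task object with `status.state`; `pollUntilTerminal` is not part of the client above, and the stub below simulates an agent that completes on its third poll:

```javascript
// A minimal polling loop for when the SSE stream is unavailable entirely.
// Resolves once the task reaches a terminal state or needs input.
const TERMINAL_STATES = new Set(["completed", "failed", "canceled"]);

async function pollUntilTerminal(getTask, { intervalMs = 500, maxPolls = 60 } = {}) {
  for (let i = 0; i < maxPolls; i++) {
    const task = await getTask();
    const state = task.status?.state;
    if (TERMINAL_STATES.has(state) || state === "input_required") return task;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error("Task did not reach a terminal state in time");
}

// Demo with a stub agent: reports "working" twice, then "completed".
let calls = 0;
const stubGetTask = async () => ({
  status: { state: ++calls < 3 ? "working" : "completed" },
});

const finalTask = await pollUntilTerminal(stubGetTask, { intervalMs: 1 });
console.log(finalTask.status.state, "after", calls, "poll(s)");
```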
Multi-turn as a loop, not recursion. Using a while loop with a max-turns guard is safer than recursion for multi-turn interactions. It prevents stack overflows on long conversations and gives you a natural place to add turn-counting metrics.
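One practical `inputProvider` pattern for demos and automated tests is a scripted provider: it answers each agent question from a fixed queue, then returns null, which `delegateWithMultiTurn` treats as a signal to cancel. A sketch (`scriptedInputProvider` is not part of the orchestrator above):

```javascript
// A scripted inputProvider: answers agent questions from a fixed queue,
// then returns null once the script is exhausted (triggering cancellation).
function scriptedInputProvider(answers) {
  const queue = [...answers];
  return async (agentQuestion) => {
    console.log(`Agent asked: ${agentQuestion}`);
    return queue.shift() ?? null;
  };
}

const provider = scriptedInputProvider(["Proceed with 250 units."]);
const first = await provider("How many units should I order?");
const second = await provider("Anything else?");
console.log(first);  // "Proceed with 250 units."
console.log(second); // null
```

The same shape works for a human-in-the-loop provider: swap the queue for a readline prompt and the orchestrator code does not change.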
What is Missing for Production
This orchestrator is functional but deliberately omits a few production concerns: JWT verification on the orchestrator’s own incoming requests, persistent task storage for recovery after restarts, distributed tracing with correlation IDs across agent hops, and rate limiting to prevent a single workflow from flooding a remote agent. Parts 6 and 8 address all of these directly.
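As a taste of the tracing piece, correlation IDs can be threaded through the client with a small header helper. The `x-correlation-id` header name is a common convention, not something the A2A specification mandates, and `withCorrelation` is a hypothetical helper, not part of the client above:

```javascript
// Hypothetical helper: attach a correlation ID to outgoing A2A headers so
// logs across agent hops can be joined. Header name is a convention only.
import { randomUUID } from "node:crypto";

function withCorrelation(headers, correlationId = randomUUID()) {
  return { ...headers, "x-correlation-id": correlationId };
}

const headers = withCorrelation(
  { "Content-Type": "application/json" },
  "wf-2024-001" // normally generated once per workflow run
);
console.log(headers["x-correlation-id"]); // "wf-2024-001"
```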
In Part 6, we harden everything with production-grade security: JWT verification, mutual TLS, RBAC, and Agent Card signing. That is where the implementation moves from working to enterprise-ready.
References
- A2A Protocol – Official Specification (https://a2a-protocol.org/latest/specification/)
- GitHub – A2A Protocol Repository, Linux Foundation (https://github.com/a2aproject/A2A)
- Node.js – EventEmitter Documentation (https://nodejs.org/api/events.html)
- MDN Web Docs – Using Server-Sent Events (https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
- JSON-RPC 2.0 Specification (https://www.jsonrpc.org/specification)
