Agent Discovery and Orchestration: Building the Client Agent (Part 5 of 8)

Parts 3 and 4 built the server side: A2A-compliant agent servers in Node.js, Python, and C# that receive tasks, process them, and stream results back. Now we build the other side of the protocol: the client agent, also called the orchestrator.

The orchestrator is responsible for three things: discovering what remote agents can do by reading their Agent Cards, routing tasks to the right agent based on declared skills, and managing the full task lifecycle including concurrent execution, streaming updates, and multi-turn interactions. By the end of this post you will have a working orchestrator in Node.js that you can run directly against the servers built in Parts 3 and 4.

What the Orchestrator Does

flowchart TD
    OA[Orchestrator Agent] -->|1 - Fetch Agent Cards| R[Agent Registry]
    R -->|Agent Cards cached| OA

    OA -->|2 - Match task to skill| SM[Skill Matcher]
    SM -->|Returns best agent URL| OA

    OA -->|3 - Delegate via A2A| IA[Inventory Agent]
    OA -->|3 - Delegate via A2A| PA[Procurement Agent]
    OA -->|3 - Delegate via A2A| SA[Supplier Agent]

    IA -->|SSE stream| OA
    PA -->|SSE stream| OA
    SA -->|SSE stream| OA

    OA -->|4 - Aggregate results| Result[Final Output]

The orchestrator never contains business logic about what the remote agents do. It only knows how to discover agents, match tasks to skills, and manage the communication. This separation is what makes the architecture composable and extensible.

Project Setup

mkdir a2a-orchestrator
cd a2a-orchestrator
npm init -y
npm install node-fetch eventsource dotenv uuid

Project structure:

a2a-orchestrator/
  src/
    agentRegistry.js      # Agent Card fetching and caching
    skillMatcher.js       # Route tasks to the right agent
    a2aClient.js          # Low-level A2A HTTP client
    orchestrator.js       # High-level task delegation and lifecycle management
  workflows/
    supplyChain.js        # Example multi-agent workflow
  index.js                # Entry point / demo runner
  .env

# .env
INVENTORY_AGENT_URL=http://localhost:3000
PROCUREMENT_AGENT_URL=http://localhost:3001
API_TOKEN=dev-token-12345

The Agent Registry

The registry fetches and caches Agent Cards from remote agents. It also exposes a method to register agent URLs manually, which is useful when you are bootstrapping a system where agents cannot be auto-discovered.
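
For context, here is the general shape of the Agent Card document the registry fetches from `/.well-known/agent.json`. The values below are illustrative, not from a real server; the `skills` array is what the skill matcher reads:

```json
{
  "name": "Inventory Management Agent",
  "description": "Answers stock-level and inventory questions",
  "url": "http://localhost:3000/a2a",
  "capabilities": { "streaming": true },
  "skills": [
    {
      "id": "check-stock",
      "name": "Check Stock Level",
      "description": "Returns current stock for a given SKU",
      "tags": ["inventory", "stock"],
      "examples": ["Check stock for SKU-12345"]
    }
  ]
}
```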

// src/agentRegistry.js

import fetch from "node-fetch";

const CACHE_TTL_MS = 5 * 60 * 1000; // 5 minutes

export class AgentRegistry {
  constructor() {
    // Map of baseUrl -> { card, fetchedAt }
    this._cache = new Map();
    // Manually registered agent URLs
    this._registeredUrls = new Set();
  }

  register(baseUrl) {
    this._registeredUrls.add(baseUrl.replace(/\/$/, ""));
    return this;
  }

  async fetchCard(baseUrl) {
    const url = baseUrl.replace(/\/$/, "");
    const cached = this._cache.get(url);

    if (cached && Date.now() - cached.fetchedAt < CACHE_TTL_MS) {
      return cached.card;
    }

    const response = await fetch(`${url}/.well-known/agent.json`, {
      headers: { Accept: "application/json" },
      // node-fetch v3 dropped the `timeout` option; use an abort signal instead
      signal: AbortSignal.timeout(5000),
    });

    if (!response.ok) {
      throw new Error(
        `Failed to fetch Agent Card from ${url}: HTTP ${response.status}`
      );
    }

    const card = await response.json();
    this._cache.set(url, { card, fetchedAt: Date.now() });
    console.log(`[Registry] Fetched Agent Card: ${card.name} (${url})`);
    return card;
  }

  async loadAll() {
    const results = await Promise.allSettled(
      [...this._registeredUrls].map(async (url) => {
        const card = await this.fetchCard(url);
        return { url, card };
      })
    );

    const agents = [];
    for (const result of results) {
      if (result.status === "fulfilled") {
        agents.push(result.value);
      } else {
        console.warn(`[Registry] Failed to load agent: ${result.reason.message}`);
      }
    }
    return agents;
  }

  getCached(baseUrl) {
    const url = baseUrl.replace(/\/$/, "");
    return this._cache.get(url)?.card || null;
  }

  invalidate(baseUrl) {
    this._cache.delete(baseUrl.replace(/\/$/, ""));
  }
}

The Skill Matcher

The skill matcher takes a natural language task description and a list of loaded Agent Cards, then returns the best matching agent. In production this could use an embedding model for semantic matching. This implementation uses keyword scoring, which is fast, deterministic, and transparent enough for most enterprise routing needs.

// src/skillMatcher.js

export class SkillMatcher {
  /**
   * Find the best agent for a given task description.
   * @param {string} taskText - Natural language task description
   * @param {Array} agents - Array of { url, card } objects from AgentRegistry
   * @returns {{ url, card, skill, score } | null}
   */
  findBestMatch(taskText, agents) {
    const text = taskText.toLowerCase();
    let best = null;
    let bestScore = 0;

    for (const { url, card } of agents) {
      for (const skill of card.skills || []) {
        const score = this._scoreSkill(text, skill);
        if (score > bestScore) {
          bestScore = score;
          best = { url, card, skill, score };
        }
      }
    }

    if (!best) {
      console.warn(`[SkillMatcher] No matching agent found for: "${taskText}"`);
    } else {
      console.log(
        `[SkillMatcher] Matched "${taskText}" -> ${best.card.name} / ${best.skill.name} (score: ${best.score})`
      );
    }

    return best;
  }

  /**
   * Find all agents that can handle a given task (useful for parallel execution).
   */
  findAllMatches(taskText, agents, minScore = 1) {
    const text = taskText.toLowerCase();
    const matches = [];

    for (const { url, card } of agents) {
      for (const skill of card.skills || []) {
        const score = this._scoreSkill(text, skill);
        if (score >= minScore) {
          matches.push({ url, card, skill, score });
        }
      }
    }

    return matches.sort((a, b) => b.score - a.score);
  }

  _scoreSkill(text, skill) {
    let score = 0;
    const fields = [
      skill.name?.toLowerCase() || "",
      skill.description?.toLowerCase() || "",
      (skill.tags || []).join(" ").toLowerCase(),
      (skill.examples || []).join(" ").toLowerCase(),
    ];

    for (const field of fields) {
      const words = field.split(/\s+/);
      for (const word of words) {
        if (word.length > 3 && text.includes(word)) {
          // Weight matches: skill name (3) > description (2) > tags/examples (1)
          if (field === fields[0]) score += 3;
          else if (field === fields[1]) score += 2;
          else score += 1;
        }
      }
    }

    return score;
  }
}
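
To see the weighting in action, here is a trimmed standalone version of the scoring logic (weights 3/2/1 mirror `_scoreSkill` above; the sample skill is made up, and the `examples` field is omitted for brevity). It reproduces the score of 9 that appears in the demo output later in this post:

```javascript
// Standalone sketch of the keyword scorer: name hits count 3,
// description hits count 2, tag hits count 1.
function scoreSkill(text, skill) {
  const lower = text.toLowerCase();
  let score = 0;
  const fields = [
    { value: skill.name?.toLowerCase() || "", weight: 3 },
    { value: (skill.description || "").toLowerCase(), weight: 2 },
    { value: (skill.tags || []).join(" ").toLowerCase(), weight: 1 },
  ];
  for (const { value, weight } of fields) {
    for (const word of value.split(/\s+/)) {
      // Short words like "for" or "sku" are ignored to cut noise
      if (word.length > 3 && lower.includes(word)) score += weight;
    }
  }
  return score;
}

const skill = {
  name: "Check Stock Level",
  description: "Returns current stock for a given SKU",
  tags: ["inventory", "stock"],
};

// "check" (+3) and "stock" (+3) match the name, "stock" matches the
// description (+2) and a tag (+1), giving a total of 9.
console.log(scoreSkill("Check stock for SKU-12345", skill));
```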

The A2A Client

The A2A client handles the low-level HTTP communication. It wraps both the synchronous tasks/send call and the streaming tasks/sendSubscribe call, and exposes clean async interfaces for the orchestrator to use.
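
Every call wraps its parameters in a JSON-RPC 2.0 envelope. A minimal sketch of what `_rpcBody` produces for `tasks/send` (with a fixed request id here so the output is deterministic; the real client generates a UUID per request):

```javascript
// Build a JSON-RPC 2.0 request body for an A2A method call.
function rpcBody(method, params, id) {
  return JSON.stringify({ jsonrpc: "2.0", method, id, params });
}

const body = rpcBody(
  "tasks/send",
  {
    id: "task-001", // the A2A task id, distinct from the JSON-RPC request id
    message: {
      role: "user",
      parts: [{ type: "text", text: "Check stock for SKU-12345" }],
    },
  },
  "req-001"
);

console.log(JSON.parse(body).method); // "tasks/send"
```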

// src/a2aClient.js

import fetch from "node-fetch";
import { v4 as uuidv4 } from "uuid";

export class A2AClient {
  constructor(token) {
    this._token = token;
  }

  _headers() {
    return {
      "Content-Type": "application/json",
      Authorization: `Bearer ${this._token}`,
    };
  }

  _rpcBody(method, params) {
    return JSON.stringify({
      jsonrpc: "2.0",
      method,
      id: uuidv4(),
      params,
    });
  }

  /**
   * Send a task and wait for the final result (synchronous).
   */
  async sendTask(agentUrl, taskId, message, pushNotification = null) {
    const params = { id: taskId, message };
    if (pushNotification) params.pushNotification = pushNotification;

    const response = await fetch(agentUrl, {
      method: "POST",
      headers: this._headers(),
      body: this._rpcBody("tasks/send", params),
    });

    const data = await response.json();
    if (data.error) {
      throw new Error(`A2A error ${data.error.code}: ${data.error.message}`);
    }
    return data.result;
  }

  /**
   * Send a task and receive streaming SSE updates.
   * Returns an async generator that yields parsed SSE events.
   */
  async *sendTaskStream(agentUrl, taskId, message) {
    const body = this._rpcBody("tasks/sendSubscribe", { id: taskId, message });
    const token = this._token;

    // We use a manual fetch + readable stream approach for Node.js SSE
    const response = await fetch(agentUrl, {
      method: "POST",
      headers: { ...this._headers(), Accept: "text/event-stream" },
      body,
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status} from ${agentUrl}`);
    }

    // Read the SSE stream line by line
    let buffer = "";
    for await (const chunk of response.body) {
      buffer += chunk.toString();
      const lines = buffer.split("\n");
      buffer = lines.pop(); // Keep incomplete last line

      for (const line of lines) {
        if (line.startsWith("data: ")) {
          const raw = line.slice(6).trim();
          if (!raw || raw === "[DONE]") continue;
          try {
            const event = JSON.parse(raw);
            yield event;
          } catch {
            // Skip malformed events
          }
        }
      }
    }
  }

  /**
   * Poll for a task's current state.
   */
  async getTask(agentUrl, taskId) {
    const response = await fetch(agentUrl, {
      method: "POST",
      headers: this._headers(),
      body: this._rpcBody("tasks/get", { id: taskId }),
    });
    const data = await response.json();
    if (data.error) throw new Error(`A2A error: ${data.error.message}`);
    return data.result;
  }

  /**
   * Cancel a task.
   */
  async cancelTask(agentUrl, taskId) {
    const response = await fetch(agentUrl, {
      method: "POST",
      headers: this._headers(),
      body: this._rpcBody("tasks/cancel", { id: taskId }),
    });
    const data = await response.json();
    if (data.error) throw new Error(`A2A error: ${data.error.message}`);
    return data.result;
  }
}
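
The trickiest part of the client is the buffering inside `sendTaskStream`: a network chunk can end mid-line, so the incomplete tail must be carried over to the next chunk. Here is that logic extracted into a standalone, testable function:

```javascript
// Parse one raw SSE chunk given the leftover buffer from the previous chunk.
// Returns the complete events found plus the new leftover buffer.
function parseSseChunk(buffer, chunk) {
  const events = [];
  buffer += chunk;
  const lines = buffer.split("\n");
  buffer = lines.pop(); // keep the incomplete trailing line for next time

  for (const line of lines) {
    if (line.startsWith("data: ")) {
      const raw = line.slice(6).trim();
      if (!raw || raw === "[DONE]") continue;
      try {
        events.push(JSON.parse(raw));
      } catch {
        // skip malformed events
      }
    }
  }
  return { events, buffer };
}

// A chunk boundary splits the first event in half; the buffer stitches it back.
let state = parseSseChunk("", 'data: {"status":{"state":"work');
state = parseSseChunk(
  state.buffer,
  'ing"}}\n\ndata: {"status":{"state":"completed"}}\n'
);
console.log(state.events.length); // both events recovered
```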

The Orchestrator

The orchestrator ties everything together. It exposes high-level methods for delegating single tasks, running tasks concurrently across multiple agents, and handling multi-turn interactions when an agent enters the input_required state.

// src/orchestrator.js

import { v4 as uuidv4 } from "uuid";
import { A2AClient } from "./a2aClient.js";
import { SkillMatcher } from "./skillMatcher.js";

const TERMINAL_STATES = new Set(["completed", "failed", "canceled"]);

export class Orchestrator {
  constructor(registry, token) {
    this._registry = registry;
    this._client = new A2AClient(token);
    this._matcher = new SkillMatcher();
    this._agents = []; // loaded from registry
  }

  async initialize() {
    this._agents = await this._registry.loadAll();
    console.log(`[Orchestrator] Loaded ${this._agents.length} agent(s)`);
    return this;
  }

  /**
   * Delegate a single task to the best matching agent.
   * Streams progress to the console and returns the final artifacts.
   */
  async delegate(taskDescription, onEvent = null) {
    const match = this._matcher.findBestMatch(taskDescription, this._agents);
    if (!match) throw new Error(`No agent found for: "${taskDescription}"`);

    const taskId = uuidv4();
    const message = this._textMessage(taskDescription);

    console.log(`\n[Orchestrator] Delegating to ${match.card.name}`);
    console.log(`[Orchestrator] Task ID: ${taskId}`);

    return await this._executeWithStreaming(
      match.url + "/a2a",
      taskId,
      message,
      match.card.name,
      onEvent
    );
  }

  /**
   * Run multiple tasks concurrently, each routed to the best agent.
   * Returns an array of results in the same order as the input tasks.
   */
  async delegateConcurrent(tasks) {
    console.log(`\n[Orchestrator] Running ${tasks.length} tasks concurrently`);

    const promises = tasks.map(async ({ description, onEvent }) => {
      try {
        return await this.delegate(description, onEvent);
      } catch (err) {
        return { error: err.message };
      }
    });

    return await Promise.all(promises);
  }

  /**
   * Delegate a task with automatic multi-turn handling.
   * When the agent enters input_required, calls the inputProvider callback
   * to get the next message, then resumes the task.
   */
  async delegateWithMultiTurn(taskDescription, inputProvider) {
    const match = this._matcher.findBestMatch(taskDescription, this._agents);
    if (!match) throw new Error(`No agent found for: "${taskDescription}"`);

    const agentUrl = match.url + "/a2a";
    const taskId = uuidv4();

    console.log(`\n[Orchestrator] Multi-turn delegation to ${match.card.name}`);
    let message = this._textMessage(taskDescription);
    let artifacts = [];
    let turnCount = 0;
    const MAX_TURNS = 10;

    while (turnCount < MAX_TURNS) {
      turnCount++;
      console.log(`[Orchestrator] Turn ${turnCount}`);

      const result = await this._executeWithStreaming(
        agentUrl,
        taskId,
        message,
        match.card.name
      );

      const state = result.status?.state;

      if (state === "completed") {
        artifacts = result.artifacts || [];
        console.log(`[Orchestrator] Task completed after ${turnCount} turn(s)`);
        break;
      }

      if (state === "input_required") {
        // Extract what the agent is asking for
        const agentQuestion = this._extractLastAgentMessage(result);
        console.log(`[Orchestrator] Agent needs input: ${agentQuestion}`);

        // Ask the input provider for the next message
        const userInput = await inputProvider(agentQuestion, result);
        if (!userInput) {
          console.log("[Orchestrator] No input provided, canceling task");
          await this._client.cancelTask(agentUrl, taskId);
          break;
        }

        message = this._textMessage(userInput);
        continue;
      }

      if (state === "failed" || state === "canceled") {
        console.error(`[Orchestrator] Task ended with state: ${state}`);
        break;
      }
    }

    return artifacts;
  }

  /**
   * Execute a task with SSE streaming, printing progress and returning final result.
   */
  async _executeWithStreaming(agentUrl, taskId, message, agentName, onEvent = null) {
    let lastResult = null;

    try {
      for await (const event of this._client.sendTaskStream(agentUrl, taskId, message)) {
        this._logEvent(event, agentName);
        if (onEvent) onEvent(event);
        lastResult = event;

        // Stop consuming the stream once we hit a terminal state
        const state = event.status?.state;
        if (state && TERMINAL_STATES.has(state)) break;

        // Also stop on input_required - caller handles the next turn
        if (state === "input_required") break;
      }
    } catch (err) {
      console.error(`[Orchestrator] Stream error for task ${taskId}: ${err.message}`);
      // Fall back to polling
      lastResult = await this._client.getTask(agentUrl, taskId);
    }

    // Fetch the full final task state (artifacts may arrive in multiple chunks)
    return await this._client.getTask(agentUrl, taskId);
  }

  _logEvent(event, agentName) {
    if (event.type === "taskStatusUpdate") {
      const state = event.status?.state;
      const msg = event.status?.message?.parts?.[0]?.text;
      console.log(`  [${agentName}] ${state}${msg ? `: ${msg}` : ""}`);
    } else if (event.type === "taskArtifactUpdate") {
      console.log(`  [${agentName}] artifact received: ${event.artifact?.name}`);
    }
  }

  _extractLastAgentMessage(result) {
    const messages = result.messages || [];
    for (let i = messages.length - 1; i >= 0; i--) {
      if (messages[i].role === "agent") {
        return messages[i].parts?.[0]?.text || "";
      }
    }
    return "";
  }

  _textMessage(text) {
    return { role: "user", parts: [{ type: "text", text }] };
  }
}
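
The multi-turn loop is easiest to reason about with the network stripped away. This stubbed walk-through (the states and responses are made up, no A2A calls involved) exercises the same loop shape as `delegateWithMultiTurn`: resume on `input_required`, stop on a terminal state or when the input provider declines:

```javascript
// Simulate the multi-turn loop against a scripted sequence of final states,
// one state per turn.
function runTurns(statesPerTurn, inputProvider) {
  const MAX_TURNS = 10;
  let turns = 0;
  while (turns < MAX_TURNS) {
    const state = statesPerTurn[turns];
    turns++;
    if (state === "completed") return { turns, done: true };
    if (state === "input_required") {
      const input = inputProvider();
      if (!input) return { turns, done: false }; // would cancel the task
      continue; // send the new message as the next turn
    }
    if (state === "failed" || state === "canceled") return { turns, done: false };
  }
  return { turns, done: false }; // hit the max-turns guard
}

// Agent asks for input once, then completes on the second turn.
const result = runTurns(["input_required", "completed"], () => "Proceed with 250 units.");
console.log(result.turns, result.done);
```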

A Real Multi-Agent Workflow

With the orchestrator in place, building a multi-agent workflow becomes straightforward. Here is a supply chain workflow that runs two stock checks concurrently, then conditionally triggers a reorder based on the results:

// workflows/supplyChain.js

export async function runSupplyChainWorkflow(orchestrator) {
  console.log("=== Supply Chain Workflow Starting ===\n");

  // Step 1: Run two stock checks concurrently
  const [inventoryResult] = await orchestrator.delegateConcurrent([
    {
      description: "Check stock for SKU-12345 and SKU-67890",
      onEvent: (e) => {
        if (e.type === "taskStatusUpdate" && e.status?.state === "working") {
          process.stdout.write(".");
        }
      },
    },
    {
      description: "Check stock for SKU-99999",
    },
  ]);

  console.log("\n\n--- Inventory Results ---");

  // Extract alerts from inventory check
  const inventoryData = inventoryResult?.artifacts?.[0]?.parts?.[0]?.data;
  const alerts = inventoryData?.alerts || [];

  if (alerts.length > 0) {
    console.log("Alerts detected:");
    alerts.forEach((a) => console.log(`  - ${a}`));

    // Step 2: Trigger reorder with multi-turn interaction
    console.log("\n--- Starting Reorder Workflow ---");

    const reorderArtifacts = await orchestrator.delegateWithMultiTurn(
      "Trigger reorder for low stock items",
      async (agentQuestion) => {
        // In production, this would be a human approval step or another agent's decision.
        // For demo, we auto-approve 250 units.
        console.log(`\nAgent asks: ${agentQuestion}`);
        console.log("Auto-approving: 250 units");
        return "Proceed with 250 units.";
      }
    );

    console.log("\n--- Purchase Order Generated ---");
    if (reorderArtifacts.length > 0) {
      const po = reorderArtifacts[0]?.parts?.[0]?.data;
      console.log(`PO Number: ${po?.poNumber}`);
      console.log(`SKU: ${po?.sku}`);
      console.log(`Quantity: ${po?.quantity}`);
      console.log(`Total Value: $${po?.totalValue}`);
      console.log(`Status: ${po?.status}`);
    }
  } else {
    console.log("All stock levels healthy. No reorder needed.");
  }

  console.log("\n=== Supply Chain Workflow Complete ===");
}

Entry Point

// index.js

import "dotenv/config";
import { AgentRegistry } from "./src/agentRegistry.js";
import { Orchestrator } from "./src/orchestrator.js";
import { runSupplyChainWorkflow } from "./workflows/supplyChain.js";

async function main() {
  const token = process.env.API_TOKEN || "dev-token-12345";

  const registry = new AgentRegistry()
    .register(process.env.INVENTORY_AGENT_URL || "http://localhost:3000")
    .register(process.env.PROCUREMENT_AGENT_URL || "http://localhost:3001");

  const orchestrator = new Orchestrator(registry, token);
  await orchestrator.initialize();

  // Run a single task
  console.log("--- Single Task Demo ---");
  const result = await orchestrator.delegate("Check stock for SKU-12345");
  console.log("Artifacts:", JSON.stringify(result.artifacts, null, 2));

  // Run the full supply chain workflow
  await runSupplyChainWorkflow(orchestrator);
}

main().catch(console.error);

Running the Full System

Start the agent servers from Parts 3 and 4 in separate terminals, then run the orchestrator:

# Terminal 1 - Inventory Agent (Node.js from Part 3)
cd a2a-inventory-agent
node server.js

# Terminal 2 - Second agent instance on port 3001 (or Python/C# from Part 4)
PORT=3001 node server.js

# Terminal 3 - Orchestrator
cd a2a-orchestrator
node index.js

You should see output like this:

[Registry] Fetched Agent Card: Inventory Management Agent (http://localhost:3000)
[Registry] Fetched Agent Card: Inventory Management Agent (http://localhost:3001)
[Orchestrator] Loaded 2 agent(s)

--- Single Task Demo ---
[SkillMatcher] Matched "Check stock for SKU-12345" -> Inventory Management Agent / Check Stock Level (score: 9)
[Orchestrator] Delegating to Inventory Management Agent
  [Inventory Management Agent] submitted
  [Inventory Management Agent] working: Parsing SKU identifiers from request...
  [Inventory Management Agent] working: Querying inventory database for 1 SKU(s)...
  [Inventory Management Agent] artifact received: stock-levels
  [Inventory Management Agent] completed

Handling Agent Unavailability

Production orchestrators need to handle agents that are temporarily unavailable. Here is a retry wrapper you can add to the A2A client:

// Add to src/a2aClient.js

async sendTaskWithRetry(agentUrl, taskId, message, options = {}) {
  const { maxRetries = 3, backoffMs = 1000, retryOn = [503, 504] } = options;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await this.sendTask(agentUrl, taskId, message);
    } catch (err) {
      const isRetryable = err.message.includes("retryable") ||
                          retryOn.some(code => err.message.includes(String(code)));

      if (!isRetryable || attempt === maxRetries) throw err;

      const delay = backoffMs * Math.pow(2, attempt - 1); // exponential backoff
      console.warn(`[A2AClient] Retry ${attempt}/${maxRetries} in ${delay}ms: ${err.message}`);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
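
To make the backoff behavior concrete, this small helper (hypothetical, not part of the client) computes the delay schedule the wrapper above produces: with the default `backoffMs = 1000`, the wait after failed attempt n is `1000 * 2^(n-1)` ms, and no delay follows the final attempt because it rethrows:

```javascript
// Compute the exponential backoff delays between retry attempts.
// With maxRetries attempts, there are maxRetries - 1 waits.
function backoffDelays(maxRetries, backoffMs) {
  const delays = [];
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(backoffMs * Math.pow(2, attempt - 1));
  }
  return delays;
}

console.log(backoffDelays(3, 1000)); // waits before attempts 2 and 3
```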

Complete Orchestration Flow

sequenceDiagram
    participant I as index.js
    participant O as Orchestrator
    participant R as AgentRegistry
    participant SM as SkillMatcher
    participant C as A2AClient
    participant IA as Inventory Agent

    I->>O: initialize()
    O->>R: loadAll()
    R->>IA: GET /.well-known/agent.json
    IA-->>R: Agent Card
    R-->>O: [{url, card}]

    I->>O: delegate("Check stock SKU-12345")
    O->>SM: findBestMatch(text, agents)
    SM-->>O: {url, card, skill}

    O->>C: sendTaskStream(agentUrl, taskId, message)
    C->>IA: POST /a2a tasks/sendSubscribe
    IA-->>C: SSE: submitted
    IA-->>C: SSE: working
    IA-->>C: SSE: artifact received
    IA-->>C: SSE: completed
    C-->>O: async generator events

    O->>C: getTask(agentUrl, taskId)
    C->>IA: POST /a2a tasks/get
    IA-->>C: full task result
    C-->>O: result
    O-->>I: artifacts

Key Design Decisions

Agent Card caching. Fetching an Agent Card on every task would be wasteful and fragile. The 5-minute TTL cache means the orchestrator can survive short-lived network blips during a workflow without failing discovery. The invalidate() method lets you force a refresh when you know an agent has been updated.

Skill matching over hard-coded routing. Hard-coding which agent handles which task creates a maintenance problem the moment you add or remove agents. Skill-based routing means adding a new agent with new skills is zero-config from the orchestrator’s perspective.

Streaming with a polling fallback. SSE connections can drop in environments with aggressive load balancers or proxies. The _executeWithStreaming method catches stream errors and falls back to a tasks/get poll, so tasks always complete even if the stream breaks mid-way.

Multi-turn as a loop, not recursion. Using a while loop with a max-turns guard is safer than recursion for multi-turn interactions. It prevents stack overflows on long conversations and gives you a natural place to add turn-counting metrics.

What is Missing for Production

This orchestrator is complete and functional but deliberately omits a few production concerns: JWT verification on the orchestrator’s own incoming requests, persistent task storage for recovery after restarts, distributed tracing with correlation IDs across agent hops, and rate limiting to prevent a single workflow from flooding a remote agent. Parts 6 and 8 address all of these directly.

In Part 6, we harden everything with production-grade security: JWT verification, mutual TLS, RBAC, and Agent Card signing. That is where the implementation moves from working to enterprise-ready.
