Building Your First A2A Agent Server in Node.js (Part 3 of 8)

Building Your First A2A Agent Server in Node.js (Part 3 of 8)

Parts 1 and 2 gave you the theory and the specification. Now we build something real. By the end of this post you will have a fully functional A2A-compliant agent server running in Node.js, complete with Agent Card discovery, task handling, SSE streaming, and push notification support.

We are building an Inventory Management Agent, the same one used as the example throughout Part 2. It exposes two skills: checking stock levels and triggering a reorder workflow. This keeps the business logic simple so we can focus entirely on the A2A implementation.

Project Setup

Create a new directory and initialize the project:

mkdir a2a-inventory-agent
cd a2a-inventory-agent
npm init -y
npm install express uuid zod axios dotenv

We are using Express for the HTTP server, uuid for minting task IDs when this server acts as a client (in the examples below the caller supplies the task ID), zod for request validation, axios for push notification delivery, and dotenv for loading environment variables. No A2A-specific SDK is required since the protocol is built on standard HTTP and JSON-RPC, which Express handles natively.

The final project structure looks like this:

a2a-inventory-agent/
  src/
    agentCard.js        # Agent Card definition
    taskStore.js        # In-memory task state management
    taskHandler.js      # Business logic per skill
    routes/
      wellKnown.js      # GET /.well-known/agent.json
      tasks.js          # POST /a2a (JSON-RPC handler)
    middleware/
      auth.js           # Bearer token validation
      validate.js       # JSON-RPC request validation
  server.js             # Entry point
  .env

The Agent Card

Start with the Agent Card definition. This is the public contract that tells other agents what this server can do:

// src/agentCard.js

// Public Agent Card for this server. Served verbatim at
// /.well-known/agent.json (see routes/wellKnown.js); other agents read it to
// discover the endpoint URL, capabilities, skills, and auth requirements.
export const agentCard = {
  name: "Inventory Management Agent",
  description: "Manages stock levels, processes inventory queries, and triggers reorder workflows",
  version: "1.0.0",
  // JSON-RPC endpoint clients POST to; overridable for deployed environments.
  url: process.env.AGENT_URL || "http://localhost:3000/a2a",
  documentationUrl: "http://localhost:3000/docs",
  provider: {
    organization: "Acme Corp",
    url: "https://acmecorp.example.com"
  },
  // Advertised capabilities — all three are implemented in this server
  // (SSE streaming, webhook push notifications, and per-task state history).
  capabilities: {
    streaming: true,
    pushNotifications: true,
    stateTransitionHistory: true
  },
  defaultInputModes: ["text", "application/json"],
  defaultOutputModes: ["text", "application/json"],
  // The two skills routed by taskHandler.js.
  skills: [
    {
      id: "check-stock",
      name: "Check Stock Level",
      description: "Returns current stock quantity for one or more product SKUs",
      tags: ["inventory", "stock", "query"],
      examples: [
        "What is the current stock for SKU-12345?",
        "Check inventory levels for SKU-12345 and SKU-67890"
      ],
      inputModes: ["text", "application/json"],
      outputModes: ["application/json"]
    },
    {
      id: "trigger-reorder",
      name: "Trigger Reorder",
      description: "Initiates a reorder workflow when stock falls below threshold",
      tags: ["inventory", "reorder", "procurement"],
      inputModes: ["application/json"],
      outputModes: ["application/json"]
    }
  ],
  // Bearer-token auth, enforced by middleware/auth.js on the /a2a route.
  securitySchemes: {
    bearer: {
      type: "http",
      scheme: "bearer",
      bearerFormat: "JWT"
    }
  },
  security: [{ bearer: [] }]
};

The Task Store

The task store manages the lifecycle of every task. In production you would back this with Redis or a database. For now we use an in-memory Map, which is sufficient to demonstrate the full state machine:

// src/taskStore.js

// Module-level in-memory store. Keys are task IDs, values are the task
// records built by createTask(). Swap for Redis/a database in production.
const tasks = new Map();

// Canonical A2A task lifecycle states. Frozen so consumers cannot
// accidentally mutate the shared exported constant.
export const TaskState = Object.freeze({
  SUBMITTED: "submitted",
  WORKING: "working",
  INPUT_REQUIRED: "input_required",
  COMPLETED: "completed",
  FAILED: "failed",
  CANCELED: "canceled"
});

// States a task can never leave once entered.
const TERMINAL_STATES = new Set([
  TaskState.COMPLETED,
  TaskState.FAILED,
  TaskState.CANCELED
]);

// Create and register a new task record in the SUBMITTED state.
// `pushNotification` is the optional webhook config supplied by the client.
// Returns the stored task object (callers mutate it via the store helpers).
export function createTask(taskId, initialMessage, pushNotification = null) {
  // One timestamp for both status and the first history entry, so they can
  // never disagree (two new Date() calls could land on different ms).
  const timestamp = new Date().toISOString();
  const task = {
    id: taskId,
    status: {
      state: TaskState.SUBMITTED,
      timestamp
    },
    messages: [initialMessage],
    artifacts: [],
    pushNotification,
    sseClients: new Set(), // active SSE connections for this task
    history: [
      { state: TaskState.SUBMITTED, timestamp }
    ]
  };
  tasks.set(taskId, task);
  return task;
}

// Look up a task by ID; returns null (never undefined) when absent.
export function getTask(taskId) {
  return tasks.get(taskId) ?? null;
}

// Transition a task to `newState`, optionally appending an agent `message`,
// record the transition in history, and notify all SSE subscribers.
// Throws if the task does not exist. Returns the updated task.
export function updateTaskState(taskId, newState, message = null) {
  const task = tasks.get(taskId);
  if (!task) throw new Error(`Task ${taskId} not found`);

  // Single timestamp so status and the history entry always agree exactly
  // (the original computed new Date() twice, which could differ by a ms).
  const timestamp = new Date().toISOString();
  task.status = { state: newState, timestamp };
  if (message) task.messages.push(message);

  task.history.push({ state: newState, timestamp });

  // Broadcast state update to all SSE clients
  broadcastToTask(taskId, {
    type: "taskStatusUpdate",
    taskId,
    status: { ...task.status, message }
  });

  return task;
}

// Append an artifact to a task and stream it to all SSE subscribers.
// `lastChunk` marks whether this is the final chunk of the artifact.
// Throws if the task does not exist.
export function addArtifact(taskId, artifact, lastChunk = true) {
  const task = tasks.get(taskId);
  if (!task) throw new Error(`Task ${taskId} not found`);

  task.artifacts.push(artifact);

  const event = {
    type: "taskArtifactUpdate",
    taskId,
    artifact: { ...artifact, lastChunk }
  };
  broadcastToTask(taskId, event);
}

// Subscribe an Express response object to a task's SSE event stream.
// Silently ignores unknown task IDs.
export function addSseClient(taskId, res) {
  tasks.get(taskId)?.sseClients.add(res);
}

// Unsubscribe a response object (e.g. when the client disconnects).
export function removeSseClient(taskId, res) {
  tasks.get(taskId)?.sseClients.delete(res);
}

// True when the given state is one a task can never leave.
export function isTerminal(state) {
  return TERMINAL_STATES.has(state);
}

// Write one SSE `data:` frame (JSON payload) to every live subscriber of a
// task. Connections whose write throws are pruned from the subscriber set.
function broadcastToTask(taskId, event) {
  const task = tasks.get(taskId);
  if (!task) return;

  const frame = `data: ${JSON.stringify(event)}\n\n`;
  task.sseClients.forEach((client) => {
    try {
      client.write(frame);
    } catch {
      // Dead connection — drop it so future broadcasts skip it.
      task.sseClients.delete(client);
    }
  });
}

The Task Handler

The task handler contains the business logic. It reads the incoming message, routes to the right skill, and updates the task state as work progresses. For tasks/sendSubscribe the processing runs detached from the HTTP response so SSE events can stream while work continues; tasks/send simply awaits completion before responding:

// src/taskHandler.js

import {
  updateTaskState,
  addArtifact,
  getTask,
  TaskState
} from "./taskStore.js";
import axios from "axios"; // npm install axios - for push notification delivery

// Simulated inventory database. Keys are SKU identifiers; each record holds
// the quantity on hand, warehouse location, the threshold below which the
// reorder skill fires, and a unit price. Fixture data for the demo skills.
const inventory = {
  "SKU-12345": { quantity: 240, location: "Warehouse-A", reorderThreshold: 50, unitPrice: 12.5 },
  "SKU-67890": { quantity: 12,  location: "Warehouse-B", reorderThreshold: 30, unitPrice: 42.0 },
  "SKU-99999": { quantity: 0,   location: "Warehouse-A", reorderThreshold: 20, unitPrice: 8.75 }
};

// Drive a task through its lifecycle: route to the matching skill, then
// deliver a push notification if one was registered. Skill errors move the
// task to FAILED rather than propagating to the caller.
export async function processTask(taskId) {
  const task = getTask(taskId);
  if (!task) return;

  // Route on the opening message, but hand the skill the LATEST user
  // message: after an input_required pause the client's follow-up (e.g.
  // newly supplied SKUs) arrives as the last message, and reading only
  // messages[0] would ignore it.
  const firstText = extractText(task.messages[0]);
  const latestText = extractText(task.messages[task.messages.length - 1]);

  try {
    await updateTaskState(taskId, TaskState.WORKING);

    if (firstText.toLowerCase().includes("reorder")) {
      await handleReorderSkill(taskId, task);
    } else {
      await handleCheckStockSkill(taskId, latestText);
    }
  } catch (err) {
    await updateTaskState(taskId, TaskState.FAILED, {
      role: "agent",
      parts: [{ type: "text", text: `Error processing task: ${err.message}` }]
    });
  }

  // Deliver push notification if registered (fires on every pause/finish,
  // including input_required, so disconnected clients learn they must reply).
  await deliverPushNotification(taskId);
}

// "check-stock" skill: pull SKU identifiers out of the message text, look
// each one up in the inventory fixture, and emit a single JSON artifact with
// the quantities plus low-stock alerts. Moves the task to INPUT_REQUIRED
// when the text contains no SKUs, and to COMPLETED otherwise.
async function handleCheckStockSkill(taskId, messageText) {
  // First progress update streamed to any SSE subscribers.
  await updateTaskState(taskId, TaskState.WORKING, {
    role: "agent",
    parts: [{ type: "text", text: "Parsing SKU identifiers from request..." }]
  });
  await delay(300);

  // Naive extraction: anything shaped like SKU-<digits>, case-insensitive.
  const skus = messageText.match(/SKU-\d+/gi) || [];

  if (skus.length === 0) {
    // Nothing to look up — pause the task and ask the client for input.
    await updateTaskState(taskId, TaskState.INPUT_REQUIRED, {
      role: "agent",
      parts: [{ type: "text", text: "No SKU identifiers found in your request. Please provide one or more SKU IDs (e.g. SKU-12345)." }]
    });
    return;
  }

  // Second progress update before the (simulated) database query.
  await updateTaskState(taskId, TaskState.WORKING, {
    role: "agent",
    parts: [{ type: "text", text: `Querying inventory database for ${skus.length} SKU(s)...` }]
  });
  await delay(400);

  const stockLevels = {};
  const alerts = [];

  for (const raw of skus) {
    const sku = raw.toUpperCase();
    const item = inventory[sku];
    if (!item) {
      stockLevels[sku] = { error: "SKU not found in inventory system" };
      continue;
    }
    stockLevels[sku] = item;
    if (item.quantity <= item.reorderThreshold) {
      alerts.push(`${sku} is at or below reorder threshold (qty: ${item.quantity}, threshold: ${item.reorderThreshold})`);
    }
  }

  // Emit the consolidated result as one JSON artifact.
  addArtifact(taskId, {
    name: "stock-levels",
    description: "Current inventory levels for requested SKUs",
    parts: [{ type: "application/json", data: { stockLevels, alerts } }]
  });

  await updateTaskState(taskId, TaskState.COMPLETED);
}

// "trigger-reorder" skill. Two-phase multi-turn interaction:
//   pass 1 (single message): present reorder options, go to INPUT_REQUIRED;
//   pass 2 (follow-up message): parse the chosen quantity and emit a PO.
async function handleReorderSkill(taskId, task) {
  // Any message beyond the first means the client answered our prompt.
  const lastMessage = task.messages[task.messages.length - 1];
  const isConfirmation = task.messages.length > 1;

  if (!isConfirmation) {
    // First pass: ask for confirmation
    await updateTaskState(taskId, TaskState.WORKING, {
      role: "agent",
      parts: [{ type: "text", text: "Analyzing reorder requirements..." }]
    });
    await delay(500);

    await updateTaskState(taskId, TaskState.INPUT_REQUIRED, {
      role: "agent",
      parts: [{
        type: "text",
        text: "SKU-67890 requires reorder (current: 12, threshold: 30). Options:\n" +
              "- 100 units at $42.00/unit = $4,200\n" +
              "- 250 units at $40.00/unit = $10,000\n" +
              "- 500 units at $37.50/unit = $18,750\n\n" +
              "Reply with your preferred quantity to proceed."
      }]
    });
    return;
  }

  // Second pass: parse the confirmed quantity (defaults to 100 when the
  // reply contains no number). Radix 10 is explicit to avoid octal surprises.
  const confirmText = extractText(lastMessage);
  const quantityMatch = confirmText.match(/\d+/);
  const quantity = quantityMatch ? Number.parseInt(quantityMatch[0], 10) : 100;

  await updateTaskState(taskId, TaskState.WORKING, {
    role: "agent",
    parts: [{ type: "text", text: `Creating purchase order for ${quantity} units of SKU-67890...` }]
  });
  await delay(600);

  // Volume pricing tiers, computed once so unitPrice and totalValue can
  // never disagree (the original duplicated this ternary in two fields).
  const unitPrice = quantity >= 500 ? 37.5 : quantity >= 250 ? 40.0 : 42.0;
  const po = {
    poNumber: `PO-${Date.now()}`,
    sku: "SKU-67890",
    quantity,
    unitPrice,
    totalValue: quantity * unitPrice,
    status: "submitted",
    // One week out, date portion only (YYYY-MM-DD).
    estimatedDelivery: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString().split("T")[0]
  };

  addArtifact(taskId, {
    name: "purchase-order",
    description: "Generated purchase order for reorder",
    parts: [{ type: "application/json", data: po }]
  });

  await updateTaskState(taskId, TaskState.COMPLETED, {
    role: "agent",
    parts: [{ type: "text", text: `Purchase order ${po.poNumber} submitted successfully.` }]
  });
}

// Best-effort webhook delivery of the task's current status and artifacts.
// Failures are logged and swallowed so they never fail the task itself.
// No-op when the task has no registered push notification URL.
async function deliverPushNotification(taskId) {
  const task = getTask(taskId);
  const config = task?.pushNotification;
  if (!config?.url) return;

  const payload = {
    taskId,
    status: task.status,
    artifacts: task.artifacts
  };

  const headers = { "Content-Type": "application/json" };
  if (config.token) headers.Authorization = `Bearer ${config.token}`;

  try {
    await axios.post(config.url, payload, { headers, timeout: 5000 });
  } catch (err) {
    console.error(`Push notification delivery failed for task ${taskId}: ${err.message}`);
  }
}

// Concatenate all text parts of an A2A message into one space-joined string.
// Returns "" for null/undefined messages or messages without parts.
function extractText(message) {
  const parts = message?.parts ?? [];
  const texts = [];
  for (const part of parts) {
    if (part.type === "text") texts.push(part.text);
  }
  return texts.join(" ");
}

// Promise-based sleep, used to simulate work latency in the demo skills.
function delay(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

The Well-Known Route

This is the simplest route but also the most important one for discovery. Every A2A server must serve its Agent Card at this exact path:

// src/routes/wellKnown.js

import { Router } from "express";
import { agentCard } from "../agentCard.js";

const router = Router();

// Public discovery endpoint: every A2A server must serve its Agent Card at
// exactly this path. Cached briefly (5 min) to keep discovery traffic cheap.
router.get("/.well-known/agent.json", (_req, res) => {
  res.set("Cache-Control", "public, max-age=300").json(agentCard);
});

export default router;

The JSON-RPC Task Router

This is the heart of the A2A server. It handles all four JSON-RPC methods and manages both synchronous responses and SSE streaming connections:

// src/routes/tasks.js

import { Router } from "express";
import { v4 as uuidv4 } from "uuid";
import {
  createTask,
  getTask,
  updateTaskState,
  addSseClient,
  removeSseClient,
  isTerminal,
  TaskState
} from "../taskStore.js";
import { processTask } from "../taskHandler.js";

const router = Router();

// Single POST endpoint: every A2A method arrives as a JSON-RPC 2.0 envelope
// and is dispatched to the matching handler below.
router.post("/", async (req, res) => {
  // Guard against a missing/unparsed body (e.g. wrong Content-Type) so the
  // destructure cannot throw — it yields an -32600 error response instead.
  const { jsonrpc, method, id, params } = req.body ?? {};

  // Basic JSON-RPC validation
  if (jsonrpc !== "2.0" || !method || !params) {
    return res.json(rpcError(id, -32600, "Invalid request"));
  }

  switch (method) {
    case "tasks/send":
      return handleTaskSend(req, res, id, params);
    case "tasks/sendSubscribe":
      return handleTaskSendSubscribe(req, res, id, params);
    case "tasks/get":
      return handleTaskGet(req, res, id, params);
    case "tasks/cancel":
      return handleTaskCancel(req, res, id, params);
    default:
      return res.json(rpcError(id, -32601, "Method not found"));
  }
});

// Synchronous task submission - waits for completion
// tasks/send: synchronous submission. Processing is awaited here so the
// JSON-RPC response carries the task's final (or input_required) state and
// artifacts — unlike tasks/sendSubscribe, which streams progress over SSE.
async function handleTaskSend(req, res, rpcId, params) {
  const { id: taskId, message, pushNotification } = params;

  if (!taskId || !message) {
    return res.json(rpcError(rpcId, -32602, "Invalid params: id and message are required"));
  }

  let task = getTask(taskId);
  if (task) {
    // Re-submission to an existing task (multi-turn / input_required):
    // reject if it already finished, otherwise append the follow-up message.
    if (isTerminal(task.status.state)) {
      return res.json(rpcError(rpcId, -32002, "Task is already in a terminal state"));
    }
    task.messages.push(message);
  } else {
    task = createTask(taskId, message, pushNotification || null);
  }

  // Both branches process the same way; the original duplicated this call
  // under a comment that wrongly claimed asynchronous processing.
  await processTask(taskId);

  const finalTask = getTask(taskId);
  return res.json({
    jsonrpc: "2.0",
    id: rpcId,
    result: {
      id: finalTask.id,
      status: finalTask.status,
      artifacts: finalTask.artifacts,
      history: finalTask.history
    }
  });
}

// Streaming task submission - returns SSE stream
function handleTaskSendSubscribe(req, res, rpcId, params) {
  const { id: taskId, message, pushNotification } = params;

  if (!taskId || !message) {
    return res.json(rpcError(rpcId, -32602, "Invalid params: id and message are required"));
  }

  // Set up SSE headers before anything else
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // Disable Nginx buffering
  res.flushHeaders();

  let task = getTask(taskId);

  if (task) {
    // Multi-turn: resume existing task
    if (isTerminal(task.status.state)) {
      res.write(`data: ${JSON.stringify(rpcError(rpcId, -32002, "Task already in terminal state"))}\n\n`);
      return res.end();
    }
    task.messages.push(message);
  } else {
    task = createTask(taskId, message, pushNotification || null);
  }

  // Register this response object as an SSE client
  addSseClient(taskId, res);

  // Clean up when client disconnects
  req.on("close", () => {
    removeSseClient(taskId, res);
  });

  // Process task in the background (non-blocking)
  processTask(taskId).then(() => {
    const finalTask = getTask(taskId);
    if (finalTask && isTerminal(finalTask.status.state)) {
      // Give the last SSE events a moment to flush before closing
      setTimeout(() => res.end(), 100);
    }
  });
}

// Poll task state
function handleTaskGet(req, res, rpcId, params) {
  const { id: taskId } = params;
  const task = getTask(taskId);

  if (!task) {
    return res.json(rpcError(rpcId, -32001, "Task not found"));
  }

  return res.json({
    jsonrpc: "2.0",
    id: rpcId,
    result: {
      id: task.id,
      status: task.status,
      artifacts: task.artifacts,
      history: task.history
    }
  });
}

// Cancel a task
async function handleTaskCancel(req, res, rpcId, params) {
  const { id: taskId } = params;
  const task = getTask(taskId);

  if (!task) {
    return res.json(rpcError(rpcId, -32001, "Task not found"));
  }

  if (isTerminal(task.status.state)) {
    return res.json(rpcError(rpcId, -32002, "Task is not cancelable"));
  }

  await updateTaskState(taskId, TaskState.CANCELED);

  return res.json({
    jsonrpc: "2.0",
    id: rpcId,
    result: { id: taskId, status: task.status }
  });
}

// Build a JSON-RPC 2.0 error envelope. The optional `data` field is only
// attached when a truthy value is supplied.
function rpcError(id, code, message, data = null) {
  const error = { code, message };
  if (data) error.data = data;
  return { jsonrpc: "2.0", id, error };
}

export default router;

Auth Middleware

Bearer token validation sits as middleware on the task route. In production this verifies a JWT. For development we check against an environment variable:

// src/middleware/auth.js

// Bearer-token auth middleware for the A2A endpoint.
// Responds 401 when the Authorization header is missing or malformed, 403
// when the token is wrong; otherwise falls through to the next handler.
export function bearerAuth(req, res, next) {
  // Skip auth for Agent Card endpoint
  if (req.path === "/.well-known/agent.json") return next();

  const authHeader = req.headers.authorization;
  if (!authHeader?.startsWith("Bearer ")) {
    return res.status(401).json({
      jsonrpc: "2.0",
      id: null,
      error: { code: -32001, message: "Unauthorized: Bearer token required" }
    });
  }

  const token = authHeader.slice(7);

  // In production: verify JWT signature, expiry, and claims here
  const validToken = process.env.API_TOKEN || "dev-token-12345";
  // Constant-time comparison: a plain === short-circuits on the first
  // mismatching character and can leak the token prefix through timing.
  if (!constantTimeEquals(token, validToken)) {
    return res.status(403).json({
      jsonrpc: "2.0",
      id: null,
      error: { code: -32001, message: "Forbidden: Invalid token" }
    });
  }

  next();
}

// Compare two strings in time independent of where they differ. The length
// check is an acceptable leak (token length is not secret here); in Node you
// could also use crypto.timingSafeEqual on equal-length buffers.
function constantTimeEquals(a, b) {
  if (a.length !== b.length) return false;
  let diff = 0;
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
  }
  return diff === 0;
}

Server Entry Point

// server.js

import express from "express";
import dotenv from "dotenv";
import wellKnownRouter from "./src/routes/wellKnown.js";
import tasksRouter from "./src/routes/tasks.js";
import { bearerAuth } from "./src/middleware/auth.js";

// Load .env before anything reads process.env (PORT, API_TOKEN, AGENT_URL).
dotenv.config();

const app = express();
const PORT = process.env.PORT || 3000;

// Parse JSON request bodies — JSON-RPC envelopes arrive as application/json.
app.use(express.json());

// Agent Card - no auth required (public discovery endpoint)
app.use(wellKnownRouter);

// All task routes require auth
app.use("/a2a", bearerAuth, tasksRouter);

// Health check
app.get("/health", (req, res) => res.json({ status: "ok", agent: "Inventory Management Agent" }));

app.listen(PORT, () => {
  console.log(`A2A Inventory Agent running on port ${PORT}`);
  console.log(`Agent Card: http://localhost:${PORT}/.well-known/agent.json`);
  console.log(`A2A Endpoint: http://localhost:${PORT}/a2a`);
});

Add a .env file:

PORT=3000
API_TOKEN=dev-token-12345
AGENT_URL=http://localhost:3000/a2a

Running and Testing the Server

Add "type": "module" to your package.json and start the server:

node server.js

Test the Agent Card endpoint first:

curl http://localhost:3000/.well-known/agent.json

Submit a synchronous stock check task:

curl -X POST http://localhost:3000/a2a \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-token-12345" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/send",
    "id": "req-001",
    "params": {
      "id": "task-abc-001",
      "message": {
        "role": "user",
        "parts": [{ "type": "text", "text": "Check stock for SKU-12345 and SKU-67890" }]
      }
    }
  }'

Test streaming with tasks/sendSubscribe:

curl -X POST http://localhost:3000/a2a \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-token-12345" \
  -H "Accept: text/event-stream" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/sendSubscribe",
    "id": "req-002",
    "params": {
      "id": "task-abc-002",
      "message": {
        "role": "user",
        "parts": [{ "type": "text", "text": "Check stock for SKU-67890" }]
      }
    }
  }'

You should see SSE events streaming back in real time, moving through submitted, working, and completed states.

Test the multi-turn reorder flow:

# Step 1: Initiate the reorder
curl -X POST http://localhost:3000/a2a \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-token-12345" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/send",
    "id": "req-003",
    "params": {
      "id": "task-reorder-001",
      "message": {
        "role": "user",
        "parts": [{ "type": "text", "text": "Trigger reorder for low stock items" }]
      }
    }
  }'

# Step 2: Confirm the quantity (task enters input_required, then resume with same task ID)
curl -X POST http://localhost:3000/a2a \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer dev-token-12345" \
  -d '{
    "jsonrpc": "2.0",
    "method": "tasks/send",
    "id": "req-004",
    "params": {
      "id": "task-reorder-001",
      "message": {
        "role": "user",
        "parts": [{ "type": "text", "text": "Proceed with 250 units." }]
      }
    }
  }'

Architecture Summary

flowchart TD
    Client([Client Agent / curl]) -->|GET /.well-known/agent.json| WK[Well-Known Route]
    Client -->|POST /a2a + Bearer token| Auth[Auth Middleware]

    Auth -->|valid| TR[Task Router]
    Auth -->|invalid| E401[401 Unauthorized]

    TR -->|tasks/send| TS[Task Store\ncreateTask / updateState]
    TR -->|tasks/sendSubscribe| SSE[SSE Stream\nkeep-alive connection]
    TR -->|tasks/get| TS
    TR -->|tasks/cancel| TS

    TS --> TH[Task Handler\nbusiness logic]
    TH -->|state updates + artifacts| TS
    TS -->|broadcast| SSE
    TS -->|POST webhook| PN[Push Notification\ndelivery]

What You Have Now

At this point you have a fully A2A-compliant agent server that handles all four JSON-RPC methods, streams status updates over SSE, supports multi-turn interactions through the input_required state, delivers push notifications for disconnected clients, and enforces bearer token authentication.

What this implementation intentionally omits for now: production-grade JWT verification, persistent task storage, rate limiting, and observability hooks. Those come in Parts 6 and 8.

In Part 4, we build the same agent server in Python and C#. The architecture and business logic stay identical, so Part 4 is a clean multi-language reference rather than repeating the conceptual explanation. If your team runs Python or .NET, that is the post you want.

References

Written by:

579 Posts

View All Posts
Follow Me :
How to whitelist website on AdBlocker?

How to whitelist website on AdBlocker?

  1. 1 Click on the AdBlock Plus icon on the top right corner of your browser
  2. 2 Click on "Enabled on this site" from the AdBlock Plus option
  3. 3 Refresh the page and start browsing the site