Building A2A Agent Servers in Python and C# (Part 4 of 8)

Part 3 built a complete A2A agent server in Node.js. This part delivers the same Inventory Management Agent in Python using FastAPI and in C# using ASP.NET Core. The architecture, business logic, and A2A behavior are identical across all three implementations. Pick the language your team runs and use it as your production starting point.

This post does not re-explain the protocol concepts; it is a focused implementation reference. If you need to understand what Agent Cards, task lifecycle states, or SSE streaming are, revisit Parts 1 through 3 first.

Python Implementation with FastAPI

FastAPI is the natural choice for Python A2A servers. Its async-first design handles SSE streaming cleanly, its Pydantic models map directly to the A2A JSON structures, and its OpenAPI integration makes the authentication configuration straightforward.

Project Setup

mkdir a2a-inventory-agent-python
cd a2a-inventory-agent-python
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install fastapi "uvicorn[standard]" httpx python-dotenv pydantic  # quotes keep zsh from globbing the brackets

Project structure:

a2a-inventory-agent-python/
  app/
    agent_card.py       # Agent Card definition
    task_store.py       # In-memory task state management
    task_handler.py     # Business logic per skill
    routes/
      well_known.py     # GET /.well-known/agent.json
      tasks.py          # POST /a2a (JSON-RPC handler)
    middleware/
      auth.py           # Bearer token validation
    models.py           # Pydantic request/response models
  main.py               # Entry point
  .env

Pydantic Models

# app/models.py

from pydantic import BaseModel
from typing import Any, Optional, List
from enum import Enum

class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input_required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"

class MessagePart(BaseModel):
    type: str
    text: Optional[str] = None
    data: Optional[Any] = None

class Message(BaseModel):
    role: str
    parts: List[MessagePart]

class PushNotification(BaseModel):
    url: str
    token: Optional[str] = None

class TaskStatus(BaseModel):
    state: TaskState
    timestamp: str
    message: Optional[Message] = None

class Artifact(BaseModel):
    name: str
    description: Optional[str] = None
    parts: List[MessagePart]
    last_chunk: bool = True

class TaskSendParams(BaseModel):
    id: str
    message: Message
    push_notification: Optional[PushNotification] = None
    metadata: Optional[dict] = None

class TaskGetParams(BaseModel):
    id: str

class TaskCancelParams(BaseModel):
    id: str

class JsonRpcRequest(BaseModel):
    jsonrpc: str
    method: str
    id: Any
    params: dict
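
For orientation, here is a minimal sketch of the JSON-RPC envelope that JsonRpcRequest and TaskSendParams describe. The helper name build_task_send, the "user" role value, and the fixed request id are illustrative assumptions, not part of the agent code.

```python
import json
import uuid
from typing import Optional

def build_task_send(text: str, task_id: Optional[str] = None) -> dict:
    """Build a JSON-RPC 2.0 envelope for tasks/send with a single text part."""
    return {
        "jsonrpc": "2.0",
        "id": 1,  # JSON-RPC request id (assumed value), echoed back in the response
        "method": "tasks/send",
        "params": {
            # Task id chosen by the client; a random UUID if none is given
            "id": task_id or str(uuid.uuid4()),
            "message": {
                "role": "user",  # assumed role name for the calling side
                "parts": [{"type": "text", "text": text}],
            },
        },
    }

request = build_task_send("Check stock for SKU-12345", task_id="task-001")
print(json.dumps(request, indent=2))
```

The params object carries the same id and message fields that TaskSendParams declares, so a payload shaped like this should validate against the models above.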

Agent Card

# app/agent_card.py

import os

def get_agent_card():
    return {
        "name": "Inventory Management Agent",
        "description": "Manages stock levels, processes inventory queries, and triggers reorder workflows",
        "version": "1.0.0",
        "url": os.getenv("AGENT_URL", "http://localhost:8000/a2a"),
        "documentationUrl": "http://localhost:8000/docs",
        "provider": {
            "organization": "Acme Corp",
            "url": "https://acmecorp.example.com"
        },
        "capabilities": {
            "streaming": True,
            "pushNotifications": True,
            "stateTransitionHistory": True
        },
        "defaultInputModes": ["text", "application/json"],
        "defaultOutputModes": ["text", "application/json"],
        "skills": [
            {
                "id": "check-stock",
                "name": "Check Stock Level",
                "description": "Returns current stock quantity for one or more product SKUs",
                "tags": ["inventory", "stock", "query"],
                "examples": ["Check stock for SKU-12345 and SKU-67890"],
                "inputModes": ["text", "application/json"],
                "outputModes": ["application/json"]
            },
            {
                "id": "trigger-reorder",
                "name": "Trigger Reorder",
                "description": "Initiates a reorder workflow when stock falls below threshold",
                "tags": ["inventory", "reorder", "procurement"],
                "inputModes": ["application/json"],
                "outputModes": ["application/json"]
            }
        ],
        "securitySchemes": {
            "bearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
        },
        "security": [{"bearer": []}]
    }
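
A calling agent typically reads the card to decide which skill to invoke. The sketch below shows one way that lookup might work against the structure returned by get_agent_card(); the helper skills_with_tag is hypothetical and the card here is a trimmed copy of the one above.

```python
from typing import List

def skills_with_tag(card: dict, tag: str) -> List[str]:
    """Return the ids of all skills in an Agent Card that carry the given tag."""
    return [
        skill["id"]
        for skill in card.get("skills", [])
        if tag in skill.get("tags", [])
    ]

# Trimmed copy of the card above, keeping only the fields this lookup reads
card = {
    "name": "Inventory Management Agent",
    "skills": [
        {"id": "check-stock", "tags": ["inventory", "stock", "query"]},
        {"id": "trigger-reorder", "tags": ["inventory", "reorder", "procurement"]},
    ],
}

inventory_skills = skills_with_tag(card, "inventory")
reorder_skills = skills_with_tag(card, "reorder")
print(inventory_skills, reorder_skills)
```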

Task Store

# app/task_store.py

import asyncio
import json
from datetime import datetime, timezone
from typing import Dict, Set, Optional, Any
from app.models import TaskState

TERMINAL_STATES = {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELED}

tasks: Dict[str, dict] = {}
# Maps task_id to a set of asyncio.Queue objects (one per SSE client)
sse_queues: Dict[str, Set[asyncio.Queue]] = {}

def create_task(task_id: str, initial_message: dict, push_notification=None) -> dict:
    task = {
        "id": task_id,
        "status": {"state": TaskState.SUBMITTED, "timestamp": _now()},
        "messages": [initial_message],
        "artifacts": [],
        "push_notification": push_notification,
        "history": [{"state": TaskState.SUBMITTED, "timestamp": _now()}]
    }
    tasks[task_id] = task
    sse_queues[task_id] = set()
    return task

def get_task(task_id: str) -> Optional[dict]:
    return tasks.get(task_id)

async def update_task_state(task_id: str, new_state: TaskState, message: Optional[dict] = None):
    task = tasks.get(task_id)
    if not task:
        raise ValueError(f"Task {task_id} not found")

    task["status"] = {"state": new_state, "timestamp": _now()}
    if message:
        task["messages"].append(message)
    task["history"].append({"state": new_state, "timestamp": _now()})

    await _broadcast(task_id, {
        "type": "taskStatusUpdate",
        "taskId": task_id,
        "status": {**task["status"], "message": message}
    })

async def add_artifact(task_id: str, artifact: dict, last_chunk: bool = True):
    task = tasks.get(task_id)
    if not task:
        raise ValueError(f"Task {task_id} not found")

    task["artifacts"].append(artifact)
    await _broadcast(task_id, {
        "type": "taskArtifactUpdate",
        "taskId": task_id,
        "artifact": {**artifact, "lastChunk": last_chunk}
    })

def add_sse_queue(task_id: str, queue: asyncio.Queue):
    if task_id in sse_queues:
        sse_queues[task_id].add(queue)

def remove_sse_queue(task_id: str, queue: asyncio.Queue):
    if task_id in sse_queues:
        sse_queues[task_id].discard(queue)

def is_terminal(state: TaskState) -> bool:
    return state in TERMINAL_STATES

async def _broadcast(task_id: str, event: dict):
    queues = sse_queues.get(task_id, set())
    for q in list(queues):
        await q.put(event)

def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
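
The store gives each SSE client its own asyncio.Queue and copies every event into all of them. The following standalone sketch isolates that fan-out pattern so it can be run without the rest of the app; demo_fan_out and the simplified event shape are illustrative assumptions.

```python
import asyncio

async def demo_fan_out() -> list:
    # Two subscribers to the same task, each with its own queue
    subscribers = {asyncio.Queue(), asyncio.Queue()}

    async def broadcast(event: dict) -> None:
        # Copy the set first so a subscriber can leave during iteration
        for q in list(subscribers):
            await q.put(event)

    await broadcast({"type": "taskStatusUpdate", "state": "working"})
    await broadcast({"type": "taskStatusUpdate", "state": "completed"})

    # Drain each queue; every subscriber should have seen both events in order
    received = []
    for q in subscribers:
        received.append([q.get_nowait()["state"] for _ in range(q.qsize())])
    return received

result = asyncio.run(demo_fan_out())
print(result)
```

Because the queues are unbounded, put() returns without waiting for a consumer, so several events can pile up before a reader runs. Consumers should therefore rely on the events they dequeue rather than on the task's current state.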

Task Handler

# app/task_handler.py

import asyncio
import re
import httpx
from app.task_store import update_task_state, add_artifact, get_task, is_terminal, TaskState

INVENTORY = {
    "SKU-12345": {"quantity": 240, "location": "Warehouse-A", "reorderThreshold": 50, "unitPrice": 12.5},
    "SKU-67890": {"quantity": 12,  "location": "Warehouse-B", "reorderThreshold": 30, "unitPrice": 42.0},
    "SKU-99999": {"quantity": 0,   "location": "Warehouse-A", "reorderThreshold": 20, "unitPrice": 8.75}
}

async def process_task(task_id: str):
    task = get_task(task_id)
    if not task:
        return

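    # Route on the first message so a multi-turn reorder stays on the reorder path.
    first_text = _extract_text(task["messages"][0])
    # Parse SKUs from the latest message, so a reply sent after an
    # INPUT_REQUIRED prompt is the text that gets examined.
    latest_text = _extract_text(task["messages"][-1])

    try:
        await update_task_state(task_id, TaskState.WORKING)

        if "reorder" in first_text.lower():
            await _handle_reorder(task_id, task)
        else:
            await _handle_check_stock(task_id, latest_text)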
    except Exception as e:
        await update_task_state(task_id, TaskState.FAILED, {
            "role": "agent",
            "parts": [{"type": "text", "text": f"Error: {str(e)}"}]
        })

    await _deliver_push_notification(task_id)

async def _handle_check_stock(task_id: str, text: str):
    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": "Parsing SKU identifiers from request..."}]
    })
    await asyncio.sleep(0.3)

    skus = [s.upper() for s in re.findall(r"SKU-\d+", text, re.IGNORECASE)]

    if not skus:
        await update_task_state(task_id, TaskState.INPUT_REQUIRED, {
            "role": "agent",
            "parts": [{"type": "text", "text": "No SKU identifiers found. Please provide one or more SKU IDs (e.g. SKU-12345)."}]
        })
        return

    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Querying inventory database for {len(skus)} SKU(s)..."}]
    })
    await asyncio.sleep(0.4)

    result = {}
    alerts = []
    for sku in skus:
        item = INVENTORY.get(sku)
        if item:
            result[sku] = item
            if item["quantity"] <= item["reorderThreshold"]:
                alerts.append(f"{sku} is at or below reorder threshold (qty: {item['quantity']})")
        else:
            result[sku] = {"error": "SKU not found"}

    await add_artifact(task_id, {
        "name": "stock-levels",
        "description": "Current inventory levels for requested SKUs",
        "parts": [{"type": "application/json", "data": {"stockLevels": result, "alerts": alerts}}]
    })
    await update_task_state(task_id, TaskState.COMPLETED)

async def _handle_reorder(task_id: str, task: dict):
    is_confirmation = len(task["messages"]) > 1

    if not is_confirmation:
        await update_task_state(task_id, TaskState.WORKING, {
            "role": "agent", "parts": [{"type": "text", "text": "Analyzing reorder requirements..."}]
        })
        await asyncio.sleep(0.5)
        await update_task_state(task_id, TaskState.INPUT_REQUIRED, {
            "role": "agent",
            "parts": [{"type": "text", "text": (
                "SKU-67890 requires reorder (current: 12, threshold: 30). Options:\n"
                "- 100 units at $42.00/unit = $4,200\n"
                "- 250 units at $40.00/unit = $10,000\n"
                "- 500 units at $37.50/unit = $18,750\n\n"
                "Reply with your preferred quantity to proceed."
            )}]
        })
        return

    confirm_text = _extract_text(task["messages"][-1])
    match = re.search(r"\d+", confirm_text)
    quantity = int(match.group()) if match else 100

    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Creating purchase order for {quantity} units..."}]
    })
    await asyncio.sleep(0.6)

    unit_price = 37.5 if quantity >= 500 else 40.0 if quantity >= 250 else 42.0
    po = {
        "poNumber": f"PO-{int(asyncio.get_event_loop().time() * 1000)}",
        "sku": "SKU-67890",
        "quantity": quantity,
        "unitPrice": unit_price,
        "totalValue": quantity * unit_price,
        "status": "submitted"
    }

    await add_artifact(task_id, {
        "name": "purchase-order",
        "description": "Generated purchase order",
        "parts": [{"type": "application/json", "data": po}]
    })
    await update_task_state(task_id, TaskState.COMPLETED, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Purchase order {po['poNumber']} submitted."}]
    })

async def _deliver_push_notification(task_id: str):
    task = get_task(task_id)
    if not task or not task.get("push_notification"):
        return
    pn = task["push_notification"]
    headers = {"Content-Type": "application/json"}
    if pn.get("token"):
        headers["Authorization"] = f"Bearer {pn['token']}"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(pn["url"], json={
                "taskId": task_id,
                "status": task["status"],
                "artifacts": task["artifacts"]
            }, headers=headers)
    except Exception as e:
        print(f"Push notification failed for {task_id}: {e}")

def _extract_text(message: dict) -> str:
    return " ".join(
        p.get("text", "") for p in message.get("parts", []) if p.get("type") == "text"
    )
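
The two pieces of business logic in the handler, SKU extraction and tiered pricing, are easy to check in isolation. The sketch below lifts them into pure functions using the same regex and price breaks as the handler; the function names extract_skus and unit_price_for are illustrative and do not exist in the module above.

```python
import re
from typing import List

def extract_skus(text: str) -> List[str]:
    """Find SKU identifiers case-insensitively and normalize them to upper case."""
    return [s.upper() for s in re.findall(r"SKU-\d+", text, re.IGNORECASE)]

def unit_price_for(quantity: int) -> float:
    """Volume pricing: 500+ units at 37.50, 250+ at 40.00, otherwise 42.00."""
    return 37.5 if quantity >= 500 else 40.0 if quantity >= 250 else 42.0

skus = extract_skus("check sku-12345 and SKU-67890 please")
total = 250 * unit_price_for(250)
print(skus, total)
```

With 250 units the middle tier applies, which matches the $10,000 option quoted in the reorder prompt.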

Routes and Main App

# app/routes/well_known.py

from fastapi import APIRouter
from fastapi.responses import JSONResponse
from app.agent_card import get_agent_card

router = APIRouter()

@router.get("/.well-known/agent.json")
async def agent_card():
    return JSONResponse(content=get_agent_card(), headers={"Cache-Control": "public, max-age=300"})

# app/routes/tasks.py

import asyncio
import json
from fastapi import APIRouter, Request, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from app.task_store import (
    create_task, get_task, update_task_state, add_sse_queue,
    remove_sse_queue, is_terminal, TaskState
)
from app.task_handler import process_task
from app.middleware.auth import verify_token

router = APIRouter()

def rpc_error(rpc_id, code, message):
    return {"jsonrpc": "2.0", "id": rpc_id, "error": {"code": code, "message": message}}

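# Empty path: with the "/a2a" prefix this serves POST /a2a, the URL advertised
# in the Agent Card. A "/" path would register /a2a/ and force a redirect.
@router.post("")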
async def handle_rpc(request: Request, _=Depends(verify_token)):
    body = await request.json()
    jsonrpc = body.get("jsonrpc")
    method = body.get("method")
    rpc_id = body.get("id")
    params = body.get("params", {})

    if jsonrpc != "2.0" or not method or not params:
        return JSONResponse(rpc_error(rpc_id, -32600, "Invalid request"))

    if method == "tasks/send":
        return await handle_task_send(rpc_id, params)
    elif method == "tasks/sendSubscribe":
        return await handle_task_send_subscribe(rpc_id, params)
    elif method == "tasks/get":
        return handle_task_get(rpc_id, params)
    elif method == "tasks/cancel":
        return await handle_task_cancel(rpc_id, params)
    else:
        return JSONResponse(rpc_error(rpc_id, -32601, "Method not found"))

async def handle_task_send(rpc_id, params):
    task_id = params.get("id")
    message = params.get("message")
    if not task_id or not message:
        return JSONResponse(rpc_error(rpc_id, -32602, "id and message are required"))

    task = get_task(task_id)
    if task:
        if is_terminal(task["status"]["state"]):
            return JSONResponse(rpc_error(rpc_id, -32002, "Task already in terminal state"))
        task["messages"].append(message)
    else:
        create_task(task_id, message, params.get("push_notification"))

    await process_task(task_id)
    task = get_task(task_id)
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"],
        "artifacts": task["artifacts"], "history": task["history"]
    }})

async def handle_task_send_subscribe(rpc_id, params):
    task_id = params.get("id")
    message = params.get("message")
    if not task_id or not message:
        return JSONResponse(rpc_error(rpc_id, -32602, "id and message are required"))

    task = get_task(task_id)
    if task:
        if is_terminal(task["status"]["state"]):
            return JSONResponse(rpc_error(rpc_id, -32002, "Task already in terminal state"))
        task["messages"].append(message)
    else:
        create_task(task_id, message, params.get("push_notification"))

    queue: asyncio.Queue = asyncio.Queue()
    add_sse_queue(task_id, queue)

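    async def event_stream():
        # Keep a reference so the background task is not garbage-collected mid-run.
        worker = asyncio.create_task(process_task(task_id))
        try:
            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=60.0)
                except asyncio.TimeoutError:
                    yield "data: {\"type\":\"keepalive\"}\n\n"
                    continue
                yield f"data: {json.dumps(event)}\n\n"
                # Decide from the event itself, not the live task state: the handler
                # can queue several events before this loop runs, and checking the
                # store would drop the ones still waiting in the queue.
                if event.get("type") == "taskStatusUpdate":
                    state = event["status"]["state"]
                    # Also end the stream when the agent is waiting for client input.
                    if is_terminal(state) or state == TaskState.INPUT_REQUIRED:
                        break
        finally:
            remove_sse_queue(task_id, queue)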

    return StreamingResponse(event_stream(), media_type="text/event-stream",
                              headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

def handle_task_get(rpc_id, params):
    task = get_task(params.get("id", ""))
    if not task:
        return JSONResponse(rpc_error(rpc_id, -32001, "Task not found"))
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"],
        "artifacts": task["artifacts"], "history": task["history"]
    }})

async def handle_task_cancel(rpc_id, params):
    task = get_task(params.get("id", ""))
    if not task:
        return JSONResponse(rpc_error(rpc_id, -32001, "Task not found"))
    if is_terminal(task["status"]["state"]):
        return JSONResponse(rpc_error(rpc_id, -32002, "Task not cancelable"))
    await update_task_state(params["id"], TaskState.CANCELED)
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"]
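    }})

# app/middleware/auth.py

import os
import secrets
from typing import Optional
from fastapi import Header, HTTPException

async def verify_token(authorization: Optional[str] = Header(None)):
    # A missing header should be a 401, not FastAPI's default 422 for a required header.
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Bearer token required")
    token = authorization[7:]
    valid = os.getenv("API_TOKEN", "dev-token-12345")
    # Constant-time comparison avoids leaking token contents through timing.
    if not secrets.compare_digest(token.encode(), valid.encode()):
        raise HTTPException(status_code=403, detail="Invalid token")

# main.py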

from fastapi import FastAPI
from dotenv import load_dotenv
from app.routes.well_known import router as well_known_router
from app.routes.tasks import router as tasks_router

load_dotenv()

app = FastAPI(title="Inventory Management A2A Agent", version="1.0.0")

app.include_router(well_known_router)
app.include_router(tasks_router, prefix="/a2a")

Run the server:

uvicorn main:app --host 0.0.0.0 --port 8000 --reload

The interactive API docs are at http://localhost:8000/docs once the server is running. The Agent Card is at http://localhost:8000/.well-known/agent.json.
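
On the client side, a tasks/sendSubscribe response arrives as a series of "data: <json>" frames separated by blank lines, which is the shape event_stream() emits. The sketch below is one minimal way to decode such a stream once the raw text has been read; parse_sse and the sample payload are illustrative assumptions, not part of the server.

```python
import json
from typing import Iterator

def parse_sse(raw: str) -> Iterator[dict]:
    """Yield the JSON payload of every data: line in a block of SSE text."""
    for frame in raw.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data:"):
                yield json.loads(line[len("data:"):].strip())

# Hand-written sample resembling what the streaming endpoint sends
sample = (
    'data: {"type": "taskStatusUpdate", "taskId": "task-001", "status": {"state": "working"}}\n\n'
    'data: {"type":"keepalive"}\n\n'
    'data: {"type": "taskStatusUpdate", "taskId": "task-001", "status": {"state": "completed"}}\n\n'
)

# Keepalive frames carry no task data, so a client would normally skip them
events = [e for e in parse_sse(sample) if e["type"] != "keepalive"]
states = [e["status"]["state"] for e in events]
print(states)
```

A real client would feed this from a streaming HTTP response rather than a complete string, buffering until each blank-line delimiter arrives.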

C# Implementation with ASP.NET Core

The C# implementation uses ASP.NET Core minimal APIs for the routing, System.Text.Json for serialization, and IAsyncEnumerable for SSE streaming. This approach keeps the code clean and avoids unnecessary dependencies.

Project Setup

dotnet new web -n A2AInventoryAgent
cd A2AInventoryAgent
dotnet add package Microsoft.Extensions.Http

Project structure:

A2AInventoryAgent/
  Models/
    A2AModels.cs        # All request/response/task models
  Services/
    TaskStore.cs        # In-memory task state management
    TaskHandler.cs      # Business logic per skill
  Endpoints/
    WellKnownEndpoints.cs
    TaskEndpoints.cs
  Middleware/
    BearerAuthMiddleware.cs
  Program.cs

Models

// Models/A2AModels.cs

using System.Text.Json;                // JsonElement
using System.Text.Json.Serialization;
using System.Threading.Channels;       // Channel<T>

namespace A2AInventoryAgent.Models;

public enum TaskState
{
    Submitted, Working, InputRequired, Completed, Failed, Canceled
}

public record MessagePart(
    [property: JsonPropertyName("type")] string Type,
    [property: JsonPropertyName("text")] string? Text = null,
    [property: JsonPropertyName("data")] object? Data = null
);

public record AgentMessage(
    [property: JsonPropertyName("role")] string Role,
    [property: JsonPropertyName("parts")] List<MessagePart> Parts
);

public record TaskStatus(
    [property: JsonPropertyName("state")] string State,
    [property: JsonPropertyName("timestamp")] string Timestamp,
    [property: JsonPropertyName("message")] AgentMessage? Message = null
);

public record Artifact(
    [property: JsonPropertyName("name")] string Name,
    [property: JsonPropertyName("description")] string? Description,
    [property: JsonPropertyName("parts")] List<MessagePart> Parts,
    [property: JsonPropertyName("lastChunk")] bool LastChunk = true
);

public record PushNotification(
    [property: JsonPropertyName("url")] string Url,
    [property: JsonPropertyName("token")] string? Token = null
);

public class A2ATask
{
    public string Id { get; set; } = "";
    public TaskStatus Status { get; set; } = new("submitted", DateTime.UtcNow.ToString("O"));
    public List<AgentMessage> Messages { get; set; } = new();
    public List<Artifact> Artifacts { get; set; } = new();
    public PushNotification? PushNotification { get; set; }
    public List<object> History { get; set; } = new();
    public List<Channel<object>> SseChannels { get; } = new();
}

public record JsonRpcRequest(
    [property: JsonPropertyName("jsonrpc")] string Jsonrpc,
    [property: JsonPropertyName("method")] string Method,
    [property: JsonPropertyName("id")] object? Id,
    [property: JsonPropertyName("params")] JsonElement Params
);

public record JsonRpcError(int Code, string Message);
public record JsonRpcErrorResponse(string Jsonrpc, object? Id, JsonRpcError Error);
public record JsonRpcSuccessResponse(string Jsonrpc, object? Id, object Result);

Task Store

// Services/TaskStore.cs

using System.Threading.Channels;
using A2AInventoryAgent.Models;
using System.Text.Json;

namespace A2AInventoryAgent.Services;

public class TaskStore
{
    private readonly Dictionary<string, A2ATask> _tasks = new();
    private readonly Lock _lock = new();

    private static readonly HashSet<TaskState> TerminalStates =
        [TaskState.Completed, TaskState.Failed, TaskState.Canceled];

    public A2ATask CreateTask(string taskId, AgentMessage initialMessage, PushNotification? push = null)
    {
        var task = new A2ATask
        {
            Id = taskId,
            Status = new TaskStatus("submitted", DateTime.UtcNow.ToString("O")),
            PushNotification = push
        };
        task.Messages.Add(initialMessage);
        task.History.Add(new { state = "submitted", timestamp = DateTime.UtcNow.ToString("O") });

        lock (_lock) { _tasks[taskId] = task; }
        return task;
    }

    public A2ATask? GetTask(string taskId)
    {
        lock (_lock) { return _tasks.GetValueOrDefault(taskId); }
    }

    public async Task UpdateStateAsync(string taskId, TaskState state, AgentMessage? message = null)
    {
        A2ATask? task;
        lock (_lock) { task = _tasks.GetValueOrDefault(taskId); }
        if (task is null) throw new InvalidOperationException($"Task {taskId} not found");

        var stateStr = state.ToString().ToLowerInvariant().Replace("inputrequired", "input_required");
        task.Status = new TaskStatus(stateStr, DateTime.UtcNow.ToString("O"), message);
        if (message is not null) task.Messages.Add(message);
        task.History.Add(new { state = stateStr, timestamp = DateTime.UtcNow.ToString("O") });

        await BroadcastAsync(task, new
        {
            type = "taskStatusUpdate",
            taskId,
            status = task.Status
        });
    }

    public async Task AddArtifactAsync(string taskId, Artifact artifact)
    {
        A2ATask? task;
        lock (_lock) { task = _tasks.GetValueOrDefault(taskId); }
        if (task is null) return;

        task.Artifacts.Add(artifact);
        await BroadcastAsync(task, new { type = "taskArtifactUpdate", taskId, artifact });
    }

    public Channel<object> AddSseChannel(string taskId)
    {
        var channel = Channel.CreateUnbounded<object>();
        lock (_lock)
        {
            var task = _tasks.GetValueOrDefault(taskId);
            task?.SseChannels.Add(channel);
        }
        return channel;
    }

    public void RemoveSseChannel(string taskId, Channel<object> channel)
    {
        lock (_lock) { _tasks.GetValueOrDefault(taskId)?.SseChannels.Remove(channel); }
    }

    public bool IsTerminal(TaskState state) => TerminalStates.Contains(state);

    private static async Task BroadcastAsync(A2ATask task, object evt)
    {
        foreach (var channel in task.SseChannels.ToList())
        {
            await channel.Writer.WriteAsync(evt);
        }
    }
}

Task Handler

// Services/TaskHandler.cs

using System.Text.RegularExpressions;
using A2AInventoryAgent.Models;

namespace A2AInventoryAgent.Services;

public class TaskHandler(TaskStore store, IHttpClientFactory httpClientFactory)
{
    private static readonly Dictionary<string, object> Inventory = new()
    {
        ["SKU-12345"] = new { quantity = 240, location = "Warehouse-A", reorderThreshold = 50, unitPrice = 12.5 },
        ["SKU-67890"] = new { quantity = 12,  location = "Warehouse-B", reorderThreshold = 30, unitPrice = 42.0 },
        ["SKU-99999"] = new { quantity = 0,   location = "Warehouse-A", reorderThreshold = 20, unitPrice = 8.75 }
    };

    public async Task ProcessTaskAsync(string taskId)
    {
        var task = store.GetTask(taskId);
        if (task is null) return;

        var text = ExtractText(task.Messages[0]);

        try
        {
            await store.UpdateStateAsync(taskId, TaskState.Working);

            if (text.Contains("reorder", StringComparison.OrdinalIgnoreCase))
                await HandleReorderAsync(taskId, task);
            else
                await HandleCheckStockAsync(taskId, text);
        }
        catch (Exception ex)
        {
            await store.UpdateStateAsync(taskId, TaskState.Failed,
                AgentMsg($"Error: {ex.Message}"));
        }

        await DeliverPushNotificationAsync(taskId);
    }

    private async Task HandleCheckStockAsync(string taskId, string text)
    {
        await store.UpdateStateAsync(taskId, TaskState.Working, AgentMsg("Parsing SKU identifiers..."));
        await Task.Delay(300);

        var skus = Regex.Matches(text, @"SKU-\d+", RegexOptions.IgnoreCase)
                        .Select(m => m.Value.ToUpper()).ToList();

        if (skus.Count == 0)
        {
            await store.UpdateStateAsync(taskId, TaskState.InputRequired,
                AgentMsg("No SKU identifiers found. Please provide one or more SKU IDs (e.g. SKU-12345)."));
            return;
        }

        await store.UpdateStateAsync(taskId, TaskState.Working,
            AgentMsg($"Querying inventory database for {skus.Count} SKU(s)..."));
        await Task.Delay(400);

        var result = new Dictionary<string, object>();
        var alerts = new List<string>();

        foreach (var sku in skus)
        {
            if (Inventory.TryGetValue(sku, out var item))
            {
                result[sku] = item;
                dynamic d = item;
                if (d.quantity <= d.reorderThreshold)
                    alerts.Add($"{sku} is at or below reorder threshold");
            }
            else
            {
                result[sku] = new { error = "SKU not found" };
            }
        }

        await store.AddArtifactAsync(taskId, new Artifact(
            "stock-levels", "Current inventory levels",
            [new MessagePart("application/json", Data: new { stockLevels = result, alerts })]
        ));
        await store.UpdateStateAsync(taskId, TaskState.Completed);
    }

    private async Task HandleReorderAsync(string taskId, A2ATask task)
    {
        bool isConfirmation = task.Messages.Count > 1;

        if (!isConfirmation)
        {
            await store.UpdateStateAsync(taskId, TaskState.Working, AgentMsg("Analyzing reorder requirements..."));
            await Task.Delay(500);
            await store.UpdateStateAsync(taskId, TaskState.InputRequired, AgentMsg(
                "SKU-67890 requires reorder (current: 12, threshold: 30). Options:\n" +
                "- 100 units at $42.00/unit = $4,200\n" +
                "- 250 units at $40.00/unit = $10,000\n" +
                "- 500 units at $37.50/unit = $18,750\n\nReply with your preferred quantity."));
            return;
        }

        var confirmText = ExtractText(task.Messages[^1]);
        var match = Regex.Match(confirmText, @"\d+");
        var quantity = match.Success ? int.Parse(match.Value) : 100;

        await store.UpdateStateAsync(taskId, TaskState.Working,
            AgentMsg($"Creating purchase order for {quantity} units..."));
        await Task.Delay(600);

        double unitPrice = quantity >= 500 ? 37.5 : quantity >= 250 ? 40.0 : 42.0;
        var po = new
        {
            poNumber = $"PO-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}",
            sku = "SKU-67890", quantity, unitPrice,
            totalValue = quantity * unitPrice, status = "submitted"
        };

        await store.AddArtifactAsync(taskId, new Artifact(
            "purchase-order", "Generated purchase order",
            [new MessagePart("application/json", Data: po)]
        ));
        await store.UpdateStateAsync(taskId, TaskState.Completed,
            AgentMsg($"Purchase order {po.poNumber} submitted."));
    }

    private async Task DeliverPushNotificationAsync(string taskId)
    {
        var task = store.GetTask(taskId);
        if (task?.PushNotification is null) return;

        var client = httpClientFactory.CreateClient();
        if (task.PushNotification.Token is not null)
            client.DefaultRequestHeaders.Add("Authorization", $"Bearer {task.PushNotification.Token}");

        try
        {
            await client.PostAsJsonAsync(task.PushNotification.Url, new
            {
                taskId, status = task.Status, artifacts = task.Artifacts
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Push notification failed for {taskId}: {ex.Message}");
        }
    }

    private static AgentMessage AgentMsg(string text) =>
        new("agent", [new MessagePart("text", Text: text)]);

    private static string ExtractText(AgentMessage msg) =>
        string.Join(" ", msg.Parts.Where(p => p.Type == "text").Select(p => p.Text ?? ""));
}

Program.cs – Endpoints and Wiring

// Program.cs

using System.Text;
using System.Text.Json;
using A2AInventoryAgent.Models;
using A2AInventoryAgent.Services;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<TaskStore>();
builder.Services.AddScoped<TaskHandler>();
builder.Services.AddHttpClient();

var app = builder.Build();

var agentCard = new
{
    name = "Inventory Management Agent",
    description = "Manages stock levels and triggers reorder workflows",
    version = "1.0.0",
    url = Environment.GetEnvironmentVariable("AGENT_URL") ?? "http://localhost:5000/a2a",
    capabilities = new { streaming = true, pushNotifications = true, stateTransitionHistory = true },
    defaultInputModes = new[] { "text", "application/json" },
    defaultOutputModes = new[] { "application/json" },
    skills = new[]
    {
        new { id = "check-stock", name = "Check Stock Level",
              description = "Returns stock levels for SKUs", tags = new[] { "inventory", "stock" } },
        new { id = "trigger-reorder", name = "Trigger Reorder",
              description = "Initiates reorder workflow", tags = new[] { "reorder", "procurement" } }
    },
    securitySchemes = new { bearer = new { type = "http", scheme = "bearer", bearerFormat = "JWT" } },
    security = new[] { new { bearer = Array.Empty<string>() } }
};

// Agent Card endpoint - no auth
app.MapGet("/.well-known/agent.json", () => Results.Json(agentCard));
app.MapGet("/health", () => Results.Json(new { status = "ok" }));

// All A2A task endpoints require auth
app.MapPost("/a2a", async (HttpContext ctx, TaskStore store, TaskHandler handler) =>
{
    // Bearer token auth
    var authHeader = ctx.Request.Headers.Authorization.ToString();
    var validToken = Environment.GetEnvironmentVariable("API_TOKEN") ?? "dev-token-12345";
    if (!authHeader.StartsWith("Bearer ") || authHeader[7..] != validToken)
    {
        ctx.Response.StatusCode = 401;
        await ctx.Response.WriteAsJsonAsync(new { error = "Unauthorized" });
        return;
    }

    var body = await JsonSerializer.DeserializeAsync<JsonRpcRequest>(ctx.Request.Body,
        new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

    if (body is null || body.Jsonrpc != "2.0")
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(null, -32600, "Invalid request"));
        return;
    }

    switch (body.Method)
    {
        case "tasks/send":
            await HandleTaskSend(ctx, store, handler, body);
            break;
        case "tasks/sendSubscribe":
            await HandleTaskSendSubscribe(ctx, store, handler, body);
            break;
        case "tasks/get":
            await HandleTaskGet(ctx, store, body);
            break;
        case "tasks/cancel":
            await HandleTaskCancel(ctx, store, body);
            break;
        default:
            await ctx.Response.WriteAsJsonAsync(RpcError(body.Id, -32601, "Method not found"));
            break;
    }
});

app.Run("http://0.0.0.0:5000");

// --- Handlers ---

static async Task HandleTaskSend(HttpContext ctx, TaskStore store, TaskHandler handler, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var msgEl = req.Params.GetProperty("message");
    var message = ParseMessage(msgEl);

    var task = store.GetTask(taskId);
    if (task is not null)
    {
        task.Messages.Add(message);
    }
    else
    {
        PushNotification? push = null;
        if (req.Params.TryGetProperty("pushNotification", out var pnEl))
            push = new PushNotification(pnEl.GetProperty("url").GetString()!,
                pnEl.TryGetProperty("token", out var t) ? t.GetString() : null);
        store.CreateTask(taskId, message, push);
    }

    await handler.ProcessTaskAsync(taskId);
    task = store.GetTask(taskId)!;

    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id, new
    {
        id = task.Id, status = task.Status,
        artifacts = task.Artifacts, history = task.History
    }));
}

static async Task HandleTaskSendSubscribe(HttpContext ctx, TaskStore store, TaskHandler handler, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var message = ParseMessage(req.Params.GetProperty("message"));

    var task = store.GetTask(taskId);
    if (task is not null)
        task.Messages.Add(message);
    else
        store.CreateTask(taskId, message);

    ctx.Response.Headers.ContentType = "text/event-stream";
    ctx.Response.Headers.CacheControl = "no-cache";
    ctx.Response.Headers["X-Accel-Buffering"] = "no";

    var channel = store.AddSseChannel(taskId);
    _ = Task.Run(() => handler.ProcessTaskAsync(taskId));

    try
    {
        await foreach (var evt in channel.Reader.ReadAllAsync(ctx.RequestAborted))
        {
            var data = $"data: {JsonSerializer.Serialize(evt)}\n\n";
            await ctx.Response.WriteAsync(data, ctx.RequestAborted);
            await ctx.Response.Body.FlushAsync(ctx.RequestAborted);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        store.RemoveSseChannel(taskId, channel);
    }
}

static async Task HandleTaskGet(HttpContext ctx, TaskStore store, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var task = store.GetTask(taskId);
    if (task is null)
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(req.Id, -32001, "Task not found"));
        return;
    }
    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id, new
    {
        id = task.Id, status = task.Status, artifacts = task.Artifacts, history = task.History
    }));
}

static async Task HandleTaskCancel(HttpContext ctx, TaskStore store, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var task = store.GetTask(taskId);
    if (task is null)
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(req.Id, -32001, "Task not found"));
        return;
    }
    await store.UpdateStateAsync(taskId, TaskState.Canceled);
    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id,
        new { id = task.Id, status = task.Status }));
}

static AgentMessage ParseMessage(JsonElement el)
{
    var role = el.GetProperty("role").GetString() ?? "user";
    var parts = el.GetProperty("parts").EnumerateArray().Select(p =>
    {
        var type = p.GetProperty("type").GetString() ?? "text";
        var text = p.TryGetProperty("text", out var t) ? t.GetString() : null;
        return new MessagePart(type, text);
    }).ToList();
    return new AgentMessage(role, parts);
}

static object RpcError(object? id, int code, string message) =>
    new JsonRpcErrorResponse("2.0", id, new JsonRpcError(code, message));

Run the server:

dotnet run

The server listens on port 5000. The Agent Card is served at http://localhost:5000/.well-known/agent.json and the A2A endpoint is at http://localhost:5000/a2a.
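
To smoke-test the endpoint, a client needs only a JSON-RPC envelope and the Bearer token. The sketch below is a minimal Python example, assuming the server runs locally with its fallback development token. `build_task_send` and `post_rpc` are hypothetical helper names, the integer request id is an assumption about what the server accepts, and the message text is a placeholder rather than the exact input the skills expect.

```python
import json
import urllib.request

A2A_URL = "http://localhost:5000/a2a"  # endpoint exposed by the C# server
API_TOKEN = "dev-token-12345"          # the server's development fallback token


def build_task_send(task_id: str, text: str, request_id: int = 1) -> dict:
    """Build a JSON-RPC 2.0 tasks/send request in the shape the handler parses."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tasks/send",
        "params": {
            "id": task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }


def post_rpc(payload: dict, url: str = A2A_URL, token: str = API_TOKEN) -> dict:
    """POST the payload with a Bearer token and return the decoded JSON reply."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `post_rpc(build_task_send("task-1", "Check stock for SKU-1001"))` should come back with the task's id, status, artifacts, and history. A wrong token makes `urlopen` raise `HTTPError` with status 401.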

Comparison: Node.js vs Python vs C#

flowchart LR
    subgraph NodeJS["Node.js (Express)"]
        N1[express + uuid + zod]
        N2[EventEmitter for SSE]
        N3[In-memory Map]
    end
    subgraph Python["Python (FastAPI)"]
        P1[fastapi + uvicorn + pydantic]
        P2[asyncio.Queue for SSE]
        P3[Dict task store]
    end
    subgraph CSharp["C# (ASP.NET Core)"]
        C1[Minimal APIs + IHttpClientFactory]
        C2[System.Threading.Channels for SSE]
        C3[Dictionary + lock]
    end
    NodeJS --- A2A[A2A Protocol\nIdentical behavior]
    Python --- A2A
    CSharp --- A2A

All three implementations expose the same four JSON-RPC methods, serve an identical Agent Card schema, stream SSE events in the same format, and handle multi-turn interactions through the input_required state. A client agent built against any one of these will work against all three without modification. That is the whole point of the protocol.
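
Because every server writes events as `data: <json>` followed by a blank line, one small parser covers all three on the client side. The following is a sketch rather than a full SSE client: `iter_sse_events` is a hypothetical helper, it only handles `data:` fields, and the `{"state": ...}` payloads in the usage note are placeholders for whatever event objects the task store actually emits.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON object per SSE event from an iterable of text lines."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith("data:"):
            value = line[5:]
            # The SSE format allows one optional space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and comments are ignored here.
    # An event that was never terminated by a blank line is discarded.
```

Feeding it the lines of `'data: {"state": "working"}\n\ndata: {"state": "completed"}\n\n'` yields two dictionaries, one per frame, regardless of which server produced the stream.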

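The main language-specific differences are the SSE mechanism (Node.js writes directly to a set of open responses, Python uses asyncio queues, C# uses System.Threading.Channels) and how async processing is detached from the HTTP response. In the C# handler, for example, Task.Run starts ProcessTaskAsync in the background while the request thread streams from the channel. Everything else is a direct translation.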
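
The queue-based pattern on the Python side can be sketched in a few lines. This is an illustration of the idea under stated assumptions, not the FastAPI code from earlier in the post: `process_task`, `sse_frames`, `stream_task`, and the `_DONE` sentinel are hypothetical names, and the two hard-coded states stand in for real task processing.

```python
import asyncio
import json

_DONE = object()  # sentinel that marks the end of the stream (assumed convention)


async def process_task(queue: asyncio.Queue) -> None:
    """Stand-in for the task handler: publish state changes, then close the stream."""
    for state in ("working", "completed"):
        await asyncio.sleep(0)  # yield control, as real I/O would
        await queue.put({"state": state})
    await queue.put(_DONE)


async def sse_frames(queue: asyncio.Queue):
    """Drain the queue and format each event as an SSE data frame."""
    while True:
        event = await queue.get()
        if event is _DONE:
            break
        yield f"data: {json.dumps(event)}\n\n"


async def stream_task() -> list[str]:
    """Start processing in the background and collect the frames it produces."""
    queue: asyncio.Queue = asyncio.Queue()
    # Detach processing from the response, the role Task.Run plays in the C# handler.
    worker = asyncio.create_task(process_task(queue))
    frames = [frame async for frame in sse_frames(queue)]
    await worker
    return frames
```

In a real endpoint the async generator would be handed to a streaming response instead of being collected into a list. The queue plays the same role as the channel in C#: the producer and the HTTP writer never call each other directly.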

What Comes Next

At this point you have a fully working A2A agent server in all three backend languages. The next step is building the other side of the protocol: the client agent that discovers remote agents, routes tasks intelligently, and orchestrates multiple agents into a coordinated workflow.

Part 5 builds the orchestrator layer in Node.js. It fetches Agent Cards, matches tasks to the right agents based on declared skills, manages concurrent tasks across multiple remote agents, and handles the full state machine including multi-turn interactions.
