Part 3 built a complete A2A agent server in Node.js. This part delivers the same Inventory Management Agent in Python using FastAPI and in C# using ASP.NET Core. The architecture, business logic, and A2A behavior are identical across all three implementations. Pick the language your team runs and use it as your production starting point.
Rather than re-explaining the protocol concepts, this post is a focused implementation reference. If you need to understand what Agent Cards, task lifecycle states, or SSE streaming are, revisit Parts 1 through 3 first.
Python Implementation with FastAPI
FastAPI is the natural choice for Python A2A servers. Its async-first design handles SSE streaming cleanly, its Pydantic models map directly to the A2A JSON structures, and its OpenAPI integration makes the authentication configuration straightforward.
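To make the SSE point concrete, here is a minimal sketch, using only the standard library, of the async-generator pattern that a streaming response can consume. The names `sse_frame`, `status_events`, and `collect` are illustrative and not part of FastAPI or of the agent code in this post; the event shape mirrors the `taskStatusUpdate` events used by the task store.

```python
import asyncio
import json
from typing import AsyncIterator, List


def sse_frame(event: dict) -> str:
    """Serialize one event as a Server-Sent Events frame (data line + blank line)."""
    return f"data: {json.dumps(event)}\n\n"


async def status_events(task_id: str, states: List[str]) -> AsyncIterator[str]:
    """Yield one SSE frame per state; a real handler would await real work here."""
    for state in states:
        await asyncio.sleep(0)  # stand-in for awaiting I/O between updates
        yield sse_frame({"type": "taskStatusUpdate", "taskId": task_id,
                         "status": {"state": state}})


async def collect(task_id: str, states: List[str]) -> List[str]:
    """Drain the generator, as a streaming response would do chunk by chunk."""
    return [frame async for frame in status_events(task_id, states)]


frames = asyncio.run(collect("task-1", ["working", "completed"]))
```

Each element of `frames` is one complete SSE frame, so a client can split the stream on blank lines and parse the JSON after the `data: ` prefix.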
Project Setup
mkdir a2a-inventory-agent-python
cd a2a-inventory-agent-python
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install fastapi uvicorn[standard] httpx python-dotenv pydantic
Project structure:
a2a-inventory-agent-python/
  app/
    agent_card.py       # Agent Card definition
    task_store.py       # In-memory task state management
    task_handler.py     # Business logic per skill
    routes/
      well_known.py     # GET /.well-known/agent.json
      tasks.py          # POST /a2a (JSON-RPC handler)
    middleware/
      auth.py           # Bearer token validation
    models.py           # Pydantic request/response models
  main.py               # Entry point
  .env
Pydantic Models
# app/models.py
from pydantic import BaseModel
from typing import Any, Optional, List
from enum import Enum


class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input_required"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELED = "canceled"


class MessagePart(BaseModel):
    type: str
    text: Optional[str] = None
    data: Optional[Any] = None


class Message(BaseModel):
    role: str
    parts: List[MessagePart]


class PushNotification(BaseModel):
    url: str
    token: Optional[str] = None


class TaskStatus(BaseModel):
    state: TaskState
    timestamp: str
    message: Optional[Message] = None


class Artifact(BaseModel):
    name: str
    description: Optional[str] = None
    parts: List[MessagePart]
    last_chunk: bool = True


class TaskSendParams(BaseModel):
    id: str
    message: Message
    push_notification: Optional[PushNotification] = None
    metadata: Optional[dict] = None


class TaskGetParams(BaseModel):
    id: str


class TaskCancelParams(BaseModel):
    id: str


class JsonRpcRequest(BaseModel):
    jsonrpc: str
    method: str
    id: Any
    params: dict

Agent Card
# app/agent_card.py
import os


def get_agent_card():
    return {
        "name": "Inventory Management Agent",
        "description": "Manages stock levels, processes inventory queries, and triggers reorder workflows",
        "version": "1.0.0",
        "url": os.getenv("AGENT_URL", "http://localhost:8000/a2a"),
        "documentationUrl": "http://localhost:8000/docs",
        "provider": {
            "organization": "Acme Corp",
            "url": "https://acmecorp.example.com"
        },
        "capabilities": {
            "streaming": True,
            "pushNotifications": True,
            "stateTransitionHistory": True
        },
        "defaultInputModes": ["text", "application/json"],
        "defaultOutputModes": ["text", "application/json"],
        "skills": [
            {
                "id": "check-stock",
                "name": "Check Stock Level",
                "description": "Returns current stock quantity for one or more product SKUs",
                "tags": ["inventory", "stock", "query"],
                "examples": ["Check stock for SKU-12345 and SKU-67890"],
                "inputModes": ["text", "application/json"],
                "outputModes": ["application/json"]
            },
            {
                "id": "trigger-reorder",
                "name": "Trigger Reorder",
                "description": "Initiates a reorder workflow when stock falls below threshold",
                "tags": ["inventory", "reorder", "procurement"],
                "inputModes": ["application/json"],
                "outputModes": ["application/json"]
            }
        ],
        "securitySchemes": {
            "bearer": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
        },
        "security": [{"bearer": []}]
    }

Task Store
# app/task_store.py
import asyncio
import json
from datetime import datetime, timezone
from typing import Dict, Set, Optional, Any
from app.models import TaskState

TERMINAL_STATES = {TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELED}
tasks: Dict[str, dict] = {}
# Maps task_id to a set of asyncio.Queue objects (one per SSE client)
sse_queues: Dict[str, Set[asyncio.Queue]] = {}


def create_task(task_id: str, initial_message: dict, push_notification=None) -> dict:
    task = {
        "id": task_id,
        "status": {"state": TaskState.SUBMITTED, "timestamp": _now()},
        "messages": [initial_message],
        "artifacts": [],
        "push_notification": push_notification,
        "history": [{"state": TaskState.SUBMITTED, "timestamp": _now()}]
    }
    tasks[task_id] = task
    sse_queues[task_id] = set()
    return task


def get_task(task_id: str) -> Optional[dict]:
    return tasks.get(task_id)


async def update_task_state(task_id: str, new_state: TaskState, message: Optional[dict] = None):
    task = tasks.get(task_id)
    if not task:
        raise ValueError(f"Task {task_id} not found")
    task["status"] = {"state": new_state, "timestamp": _now()}
    if message:
        task["messages"].append(message)
    task["history"].append({"state": new_state, "timestamp": _now()})
    await _broadcast(task_id, {
        "type": "taskStatusUpdate",
        "taskId": task_id,
        "status": {**task["status"], "message": message}
    })


async def add_artifact(task_id: str, artifact: dict, last_chunk: bool = True):
    task = tasks.get(task_id)
    if not task:
        raise ValueError(f"Task {task_id} not found")
    task["artifacts"].append(artifact)
    await _broadcast(task_id, {
        "type": "taskArtifactUpdate",
        "taskId": task_id,
        "artifact": {**artifact, "lastChunk": last_chunk}
    })


def add_sse_queue(task_id: str, queue: asyncio.Queue):
    if task_id in sse_queues:
        sse_queues[task_id].add(queue)


def remove_sse_queue(task_id: str, queue: asyncio.Queue):
    if task_id in sse_queues:
        sse_queues[task_id].discard(queue)


def is_terminal(state: TaskState) -> bool:
    return state in TERMINAL_STATES


async def _broadcast(task_id: str, event: dict):
    queues = sse_queues.get(task_id, set())
    for q in list(queues):
        await q.put(event)


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()

Task Handler
# app/task_handler.py
import asyncio
import re
import time
import httpx
from app.task_store import update_task_state, add_artifact, get_task, is_terminal, TaskState

INVENTORY = {
    "SKU-12345": {"quantity": 240, "location": "Warehouse-A", "reorderThreshold": 50, "unitPrice": 12.5},
    "SKU-67890": {"quantity": 12, "location": "Warehouse-B", "reorderThreshold": 30, "unitPrice": 42.0},
    "SKU-99999": {"quantity": 0, "location": "Warehouse-A", "reorderThreshold": 20, "unitPrice": 8.75}
}


async def process_task(task_id: str):
    task = get_task(task_id)
    if not task:
        return
    # The first message selects the skill. The latest message carries the current
    # input, so a follow-up sent after input_required is not ignored.
    first_text = _extract_text(task["messages"][0])
    latest_text = _extract_text(task["messages"][-1])
    try:
        await update_task_state(task_id, TaskState.WORKING)
        if "reorder" in first_text.lower():
            await _handle_reorder(task_id, task)
        else:
            await _handle_check_stock(task_id, latest_text)
    except Exception as e:
        await update_task_state(task_id, TaskState.FAILED, {
            "role": "agent",
            "parts": [{"type": "text", "text": f"Error: {str(e)}"}]
        })
    await _deliver_push_notification(task_id)


async def _handle_check_stock(task_id: str, text: str):
    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": "Parsing SKU identifiers from request..."}]
    })
    await asyncio.sleep(0.3)  # simulated processing delay
    skus = [s.upper() for s in re.findall(r"SKU-\d+", text, re.IGNORECASE)]
    if not skus:
        await update_task_state(task_id, TaskState.INPUT_REQUIRED, {
            "role": "agent",
            "parts": [{"type": "text", "text": "No SKU identifiers found. Please provide one or more SKU IDs (e.g. SKU-12345)."}]
        })
        return
    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Querying inventory database for {len(skus)} SKU(s)..."}]
    })
    await asyncio.sleep(0.4)  # simulated database latency
    result = {}
    alerts = []
    for sku in skus:
        item = INVENTORY.get(sku)
        if item:
            result[sku] = item
            if item["quantity"] <= item["reorderThreshold"]:
                alerts.append(f"{sku} is at or below reorder threshold (qty: {item['quantity']})")
        else:
            result[sku] = {"error": "SKU not found"}
    await add_artifact(task_id, {
        "name": "stock-levels",
        "description": "Current inventory levels for requested SKUs",
        "parts": [{"type": "application/json", "data": {"stockLevels": result, "alerts": alerts}}]
    })
    await update_task_state(task_id, TaskState.COMPLETED)


async def _handle_reorder(task_id: str, task: dict):
    # More than one message means the client has replied to the quantity prompt.
    is_confirmation = len(task["messages"]) > 1
    if not is_confirmation:
        await update_task_state(task_id, TaskState.WORKING, {
            "role": "agent", "parts": [{"type": "text", "text": "Analyzing reorder requirements..."}]
        })
        await asyncio.sleep(0.5)
        await update_task_state(task_id, TaskState.INPUT_REQUIRED, {
            "role": "agent",
            "parts": [{"type": "text", "text": (
                "SKU-67890 requires reorder (current: 12, threshold: 30). Options:\n"
                "- 100 units at $42.00/unit = $4,200\n"
                "- 250 units at $40.00/unit = $10,000\n"
                "- 500 units at $37.50/unit = $18,750\n\n"
                "Reply with your preferred quantity to proceed."
            )}]
        })
        return
    confirm_text = _extract_text(task["messages"][-1])
    match = re.search(r"\d+", confirm_text)
    quantity = int(match.group()) if match else 100
    await update_task_state(task_id, TaskState.WORKING, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Creating purchase order for {quantity} units..."}]
    })
    await asyncio.sleep(0.6)
    unit_price = 37.5 if quantity >= 500 else 40.0 if quantity >= 250 else 42.0
    po = {
        # Wall-clock milliseconds; the event loop clock is monotonic and not a timestamp.
        "poNumber": f"PO-{int(time.time() * 1000)}",
        "sku": "SKU-67890",
        "quantity": quantity,
        "unitPrice": unit_price,
        "totalValue": quantity * unit_price,
        "status": "submitted"
    }
    await add_artifact(task_id, {
        "name": "purchase-order",
        "description": "Generated purchase order",
        "parts": [{"type": "application/json", "data": po}]
    })
    await update_task_state(task_id, TaskState.COMPLETED, {
        "role": "agent",
        "parts": [{"type": "text", "text": f"Purchase order {po['poNumber']} submitted."}]
    })


async def _deliver_push_notification(task_id: str):
    task = get_task(task_id)
    if not task or not task.get("push_notification"):
        return
    pn = task["push_notification"]
    headers = {"Content-Type": "application/json"}
    if pn.get("token"):
        headers["Authorization"] = f"Bearer {pn['token']}"
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(pn["url"], json={
                "taskId": task_id,
                "status": task["status"],
                "artifacts": task["artifacts"]
            }, headers=headers)
    except Exception as e:
        print(f"Push notification failed for {task_id}: {e}")


def _extract_text(message: dict) -> str:
    return " ".join(
        p.get("text", "") for p in message.get("parts", []) if p.get("type") == "text"
    )

Routes and Main App
# app/routes/well_known.py
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from app.agent_card import get_agent_card

router = APIRouter()


@router.get("/.well-known/agent.json")
async def agent_card():
    return JSONResponse(content=get_agent_card(), headers={"Cache-Control": "public, max-age=300"})

# app/routes/tasks.py
import asyncio
import json
from fastapi import APIRouter, Request, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from app.task_store import (
    create_task, get_task, update_task_state, add_sse_queue,
    remove_sse_queue, is_terminal, TaskState
)
from app.task_handler import process_task
from app.middleware.auth import verify_token

router = APIRouter()


def rpc_error(rpc_id, code, message):
    return {"jsonrpc": "2.0", "id": rpc_id, "error": {"code": code, "message": message}}


def _push_config(params):
    # Accept the camelCase key the C# server reads, plus the snake_case key used here.
    return params.get("pushNotification") or params.get("push_notification")


# Empty path: with the "/a2a" prefix applied in main.py this serves POST /a2a,
# the URL advertised in the Agent Card, without a trailing-slash redirect.
@router.post("")
async def handle_rpc(request: Request, _=Depends(verify_token)):
    body = await request.json()
    jsonrpc = body.get("jsonrpc")
    method = body.get("method")
    rpc_id = body.get("id")
    params = body.get("params", {})
    if jsonrpc != "2.0" or not method or not params:
        return JSONResponse(rpc_error(rpc_id, -32600, "Invalid request"))
    if method == "tasks/send":
        return await handle_task_send(rpc_id, params)
    elif method == "tasks/sendSubscribe":
        return await handle_task_send_subscribe(rpc_id, params)
    elif method == "tasks/get":
        return handle_task_get(rpc_id, params)
    elif method == "tasks/cancel":
        return await handle_task_cancel(rpc_id, params)
    else:
        return JSONResponse(rpc_error(rpc_id, -32601, "Method not found"))


async def handle_task_send(rpc_id, params):
    task_id = params.get("id")
    message = params.get("message")
    if not task_id or not message:
        return JSONResponse(rpc_error(rpc_id, -32602, "id and message are required"))
    task = get_task(task_id)
    if task:
        if is_terminal(task["status"]["state"]):
            return JSONResponse(rpc_error(rpc_id, -32002, "Task already in terminal state"))
        task["messages"].append(message)
    else:
        create_task(task_id, message, _push_config(params))
    await process_task(task_id)
    task = get_task(task_id)
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"],
        "artifacts": task["artifacts"], "history": task["history"]
    }})


async def handle_task_send_subscribe(rpc_id, params):
    task_id = params.get("id")
    message = params.get("message")
    if not task_id or not message:
        return JSONResponse(rpc_error(rpc_id, -32602, "id and message are required"))
    task = get_task(task_id)
    if task:
        if is_terminal(task["status"]["state"]):
            return JSONResponse(rpc_error(rpc_id, -32002, "Task already in terminal state"))
        task["messages"].append(message)
    else:
        create_task(task_id, message, _push_config(params))
    queue: asyncio.Queue = asyncio.Queue()
    add_sse_queue(task_id, queue)

    async def event_stream():
        # Keep a reference so the background task is not garbage-collected mid-run.
        worker = asyncio.create_task(process_task(task_id))
        try:
            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=60.0)
                except asyncio.TimeoutError:
                    yield "data: {\"type\":\"keepalive\"}\n\n"
                    continue
                yield f"data: {json.dumps(event)}\n\n"
                # Decide from the event itself, not the task's current state, so events
                # still waiting in the queue are delivered. Also stop on input_required,
                # where the client has to send a new message to continue.
                if event.get("type") == "taskStatusUpdate":
                    state = event["status"]["state"]
                    if is_terminal(state) or state == TaskState.INPUT_REQUIRED:
                        break
        finally:
            remove_sse_queue(task_id, queue)

    return StreamingResponse(event_stream(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})


def handle_task_get(rpc_id, params):
    task = get_task(params.get("id", ""))
    if not task:
        return JSONResponse(rpc_error(rpc_id, -32001, "Task not found"))
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"],
        "artifacts": task["artifacts"], "history": task["history"]
    }})


async def handle_task_cancel(rpc_id, params):
    task = get_task(params.get("id", ""))
    if not task:
        return JSONResponse(rpc_error(rpc_id, -32001, "Task not found"))
    if is_terminal(task["status"]["state"]):
        return JSONResponse(rpc_error(rpc_id, -32002, "Task not cancelable"))
    await update_task_state(params["id"], TaskState.CANCELED)
    return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": {
        "id": task["id"], "status": task["status"]
    }})

# app/middleware/auth.py
import os
from fastapi import Header, HTTPException


async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Bearer token required")
    token = authorization[7:]
    valid = os.getenv("API_TOKEN", "dev-token-12345")
    if token != valid:
        raise HTTPException(status_code=403, detail="Invalid token")

# main.py
from fastapi import FastAPI
from dotenv import load_dotenv
from app.routes.well_known import router as well_known_router
from app.routes.tasks import router as tasks_router

load_dotenv()
app = FastAPI(title="Inventory Management A2A Agent", version="1.0.0")
app.include_router(well_known_router)
app.include_router(tasks_router, prefix="/a2a")

uvicorn main:app --host 0.0.0.0 --port 8000 --reload
The interactive API docs are at http://localhost:8000/docs once the server is running. The Agent Card is at http://localhost:8000/.well-known/agent.json.
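Once the server is up, a quick way to exercise it is a `tasks/send` call. The sketch below only builds the request with the standard library, so it runs without a server. `build_task_send` is a hypothetical helper, not part of the agent; the URL is the one advertised in the Agent Card and the token is the development default from the auth code.

```python
import json
import urllib.request
import uuid
from typing import Optional


def build_task_send(text: str, task_id: Optional[str] = None,
                    url: str = "http://localhost:8000/a2a",
                    token: str = "dev-token-12345") -> urllib.request.Request:
    """Build (but do not send) a JSON-RPC 2.0 tasks/send request."""
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tasks/send",
        "params": {
            # A client-chosen task id; reuse it to continue a multi-turn task.
            "id": task_id or str(uuid.uuid4()),
            "message": {"role": "user", "parts": [{"type": "text", "text": text}]},
        },
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {token}"},
        method="POST",
    )


request = build_task_send("Check stock for SKU-12345", task_id="demo-1")
body = json.loads(request.data)
```

Passing `request` to `urllib.request.urlopen` would send it; the same payload, pointed at port 5000, should work against the C# server described next.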
C# Implementation with ASP.NET Core
The C# implementation uses ASP.NET Core minimal APIs for the routing, System.Text.Json for serialization, and System.Threading.Channels, read as an IAsyncEnumerable, for SSE streaming. This approach keeps the code clean and avoids unnecessary dependencies.
Project Setup
dotnet new web -n A2AInventoryAgent
cd A2AInventoryAgent
dotnet add package Microsoft.Extensions.Http
Project structure:
A2AInventoryAgent/
  Models/
    A2AModels.cs              # All request/response/task models
  Services/
    TaskStore.cs              # In-memory task state management
    TaskHandler.cs            # Business logic per skill
  Endpoints/
    WellKnownEndpoints.cs
    TaskEndpoints.cs
  Middleware/
    BearerAuthMiddleware.cs
  Program.cs
Models
// Models/A2AModels.cs
using System.Text.Json.Serialization;

namespace A2AInventoryAgent.Models;

public enum TaskState
{
    Submitted, Working, InputRequired, Completed, Failed, Canceled
}

public record MessagePart(
    [property: JsonPropertyName("type")] string Type,
    [property: JsonPropertyName("text")] string? Text = null,
    [property: JsonPropertyName("data")] object? Data = null
);

public record AgentMessage(
    [property: JsonPropertyName("role")] string Role,
    [property: JsonPropertyName("parts")] List<MessagePart> Parts
);

public record TaskStatus(
    [property: JsonPropertyName("state")] string State,
    [property: JsonPropertyName("timestamp")] string Timestamp,
    [property: JsonPropertyName("message")] AgentMessage? Message = null
);

public record Artifact(
    [property: JsonPropertyName("name")] string Name,
    [property: JsonPropertyName("description")] string? Description,
    [property: JsonPropertyName("parts")] List<MessagePart> Parts,
    [property: JsonPropertyName("lastChunk")] bool LastChunk = true
);

public record PushNotification(
    [property: JsonPropertyName("url")] string Url,
    [property: JsonPropertyName("token")] string? Token = null
);

public class A2ATask
{
    public string Id { get; set; } = "";
    public TaskStatus Status { get; set; } = new("submitted", DateTime.UtcNow.ToString("O"));
    public List<AgentMessage> Messages { get; set; } = new();
    public List<Artifact> Artifacts { get; set; } = new();
    public PushNotification? PushNotification { get; set; }
    // History entries are anonymous { state, timestamp } objects added by TaskStore.
    public List<object> History { get; set; } = new();
}
// JsonRpcRequest, JsonRpcSuccessResponse, JsonRpcErrorResponse, and JsonRpcError,
// which Program.cs uses, are not shown in this listing.

Task Store
// Services/TaskStore.cs
using System.Threading.Channels;
using A2AInventoryAgent.Models;
using System.Text.Json;
// Disambiguate from System.Threading.Tasks.TaskStatus, which implicit usings also import.
using TaskStatus = A2AInventoryAgent.Models.TaskStatus;

namespace A2AInventoryAgent.Services;

public class TaskStore
{
    private readonly Dictionary<string, A2ATask> _tasks = new();
    private readonly Lock _lock = new();
    private static readonly HashSet<TaskState> TerminalStates =
        [TaskState.Completed, TaskState.Failed, TaskState.Canceled];

    public A2ATask CreateTask(string taskId, AgentMessage initialMessage, PushNotification? push = null)
    {
        var task = new A2ATask
        {
            Id = taskId,
            Status = new TaskStatus("submitted", DateTime.UtcNow.ToString("O")),
            PushNotification = push
        };
        task.Messages.Add(initialMessage);
        task.History.Add(new { state = "submitted", timestamp = DateTime.UtcNow.ToString("O") });
        lock (_lock) { _tasks[taskId] = task; }
        return task;
    }

    public A2ATask? GetTask(string taskId)
    {
        lock (_lock) { return _tasks.GetValueOrDefault(taskId); }
    }

    public async Task UpdateStateAsync(string taskId, TaskState state, AgentMessage? message = null)
    {
        A2ATask? task;
        lock (_lock) { task = _tasks.GetValueOrDefault(taskId); }
        if (task is null) throw new InvalidOperationException($"Task {taskId} not found");
        // Map the enum name to the wire format, e.g. InputRequired -> input_required.
        var stateStr = state.ToString().ToLowerInvariant().Replace("inputrequired", "input_required");
        task.Status = new TaskStatus(stateStr, DateTime.UtcNow.ToString("O"), message);
        if (message is not null) task.Messages.Add(message);
        task.History.Add(new { state = stateStr, timestamp = DateTime.UtcNow.ToString("O") });
        await BroadcastAsync(task, new
        {
            type = "taskStatusUpdate",
            taskId,
            status = task.Status
        });
    }

    public async Task AddArtifactAsync(string taskId, Artifact artifact)
    {
        A2ATask? task;
        lock (_lock) { task = _tasks.GetValueOrDefault(taskId); }
        if (task is null) return;
        task.Artifacts.Add(artifact);
        await BroadcastAsync(task, new { type = "taskArtifactUpdate", taskId, artifact });
    }

    public Channel<object>
    // The listing is cut off here: AddSseChannel, RemoveSseChannel, and BroadcastAsync,
    // which are called above and from Program.cs, are not shown.

Task Handler
// Services/TaskHandler.cs
using System.Text.RegularExpressions;
using A2AInventoryAgent.Models;

namespace A2AInventoryAgent.Services;

public class TaskHandler(TaskStore store, IHttpClientFactory httpClientFactory)
{
    private static readonly Dictionary<string, object> Inventory = new()
    {
        ["SKU-12345"] = new { quantity = 240, location = "Warehouse-A", reorderThreshold = 50, unitPrice = 12.5 },
        ["SKU-67890"] = new { quantity = 12, location = "Warehouse-B", reorderThreshold = 30, unitPrice = 42.0 },
        ["SKU-99999"] = new { quantity = 0, location = "Warehouse-A", reorderThreshold = 20, unitPrice = 8.75 }
    };

    public async Task ProcessTaskAsync(string taskId)
    {
        var task = store.GetTask(taskId);
        if (task is null) return;
        // The first message selects the skill. The latest message carries the current
        // input, so a follow-up sent after input_required is not ignored.
        var firstText = ExtractText(task.Messages[0]);
        var latestText = ExtractText(task.Messages[^1]);
        try
        {
            await store.UpdateStateAsync(taskId, TaskState.Working);
            if (firstText.Contains("reorder", StringComparison.OrdinalIgnoreCase))
                await HandleReorderAsync(taskId, task);
            else
                await HandleCheckStockAsync(taskId, latestText);
        }
        catch (Exception ex)
        {
            await store.UpdateStateAsync(taskId, TaskState.Failed,
                AgentMsg($"Error: {ex.Message}"));
        }
        await DeliverPushNotificationAsync(taskId);
    }

    private async Task HandleCheckStockAsync(string taskId, string text)
    {
        await store.UpdateStateAsync(taskId, TaskState.Working, AgentMsg("Parsing SKU identifiers..."));
        await Task.Delay(300); // simulated processing delay
        var skus = Regex.Matches(text, @"SKU-\d+", RegexOptions.IgnoreCase)
            .Select(m => m.Value.ToUpper()).ToList();
        if (skus.Count == 0)
        {
            await store.UpdateStateAsync(taskId, TaskState.InputRequired,
                AgentMsg("No SKU identifiers found. Please provide one or more SKU IDs (e.g. SKU-12345)."));
            return;
        }
        await store.UpdateStateAsync(taskId, TaskState.Working,
            AgentMsg($"Querying inventory database for {skus.Count} SKU(s)..."));
        await Task.Delay(400); // simulated database latency
        var result = new Dictionary<string, object>();
        var alerts = new List<string>();
        foreach (var sku in skus)
        {
            if (Inventory.TryGetValue(sku, out var item))
            {
                result[sku] = item;
                dynamic d = item;
                if (d.quantity <= d.reorderThreshold)
                    alerts.Add($"{sku} is at or below reorder threshold");
            }
            else
            {
                result[sku] = new { error = "SKU not found" };
            }
        }
        await store.AddArtifactAsync(taskId, new Artifact(
            "stock-levels", "Current inventory levels",
            [new MessagePart("application/json", Data: new { stockLevels = result, alerts })]
        ));
        await store.UpdateStateAsync(taskId, TaskState.Completed);
    }

    private async Task HandleReorderAsync(string taskId, A2ATask task)
    {
        // More than one message means the client has replied to the quantity prompt.
        bool isConfirmation = task.Messages.Count > 1;
        if (!isConfirmation)
        {
            await store.UpdateStateAsync(taskId, TaskState.Working, AgentMsg("Analyzing reorder requirements..."));
            await Task.Delay(500);
            await store.UpdateStateAsync(taskId, TaskState.InputRequired, AgentMsg(
                "SKU-67890 requires reorder (current: 12, threshold: 30). Options:\n" +
                "- 100 units at $42.00/unit = $4,200\n" +
                "- 250 units at $40.00/unit = $10,000\n" +
                "- 500 units at $37.50/unit = $18,750\n\nReply with your preferred quantity."));
            return;
        }
        var confirmText = ExtractText(task.Messages[^1]);
        var match = Regex.Match(confirmText, @"\d+");
        var quantity = match.Success ? int.Parse(match.Value) : 100;
        await store.UpdateStateAsync(taskId, TaskState.Working,
            AgentMsg($"Creating purchase order for {quantity} units..."));
        await Task.Delay(600);
        double unitPrice = quantity >= 500 ? 37.5 : quantity >= 250 ? 40.0 : 42.0;
        var po = new
        {
            poNumber = $"PO-{DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()}",
            sku = "SKU-67890", quantity, unitPrice,
            totalValue = quantity * unitPrice, status = "submitted"
        };
        await store.AddArtifactAsync(taskId, new Artifact(
            "purchase-order", "Generated purchase order",
            [new MessagePart("application/json", Data: po)]
        ));
        await store.UpdateStateAsync(taskId, TaskState.Completed,
            AgentMsg($"Purchase order {po.poNumber} submitted."));
    }

    private async Task DeliverPushNotificationAsync(string taskId)
    {
        var task = store.GetTask(taskId);
        if (task?.PushNotification is null) return;
        var client = httpClientFactory.CreateClient();
        if (task.PushNotification.Token is not null)
            client.DefaultRequestHeaders.Add("Authorization", $"Bearer {task.PushNotification.Token}");
        try
        {
            await client.PostAsJsonAsync(task.PushNotification.Url, new
            {
                taskId, status = task.Status, artifacts = task.Artifacts
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Push notification failed for {taskId}: {ex.Message}");
        }
    }

    private static AgentMessage AgentMsg(string text) =>
        new("agent", [new MessagePart("text", Text: text)]);

    private static string ExtractText(AgentMessage msg) =>
        string.Join(" ", msg.Parts.Where(p => p.Type == "text").Select(p => p.Text ?? ""));
}

Program.cs – Endpoints and Wiring
// Program.cs
using System.Text;
using System.Text.Json;
using A2AInventoryAgent.Models;
using A2AInventoryAgent.Services;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<TaskStore>();
builder.Services.AddScoped<TaskHandler>();
builder.Services.AddHttpClient();
var app = builder.Build();

var agentCard = new
{
    name = "Inventory Management Agent",
    description = "Manages stock levels and triggers reorder workflows",
    version = "1.0.0",
    url = Environment.GetEnvironmentVariable("AGENT_URL") ?? "http://localhost:5000/a2a",
    capabilities = new { streaming = true, pushNotifications = true, stateTransitionHistory = true },
    defaultInputModes = new[] { "text", "application/json" },
    defaultOutputModes = new[] { "application/json" },
    skills = new[]
    {
        new { id = "check-stock", name = "Check Stock Level",
              description = "Returns stock levels for SKUs", tags = new[] { "inventory", "stock" } },
        new { id = "trigger-reorder", name = "Trigger Reorder",
              description = "Initiates reorder workflow", tags = new[] { "reorder", "procurement" } }
    },
    securitySchemes = new { bearer = new { type = "http", scheme = "bearer", bearerFormat = "JWT" } },
    security = new[] { new { bearer = Array.Empty<string>() } }
};

// Agent Card endpoint - no auth
app.MapGet("/.well-known/agent.json", () => Results.Json(agentCard));
app.MapGet("/health", () => Results.Json(new { status = "ok" }));

// All A2A task endpoints require auth
app.MapPost("/a2a", async (HttpContext ctx, TaskStore store, TaskHandler handler) =>
{
    // Bearer token auth
    var authHeader = ctx.Request.Headers.Authorization.ToString();
    var validToken = Environment.GetEnvironmentVariable("API_TOKEN") ?? "dev-token-12345";
    if (!authHeader.StartsWith("Bearer ") || authHeader[7..] != validToken)
    {
        ctx.Response.StatusCode = 401;
        await ctx.Response.WriteAsJsonAsync(new { error = "Unauthorized" });
        return;
    }
    var body = await JsonSerializer.DeserializeAsync<JsonRpcRequest>(ctx.Request.Body,
        new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
    if (body is null || body.Jsonrpc != "2.0")
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(null, -32600, "Invalid request"));
        return;
    }
    switch (body.Method)
    {
        case "tasks/send":
            await HandleTaskSend(ctx, store, handler, body);
            break;
        case "tasks/sendSubscribe":
            await HandleTaskSendSubscribe(ctx, store, handler, body);
            break;
        case "tasks/get":
            await HandleTaskGet(ctx, store, body);
            break;
        case "tasks/cancel":
            await HandleTaskCancel(ctx, store, body);
            break;
        default:
            await ctx.Response.WriteAsJsonAsync(RpcError(body.Id, -32601, "Method not found"));
            break;
    }
});

app.Run("http://0.0.0.0:5000");

// --- Handlers ---
static async Task HandleTaskSend(HttpContext ctx, TaskStore store, TaskHandler handler, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var msgEl = req.Params.GetProperty("message");
    var message = ParseMessage(msgEl);
    var task = store.GetTask(taskId);
    if (task is not null)
    {
        task.Messages.Add(message);
    }
    else
    {
        PushNotification? push = null;
        if (req.Params.TryGetProperty("pushNotification", out var pnEl))
            push = new PushNotification(pnEl.GetProperty("url").GetString()!,
                pnEl.TryGetProperty("token", out var t) ? t.GetString() : null);
        store.CreateTask(taskId, message, push);
    }
    await handler.ProcessTaskAsync(taskId);
    task = store.GetTask(taskId)!;
    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id, new
    {
        id = task.Id, status = task.Status,
        artifacts = task.Artifacts, history = task.History
    }));
}

static async Task HandleTaskSendSubscribe(HttpContext ctx, TaskStore store, TaskHandler handler, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var message = ParseMessage(req.Params.GetProperty("message"));
    var task = store.GetTask(taskId);
    if (task is not null)
        task.Messages.Add(message);
    else
        store.CreateTask(taskId, message);
    ctx.Response.Headers.ContentType = "text/event-stream";
    ctx.Response.Headers.CacheControl = "no-cache";
    ctx.Response.Headers["X-Accel-Buffering"] = "no";
    var channel = store.AddSseChannel(taskId);
    // Process in the background so events can be streamed while the task runs.
    _ = Task.Run(() => handler.ProcessTaskAsync(taskId));
    try
    {
        await foreach (var evt in channel.Reader.ReadAllAsync(ctx.RequestAborted))
        {
            var data = $"data: {JsonSerializer.Serialize(evt)}\n\n";
            await ctx.Response.WriteAsync(data, ctx.RequestAborted);
            await ctx.Response.Body.FlushAsync(ctx.RequestAborted);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        store.RemoveSseChannel(taskId, channel);
    }
}

static async Task HandleTaskGet(HttpContext ctx, TaskStore store, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var task = store.GetTask(taskId);
    if (task is null)
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(req.Id, -32001, "Task not found"));
        return;
    }
    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id, new
    {
        id = task.Id, status = task.Status, artifacts = task.Artifacts, history = task.History
    }));
}

static async Task HandleTaskCancel(HttpContext ctx, TaskStore store, JsonRpcRequest req)
{
    var taskId = req.Params.GetProperty("id").GetString() ?? "";
    var task = store.GetTask(taskId);
    if (task is null)
    {
        await ctx.Response.WriteAsJsonAsync(RpcError(req.Id, -32001, "Task not found"));
        return;
    }
    await store.UpdateStateAsync(taskId, TaskState.Canceled);
    await ctx.Response.WriteAsJsonAsync(new JsonRpcSuccessResponse("2.0", req.Id,
        new { id = task.Id, status = task.Status }));
}

static AgentMessage ParseMessage(JsonElement el)
{
    var role = el.GetProperty("role").GetString() ?? "user";
    var parts = el.GetProperty("parts").EnumerateArray().Select(p =>
    {
        var type = p.GetProperty("type").GetString() ?? "text";
        var text = p.TryGetProperty("text", out var t) ? t.GetString() : null;
        return new MessagePart(type, text);
    }).ToList();
    return new AgentMessage(role, parts);
}

static object RpcError(object? id, int code, string message) =>
    new JsonRpcErrorResponse("2.0", id, new JsonRpcError(code, message));

dotnet run
The server starts at http://localhost:5000. Agent Card is at http://localhost:5000/.well-known/agent.json and the A2A endpoint is at http://localhost:5000/a2a.
Comparison: Node.js vs Python vs C#
flowchart LR
subgraph NodeJS["Node.js (Express)"]
N1[express + uuid + zod]
N2[EventEmitter for SSE]
N3[In-memory Map]
end
subgraph Python["Python (FastAPI)"]
P1[fastapi + uvicorn + pydantic]
P2[asyncio.Queue for SSE]
P3[Dict task store]
end
subgraph CSharp["C# (ASP.NET Core)"]
C1[Minimal APIs + IHttpClientFactory]
C2[System.Threading.Channels for SSE]
C3[Dictionary + lock]
end
NodeJS --- A2A[A2A Protocol\nIdentical behavior]
Python --- A2A
CSharp --- A2A
All three implementations expose the same four JSON-RPC methods, serve an identical Agent Card schema, stream SSE events in the same format, and handle multi-turn interactions through the input_required state. A client agent built against any one of these will work against all three without modification. That is the whole point of the protocol.
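Because the wire format is shared, one small client-side parser can serve all three servers. The following is a rough sketch under the assumption that each event arrives as a single `data:` line followed by a blank line, as in the servers in this series; `parse_sse` and `next_action` are hypothetical helper names, not part of any A2A library.

```python
import json
from typing import Iterator

TERMINAL_STATES = {"completed", "failed", "canceled"}


def parse_sse(raw: str) -> Iterator[dict]:
    """Yield the JSON payload of each 'data:' line in an SSE response body."""
    for line in raw.splitlines():
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


def next_action(event: dict) -> str:
    """Map one event to what the client should do next."""
    if event.get("type") != "taskStatusUpdate":
        return "continue"  # artifacts and keepalives need no decision
    state = event["status"]["state"]
    if state in TERMINAL_STATES:
        return "done"
    if state == "input_required":
        return "reply"  # send another message with the same task id
    return "continue"


raw_stream = (
    'data: {"type": "taskStatusUpdate", "status": {"state": "working"}}\n\n'
    'data: {"type": "keepalive"}\n\n'
    'data: {"type": "taskStatusUpdate", "status": {"state": "input_required"}}\n\n'
)
actions = [next_action(event) for event in parse_sse(raw_stream)]
```

The `reply` branch is where the multi-turn flow happens: the client sends a new message with the same task id and the agent resumes from `input_required`.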
The main language-specific differences are the SSE mechanism (Node.js uses a direct write set, Python uses asyncio queues, C# uses System.Threading.Channels) and how async processing is detached from the HTTP response. Everything else is a direct translation.
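For the Python side of that comparison, the queue-based mechanism can be reduced to a few lines. This is a simplified sketch rather than the task store from this post: `Broadcaster`, `consume`, and `demo` are hypothetical names, events are cut down to a bare `state` field, and it assumes one unbounded `asyncio.Queue` per connected client.

```python
import asyncio
from typing import Dict, List, Set


class Broadcaster:
    """Fan out task events to every subscriber queue registered for a task."""

    def __init__(self) -> None:
        self._queues: Dict[str, Set[asyncio.Queue]] = {}

    def subscribe(self, task_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(task_id, set()).add(queue)
        return queue

    def unsubscribe(self, task_id: str, queue: asyncio.Queue) -> None:
        self._queues.get(task_id, set()).discard(queue)

    async def publish(self, task_id: str, event: dict) -> None:
        # Copy the set so a subscriber leaving mid-broadcast cannot break iteration.
        for queue in list(self._queues.get(task_id, set())):
            await queue.put(event)


async def consume(queue: asyncio.Queue) -> List[str]:
    """Read until a terminal state arrives, deciding from the event itself."""
    seen = []
    while True:
        event = await queue.get()
        seen.append(event["state"])
        if event["state"] in {"completed", "failed", "canceled"}:
            return seen


async def demo() -> List[List[str]]:
    hub = Broadcaster()
    first, second = hub.subscribe("t1"), hub.subscribe("t1")
    readers = [asyncio.create_task(consume(q)) for q in (first, second)]
    for state in ("working", "completed"):
        await hub.publish("t1", {"state": state})
    return await asyncio.gather(*readers)


results = asyncio.run(demo())
```

Every subscriber gets its own queue, so a slow client only delays its own stream, and each reader stops based on the event it just received rather than on shared task state.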
What Comes Next
At this point you have a fully working A2A agent server in all three backend languages. The next step is building the other side of the protocol: the client agent that discovers remote agents, routes tasks intelligently, and orchestrates multiple agents into a coordinated workflow.
Part 5 builds the orchestrator layer in Node.js. It fetches Agent Cards, matches tasks to the right agents based on declared skills, manages concurrent tasks across multiple remote agents, and handles the full state machine including multi-turn interactions.
References
- A2A Protocol – Official Specification (https://a2a-protocol.org/latest/specification/)
- GitHub – A2A Protocol Repository, Linux Foundation (https://github.com/a2aproject/A2A)
- FastAPI – StreamingResponse for SSE (https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse)
- Microsoft Learn – ASP.NET Core Minimal APIs (https://learn.microsoft.com/en-us/aspnet/core/fundamentals/minimal-apis)
- Microsoft Learn – System.Threading.Channels (https://learn.microsoft.com/en-us/dotnet/core/extensions/channels)
- JSON-RPC 2.0 Specification (https://www.jsonrpc.org/specification)
