Production edge AI deployments require robust inference servers that expose optimized models through standardized APIs. This post provides implementation guidance for building multi-language edge inference servers: Node.js/Express and C#/ASP.NET Core architectures, camera integration patterns for live video streams, asynchronous request handling for concurrent inference, and error recovery mechanisms that keep the service reliable, with the goal of 15-22ms end-to-end latency while supporting 30+ concurrent inference requests on Jetson platforms.
Part 3 covered deploying optimized TensorRT models on NVIDIA Jetson hardware. This post focuses on application-layer implementation: designing RESTful inference APIs, integrating camera inputs for continuous monitoring, implementing efficient request queuing and batching, handling errors gracefully with automatic recovery, and benchmarking real-world server performance under concurrent load.
Inference Server Architecture Overview
Edge inference servers bridge hardware-accelerated models with application-level APIs, managing resources efficiently while providing reliable service interfaces.
flowchart TD
A[Client Applications] --> B[Load Balancer]
B --> C[API Server 1]
B --> D[API Server 2]
C --> E[Request Queue]
D --> E
E --> F[Inference Engine Manager]
F --> G[TensorRT Engine Pool]
H[Camera Streams] --> I[Frame Buffer]
I --> E
G --> J[GPU Memory Pool]
J --> K[Inference Results]
K --> L[Post-Processing]
L --> M[Response Cache]
M --> C
M --> D
N[Health Monitor] --> F
N --> O[Metrics Collector]
O --> P[Prometheus/Grafana]
Key architectural components include:
- API layer handling HTTP/WebSocket requests and response formatting
- Request queue managing inference workloads with priority scheduling and rate limiting (sketched below)
- Inference engine pool maintaining multiple TensorRT contexts for concurrent processing
- Camera integration layer managing video streams with frame buffering and synchronization
- Resource manager monitoring GPU memory, thermal state, and overall system health
- Observability layer collecting metrics, logs, and distributed traces
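The servers in this post keep things simple with plain FIFO queues, so priority scheduling and rate limiting deserve a standalone illustration. The following is a minimal sketch, not code used by either server; the class name, defaults, and token-bucket parameters are all hypothetical.
Priority Scheduling and Rate Limiting Sketch (Python):
#!/usr/bin/env python3
"""
Illustrative sketch: priority scheduling with token-bucket rate limiting.
Names and defaults are hypothetical; the servers below use plain FIFO queues.
"""
import heapq
import itertools
import threading
import time

class RateLimitedPriorityQueue:
    def __init__(self, rate_per_sec=30, burst=10):
        self._heap = []                      # entries: (priority, sequence, payload)
        self._counter = itertools.count()    # preserves FIFO order within a priority
        self._lock = threading.Lock()
        self._rate = rate_per_sec
        self._burst = burst
        self._tokens = float(burst)
        self._last_refill = time.monotonic()

    def put(self, payload, priority=10):
        """Enqueue a request; lower priority values are served first."""
        with self._lock:
            heapq.heappush(self._heap, (priority, next(self._counter), payload))

    def get(self):
        """Return the next payload, or None if the queue is empty or rate-limited."""
        with self._lock:
            now = time.monotonic()
            # Refill tokens based on elapsed time, capped at the burst size
            self._tokens = min(self._burst, self._tokens + (now - self._last_refill) * self._rate)
            self._last_refill = now
            if not self._heap or self._tokens < 1.0:
                return None
            self._tokens -= 1.0
            return heapq.heappop(self._heap)[2]
Workers would call get() in a loop and back off briefly whenever it returns None, which happens when the queue is empty or the token bucket is exhausted.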
Node.js/Express Inference Server Implementation
Node.js provides excellent asynchronous I/O performance ideal for managing concurrent API requests while integrating with TensorRT through native addons or child processes.
Server Setup and Dependencies:
// package.json
{
"name": "edge-inference-server",
"version": "1.0.0",
"dependencies": {
"express": "^4.18.2",
"multer": "^1.4.5-lts.1",
"sharp": "^0.33.0",
"ws": "^8.14.2",
"prom-client": "^15.0.0",
"winston": "^3.11.0"
}
}
// Install dependencies
npm install
Core Inference Server (Node.js):
// server.js
const express = require('express');
const multer = require('multer');
const sharp = require('sharp');
const { spawn } = require('child_process');
const WebSocket = require('ws');
const promClient = require('prom-client');
const winston = require('winston');
// Configure logging
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' }),
new winston.transports.Console({ format: winston.format.simple() })
]
});
// Configure Prometheus metrics
const register = new promClient.Registry();
promClient.collectDefaultMetrics({ register });
const inferenceLatency = new promClient.Histogram({
name: 'inference_latency_seconds',
help: 'Inference latency in seconds',
buckets: [0.01, 0.02, 0.03, 0.05, 0.1, 0.2, 0.5]
});
const inferenceCounter = new promClient.Counter({
name: 'inference_requests_total',
help: 'Total inference requests'
});
const inferenceErrors = new promClient.Counter({
name: 'inference_errors_total',
help: 'Total inference errors'
});
register.registerMetric(inferenceLatency);
register.registerMetric(inferenceCounter);
register.registerMetric(inferenceErrors);
// Inference Engine Manager
class InferenceEngineManager {
constructor(enginePath, maxWorkers = 4) {
this.enginePath = enginePath;
this.maxWorkers = maxWorkers;
this.workers = [];
this.queue = [];
this.processing = 0;
this.initializeWorkers();
}
initializeWorkers() {
for (let i = 0; i < this.maxWorkers; i++) {
const worker = {
id: i,
busy: false,
process: null
};
this.workers.push(worker);
}
logger.info(`Initialized ${this.maxWorkers} inference workers`);
}
async infer(imageBuffer) {
return new Promise((resolve, reject) => {
const request = { imageBuffer, resolve, reject, timestamp: Date.now() };
this.queue.push(request);
this.processQueue();
});
}
async processQueue() {
if (this.queue.length === 0) return;
const availableWorker = this.workers.find(w => !w.busy);
if (!availableWorker) return;
const request = this.queue.shift();
availableWorker.busy = true;
this.processing++;
try {
const result = await this.executeInference(availableWorker, request.imageBuffer);
request.resolve(result);
} catch (error) {
logger.error(`Inference error: ${error.message}`);
request.reject(error);
} finally {
availableWorker.busy = false;
this.processing--;
this.processQueue();
}
}
async executeInference(worker, imageBuffer) {
return new Promise((resolve, reject) => {
const pythonProcess = spawn('python3', [
'inference.py',
'--engine', this.enginePath,
'--input', '-'
]);
let output = '';
let errorOutput = '';
pythonProcess.stdout.on('data', (data) => {
output += data.toString();
});
pythonProcess.stderr.on('data', (data) => {
errorOutput += data.toString();
});
pythonProcess.on('close', (code) => {
if (code === 0) {
try {
const result = JSON.parse(output);
resolve(result);
} catch (error) {
reject(new Error(`Failed to parse inference output: ${error.message}`));
}
} else {
reject(new Error(`Inference failed with code ${code}: ${errorOutput}`));
}
});
pythonProcess.stdin.write(imageBuffer);
pythonProcess.stdin.end();
});
}
getStatus() {
return {
workers: this.workers.length,
busy: this.workers.filter(w => w.busy).length,
queue: this.queue.length,
processing: this.processing
};
}
}
// Initialize Express app
const app = express();
const inferenceManager = new InferenceEngineManager('yolov8n_int8.engine', 4);
// Configure multer for file uploads
const upload = multer({
storage: multer.memoryStorage(),
limits: { fileSize: 10 * 1024 * 1024 } // 10MB limit
});
// Middleware
app.use(express.json());
app.use((req, res, next) => {
logger.info(`${req.method} ${req.path}`);
next();
});
// Health check endpoint
app.get('/health', (req, res) => {
const status = inferenceManager.getStatus();
res.json({
status: 'healthy',
timestamp: new Date().toISOString(),
inference: status
});
});
// Metrics endpoint
app.get('/metrics', async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});
// Single image inference endpoint
app.post('/infer', upload.single('image'), async (req, res) => {
const startTime = Date.now();
inferenceCounter.inc();
try {
if (!req.file) {
return res.status(400).json({ error: 'No image provided' });
}
// Preprocess image
const processedImage = await sharp(req.file.buffer)
.resize(640, 640, { fit: 'fill' })
.removeAlpha() // drop any alpha channel so the worker can reshape to 640x640x3
.raw()
.toBuffer();
// Run inference
const result = await inferenceManager.infer(processedImage);
// Record metrics
const latency = (Date.now() - startTime) / 1000;
inferenceLatency.observe(latency);
res.json({
detections: result.detections,
latency: latency,
timestamp: new Date().toISOString()
});
logger.info(`Inference completed in ${latency.toFixed(3)}s`);
} catch (error) {
inferenceErrors.inc();
logger.error(`Inference request failed: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
// Batch inference endpoint
app.post('/infer/batch', upload.array('images', 10), async (req, res) => {
const startTime = Date.now();
try {
if (!req.files || req.files.length === 0) {
return res.status(400).json({ error: 'No images provided' });
}
// Process all images concurrently
const inferencePromises = req.files.map(async (file) => {
const processedImage = await sharp(file.buffer)
.resize(640, 640, { fit: 'fill' })
.removeAlpha()
.raw()
.toBuffer();
return inferenceManager.infer(processedImage);
});
const results = await Promise.all(inferencePromises);
const latency = (Date.now() - startTime) / 1000;
res.json({
results: results,
count: results.length,
latency: latency,
timestamp: new Date().toISOString()
});
} catch (error) {
logger.error(`Batch inference failed: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
// WebSocket server for real-time inference
const wss = new WebSocket.Server({ noServer: true });
wss.on('connection', (ws) => {
logger.info('WebSocket client connected');
ws.on('message', async (message) => {
try {
const imageBuffer = Buffer.from(message);
// Preprocess and infer
const processedImage = await sharp(imageBuffer)
.resize(640, 640, { fit: 'fill' })
.removeAlpha()
.raw()
.toBuffer();
const result = await inferenceManager.infer(processedImage);
ws.send(JSON.stringify({
type: 'detection',
data: result,
timestamp: new Date().toISOString()
}));
} catch (error) {
logger.error(`WebSocket inference error: ${error.message}`);
ws.send(JSON.stringify({
type: 'error',
message: error.message
}));
}
});
ws.on('close', () => {
logger.info('WebSocket client disconnected');
});
});
// HTTP server
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
logger.info(`Inference server listening on port ${PORT}`);
});
// WebSocket upgrade handler
server.on('upgrade', (request, socket, head) => {
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request);
});
});
// Graceful shutdown
process.on('SIGTERM', () => {
logger.info('SIGTERM received, shutting down gracefully');
server.close(() => {
logger.info('Server closed');
process.exit(0);
});
});
Python Inference Worker (inference.py):
#!/usr/bin/env python3
"""
TensorRT inference worker for Node.js server
"""
import sys
import json
import argparse
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
class TensorRTInference:
def __init__(self, engine_path):
self.logger = trt.Logger(trt.Logger.WARNING)
with open(engine_path, 'rb') as f:
self.runtime = trt.Runtime(self.logger)
self.engine = self.runtime.deserialize_cuda_engine(f.read())
self.context = self.engine.create_execution_context()
# Allocate buffers
self.inputs = []
self.outputs = []
self.bindings = []
self.stream = cuda.Stream()
for binding in self.engine:
size = trt.volume(self.engine.get_binding_shape(binding))
dtype = trt.nptype(self.engine.get_binding_dtype(binding))
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
self.bindings.append(int(device_mem))
if self.engine.binding_is_input(binding):
self.inputs.append({'host': host_mem, 'device': device_mem})
else:
self.outputs.append({'host': host_mem, 'device': device_mem})
def infer(self, image_data):
# Image already preprocessed as 640x640 RGB
input_data = np.frombuffer(image_data, dtype=np.uint8)
input_data = input_data.reshape(640, 640, 3)
input_data = input_data.transpose(2, 0, 1).astype(np.float32) / 255.0
input_data = np.expand_dims(input_data, axis=0)
# Copy to device
np.copyto(self.inputs[0]['host'], input_data.ravel())
cuda.memcpy_htod_async(self.inputs[0]['device'],
self.inputs[0]['host'],
self.stream)
# Execute
self.context.execute_async_v2(bindings=self.bindings,
stream_handle=self.stream.handle)
# Copy output
cuda.memcpy_dtoh_async(self.outputs[0]['host'],
self.outputs[0]['device'],
self.stream)
self.stream.synchronize()
output = self.outputs[0]['host'].reshape(self.engine.get_binding_shape(1))
return self.postprocess(output)
def postprocess(self, output, conf_threshold=0.25, iou_threshold=0.45):
output = output[0].T
boxes = output[:, :4]
scores = output[:, 4:].max(axis=1)
class_ids = output[:, 4:].argmax(axis=1)
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
# Convert to corner format
x_center, y_center, width, height = boxes.T
x1 = x_center - width / 2
y1 = y_center - height / 2
x2 = x_center + width / 2
y2 = y_center + height / 2
boxes = np.stack([x1, y1, x2, y2], axis=1)
# NMS
indices = self.nms(boxes, scores, iou_threshold)
detections = []
for idx in indices:
detections.append({
'bbox': boxes[idx].tolist(),
'score': float(scores[idx]),
'class_id': int(class_ids[idx])
})
return {'detections': detections}
def nms(self, boxes, scores, threshold):
x1, y1, x2, y2 = boxes.T
areas = (x2 - x1) * (y2 - y1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0, xx2 - xx1)
h = np.maximum(0, yy2 - yy1)
inter = w * h
iou = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(iou <= threshold)[0]
order = order[inds + 1]
return np.array(keep)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--engine', required=True)
parser.add_argument('--input', default='-')
args = parser.parse_args()
# Initialize inference
inference = TensorRTInference(args.engine)
# Read image from stdin
image_data = sys.stdin.buffer.read()
# Run inference
result = inference.infer(image_data)
# Output JSON
print(json.dumps(result))
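One caveat: the Node.js manager above spawns a fresh inference.py process per request, so the TensorRT engine is deserialized every time, and that load time would dominate end-to-end latency. A common remedy is a long-lived worker that loads the engine once and serves frames over a framed stdin/stdout protocol. The sketch below assumes a 4-byte big-endian length prefix followed by the raw 640x640x3 RGB frame; this protocol is an assumption, and the Node.js side would need a matching change to keep the child process alive and frame its writes accordingly.
Persistent Worker Variant (Python, sketch):
#!/usr/bin/env python3
"""
persistent_worker.py (sketch): keeps the TensorRT engine resident and serves
frames over a length-prefixed stdin/stdout protocol (4-byte big-endian length
followed by the raw 640x640x3 RGB frame). The framing is an assumption; the
parent process must implement the same protocol.
"""
import sys
import json
import struct
import argparse

from inference import TensorRTInference  # the class defined in inference.py above

def read_exact(stream, n):
    """Read exactly n bytes from the stream, or return None on EOF."""
    data = b''
    while len(data) < n:
        chunk = stream.read(n - len(data))
        if not chunk:
            return None
        data += chunk
    return data

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--engine', required=True)
    args = parser.parse_args()

    # The engine is deserialized once, then reused for every frame
    inference = TensorRTInference(args.engine)

    stdin, stdout = sys.stdin.buffer, sys.stdout.buffer
    while True:
        header = read_exact(stdin, 4)
        if header is None:
            break  # parent closed the pipe
        frame_len = struct.unpack('>I', header)[0]
        frame = read_exact(stdin, frame_len)
        if frame is None:
            break
        result = inference.infer(frame)
        stdout.write((json.dumps(result) + '\n').encode())
        stdout.flush()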
C#/ASP.NET Core Inference Server Implementation
C# provides strong typing and excellent performance for production enterprise deployments requiring robust error handling and maintainability.
ASP.NET Core Server (C#):
// Program.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Diagnostics;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// Register inference service as singleton
builder.Services.AddSingleton<InferenceService>(sp =>
new InferenceService("yolov8n_int8.engine", maxWorkers: 4));
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseAuthorization();
app.MapControllers();
app.Run();
// InferenceService.cs
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class InferenceService
{
private readonly string _enginePath;
private readonly int _maxWorkers;
private readonly ConcurrentQueue<InferenceRequest> _queue;
private readonly SemaphoreSlim _workerSemaphore;
private int _processing;
public InferenceService(string enginePath, int maxWorkers = 4)
{
_enginePath = enginePath;
_maxWorkers = maxWorkers;
_queue = new ConcurrentQueue<InferenceRequest>();
_workerSemaphore = new SemaphoreSlim(maxWorkers, maxWorkers);
_processing = 0;
}
public virtual async Task<InferenceResult> InferAsync(byte[] imageData)
{
var tcs = new TaskCompletionSource<InferenceResult>();
var request = new InferenceRequest
{
ImageData = imageData,
CompletionSource = tcs,
Timestamp = DateTime.UtcNow
};
_queue.Enqueue(request);
_ = ProcessQueueAsync();
return await tcs.Task;
}
private async Task ProcessQueueAsync()
{
if (!_queue.TryDequeue(out var request))
return;
await _workerSemaphore.WaitAsync();
Interlocked.Increment(ref _processing);
try
{
var result = await ExecuteInferenceAsync(request.ImageData);
request.CompletionSource.SetResult(result);
}
catch (Exception ex)
{
request.CompletionSource.SetException(ex);
}
finally
{
_workerSemaphore.Release();
Interlocked.Decrement(ref _processing);
_ = ProcessQueueAsync();
}
}
private async Task<InferenceResult> ExecuteInferenceAsync(byte[] imageData)
{
var startInfo = new ProcessStartInfo
{
FileName = "python3",
Arguments = $"inference.py --engine {_enginePath} --input -",
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true
};
using var process = new Process { StartInfo = startInfo };
process.Start();
// Write image data to stdin
await process.StandardInput.BaseStream.WriteAsync(imageData);
process.StandardInput.Close();
// Read output
var output = await process.StandardOutput.ReadToEndAsync();
var errorOutput = await process.StandardError.ReadToEndAsync();
await process.WaitForExitAsync();
if (process.ExitCode != 0)
{
throw new Exception($"Inference failed: {errorOutput}");
}
var result = JsonSerializer.Deserialize<InferenceResult>(output);
return result ?? throw new Exception("Failed to deserialize inference result");
}
public ServiceStatus GetStatus()
{
return new ServiceStatus
{
MaxWorkers = _maxWorkers,
Processing = _processing,
QueueDepth = _queue.Count,
AvailableWorkers = _workerSemaphore.CurrentCount
};
}
}
public class InferenceRequest
{
public byte[] ImageData { get; set; } = Array.Empty<byte>();
public TaskCompletionSource<InferenceResult> CompletionSource { get; set; } = new();
public DateTime Timestamp { get; set; }
}
public class InferenceResult
{
public Detection[] Detections { get; set; } = Array.Empty<Detection>();
}
public class Detection
{
public float[] Bbox { get; set; } = Array.Empty<float>();
public float Score { get; set; }
public int ClassId { get; set; }
}
public class ServiceStatus
{
public int MaxWorkers { get; set; }
public int Processing { get; set; }
public int QueueDepth { get; set; }
public int AvailableWorkers { get; set; }
}
// InferenceController.cs
using Microsoft.AspNetCore.Mvc;
using SixLabors.ImageSharp;
using SixLabors.ImageSharp.PixelFormats;
using SixLabors.ImageSharp.Processing;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
[ApiController]
[Route("api")]
public class InferenceController : ControllerBase
{
private readonly InferenceService _inferenceService;
public InferenceController(InferenceService inferenceService)
{
_inferenceService = inferenceService;
}
[HttpGet("health")]
public IActionResult GetHealth()
{
var status = _inferenceService.GetStatus();
return Ok(new
{
Status = "healthy",
Timestamp = DateTime.UtcNow,
Inference = status
});
}
[HttpPost("infer")]
public async Task<IActionResult> Infer([FromForm] IFormFile image)
{
var sw = Stopwatch.StartNew();
try
{
if (image == null || image.Length == 0)
{
return BadRequest(new { Error = "No image provided" });
}
// Read image data
using var memoryStream = new MemoryStream();
await image.CopyToAsync(memoryStream);
var imageData = memoryStream.ToArray();
// Preprocess with ImageSharp (install SixLabors.ImageSharp package)
var processedImage = await PreprocessImageAsync(imageData);
// Run inference
var result = await _inferenceService.InferAsync(processedImage);
sw.Stop();
return Ok(new
{
Detections = result.Detections,
Latency = sw.Elapsed.TotalSeconds,
Timestamp = DateTime.UtcNow
});
}
catch (Exception ex)
{
return StatusCode(500, new { Error = ex.Message });
}
}
private Task<byte[]> PreprocessImageAsync(byte[] imageData)
{
// Resize to 640x640 and return raw RGB bytes so the Python worker can
// reshape the buffer directly (mirrors the sharp .raw() pipeline in Node.js)
using var image = Image.Load<Rgb24>(imageData);
image.Mutate(x => x.Resize(640, 640));
var buffer = new byte[640 * 640 * 3];
image.CopyPixelDataTo(buffer);
return Task.FromResult(buffer);
}
[HttpPost("infer/batch")]
public async Task<IActionResult> InferBatch([FromForm] IFormFileCollection images)
{
var sw = Stopwatch.StartNew();
try
{
if (images == null || images.Count == 0)
{
return BadRequest(new { Error = "No images provided" });
}
var tasks = new Task<InferenceResult>[images.Count];
for (int i = 0; i < images.Count; i++)
{
using var memoryStream = new MemoryStream();
await images[i].CopyToAsync(memoryStream);
var imageData = memoryStream.ToArray();
var processedImage = await PreprocessImageAsync(imageData);
tasks[i] = _inferenceService.InferAsync(processedImage);
}
var results = await Task.WhenAll(tasks);
sw.Stop();
return Ok(new
{
Results = results,
Count = results.Length,
Latency = sw.Elapsed.TotalSeconds,
Timestamp = DateTime.UtcNow
});
}
catch (Exception ex)
{
return StatusCode(500, new { Error = ex.Message });
}
}
}
Camera Integration for Live Video Streams
Integrating camera inputs enables continuous monitoring applications with frame-by-frame inference and real-time alerting.
Camera Integration (Python with OpenCV):
#!/usr/bin/env python3
"""
Camera integration with inference server
"""
import cv2
import requests
import time
import threading
from queue import Queue
from io import BytesIO
class CameraInferenceClient:
def __init__(self, camera_id, inference_url, fps=30):
self.camera_id = camera_id
self.inference_url = inference_url
self.target_fps = fps
self.frame_queue = Queue(maxsize=5)
self.result_queue = Queue()
self.running = False
def start(self):
self.running = True
# Start capture thread
capture_thread = threading.Thread(target=self._capture_loop)
capture_thread.start()
# Start inference thread
inference_thread = threading.Thread(target=self._inference_loop)
inference_thread.start()
# Start display thread
display_thread = threading.Thread(target=self._display_loop)
display_thread.start()
def _capture_loop(self):
cap = cv2.VideoCapture(self.camera_id)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, self.target_fps)
frame_interval = 1.0 / self.target_fps
while self.running:
start_time = time.time()
ret, frame = cap.read()
if not ret:
print("Failed to capture frame")
continue
# Add to queue if not full
if not self.frame_queue.full():
self.frame_queue.put((frame, time.time()))
# Maintain target FPS
elapsed = time.time() - start_time
sleep_time = max(0, frame_interval - elapsed)
time.sleep(sleep_time)
cap.release()
def _inference_loop(self):
while self.running:
if self.frame_queue.empty():
time.sleep(0.001)
continue
frame, capture_time = self.frame_queue.get()
try:
# Encode frame
_, buffer = cv2.imencode('.jpg', frame)
# Send to inference server
response = requests.post(
self.inference_url,
files={'image': BytesIO(buffer)},
timeout=1.0
)
if response.status_code == 200:
result = response.json()
result['frame'] = frame
result['capture_time'] = capture_time
self.result_queue.put(result)
except Exception as e:
print(f"Inference error: {e}")
def _display_loop(self):
while self.running:
if self.result_queue.empty():
time.sleep(0.001)
continue
result = self.result_queue.get()
frame = result['frame']
detections = result.get('detections', [])
# Draw detections
for det in detections:
bbox = det['bbox']
score = det['score']
class_id = det['class_id']
x1, y1, x2, y2 = map(int, bbox)
# Draw bounding box
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
# Draw label
label = f"Class {class_id}: {score:.2f}"
cv2.putText(frame, label, (x1, y1-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
# Display FPS
latency = result.get('latency', 0)
fps_text = f"Latency: {latency*1000:.1f}ms"
cv2.putText(frame, fps_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow('Camera Feed', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
self.running = False
cv2.destroyAllWindows()
def stop(self):
self.running = False
# Usage
if __name__ == '__main__':
client = CameraInferenceClient(
camera_id=0,
inference_url='http://localhost:3000/infer',
fps=30
)
client.start()
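The HTTP client above pays connection and multipart overhead on every frame. For the WebSocket endpoint exposed by the Node.js server, a streaming client can push JPEG frames and read detections over a single connection. The sketch below is not part of the server code above; it uses the third-party websockets package and assumes the server is reachable at ws://localhost:3000.
WebSocket Streaming Client (Python, sketch):
#!/usr/bin/env python3
"""
ws_client.py (sketch): stream camera frames to the Node.js WebSocket endpoint.
Requires the third-party 'websockets' package (pip install websockets).
"""
import asyncio
import json
import cv2
import websockets

async def stream_camera(ws_url='ws://localhost:3000', camera_id=0):
    cap = cv2.VideoCapture(camera_id)
    try:
        async with websockets.connect(ws_url, max_size=None) as ws:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Send the JPEG-encoded frame as a single binary message
                _, buffer = cv2.imencode('.jpg', frame)
                await ws.send(buffer.tobytes())
                # The server replies with one JSON message per frame
                reply = json.loads(await ws.recv())
                if reply.get('type') == 'detection':
                    print(reply['data'])
                else:
                    print(f"Server error: {reply.get('message')}")
    finally:
        cap.release()

if __name__ == '__main__':
    asyncio.run(stream_camera())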
Error Recovery and Reliability Patterns
Production servers require robust error handling with automatic recovery from transient failures.
Retry Logic with Exponential Backoff (Node.js):
class ResilientInferenceManager extends InferenceEngineManager {
async inferWithRetry(imageBuffer, maxRetries = 3) {
let lastError;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await this.infer(imageBuffer);
} catch (error) {
lastError = error;
logger.warn(`Inference attempt ${attempt + 1} failed: ${error.message}`);
if (attempt < maxRetries - 1) {
const backoff = Math.pow(2, attempt) * 100; // 100ms, then 200ms
await new Promise(resolve => setTimeout(resolve, backoff));
}
}
}
throw new Error(`Inference failed after ${maxRetries} attempts: ${lastError.message}`);
}
async healthCheck() {
try {
const testImage = Buffer.alloc(640 * 640 * 3);
await this.infer(testImage);
return true;
} catch (error) {
logger.error(`Health check failed: ${error.message}`);
return false;
}
}
}
Circuit Breaker Pattern (C#):
public class CircuitBreakerInferenceService : InferenceService
{
private int _failureCount = 0;
private DateTime _lastFailureTime;
private bool _isOpen = false;
private readonly int _failureThreshold = 5;
private readonly TimeSpan _timeout = TimeSpan.FromSeconds(30);
public CircuitBreakerInferenceService(string enginePath, int maxWorkers = 4)
: base(enginePath, maxWorkers)
{
}
public override async Task<InferenceResult> InferAsync(byte[] imageData)
{
if (_isOpen)
{
if (DateTime.UtcNow - _lastFailureTime > _timeout)
{
// Try half-open state
_isOpen = false;
_failureCount = 0;
}
else
{
throw new Exception("Circuit breaker is open");
}
}
try
{
var result = await base.InferAsync(imageData);
_failureCount = 0; // Reset on success
return result;
}
catch (Exception ex)
{
_failureCount++;
_lastFailureTime = DateTime.UtcNow;
if (_failureCount >= _failureThreshold)
{
_isOpen = true;
Console.WriteLine("Circuit breaker opened");
}
throw;
}
}
}
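Retry and circuit-breaker logic handle failures inside the request path; an external watchdog catches the cases where the whole process stops responding. The sketch below polls the Node.js /health endpoint; the URL, poll interval, failure budget, and systemd unit name are assumptions chosen for illustration.
External Health Watchdog (Python, sketch):
#!/usr/bin/env python3
"""
watchdog.py (sketch): poll the inference server's health endpoint and restart
the service after repeated failures. The URL, interval, failure budget, and
systemd unit name are illustrative assumptions.
"""
import subprocess
import time
import requests

HEALTH_URL = 'http://localhost:3000/health'
POLL_INTERVAL_S = 10
MAX_CONSECUTIVE_FAILURES = 3
SERVICE_UNIT = 'inference-server.service'  # hypothetical systemd unit

def is_healthy():
    try:
        response = requests.get(HEALTH_URL, timeout=2.0)
        return response.status_code == 200 and response.json().get('status') == 'healthy'
    except requests.RequestException:
        return False

if __name__ == '__main__':
    failures = 0
    while True:
        if is_healthy():
            failures = 0
        else:
            failures += 1
            print(f"Health check failed ({failures}/{MAX_CONSECUTIVE_FAILURES})")
            if failures >= MAX_CONSECUTIVE_FAILURES:
                # Restart the service and reset the failure counter
                subprocess.run(['systemctl', 'restart', SERVICE_UNIT], check=False)
                failures = 0
        time.sleep(POLL_INTERVAL_S)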
Performance Benchmarking
Systematic load testing validates server performance under realistic concurrent workloads.
Load Test Script (Python):
#!/usr/bin/env python3
"""
Load testing for inference server
"""
import requests
import time
import threading
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
class LoadTester:
def __init__(self, server_url, test_image_path):
self.server_url = server_url
self.test_image_path = test_image_path
self.results = []
self.lock = threading.Lock()
def single_request(self):
start = time.perf_counter()
try:
with open(self.test_image_path, 'rb') as f:
response = requests.post(
f"{self.server_url}/infer",
files={'image': f},
timeout=5.0
)
latency = time.perf_counter() - start
success = response.status_code == 200
with self.lock:
self.results.append({
'latency': latency,
'success': success,
'status_code': response.status_code
})
return latency, success
except Exception as e:
with self.lock:
self.results.append({
'latency': None,
'success': False,
'error': str(e)
})
return None, False
def run_load_test(self, num_requests, concurrency):
print(f"Running load test: {num_requests} requests, {concurrency} concurrent")
self.results = []
start_time = time.perf_counter()
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = [executor.submit(self.single_request)
for _ in range(num_requests)]
for future in as_completed(futures):
pass
total_time = time.perf_counter() - start_time
# Analyze results
successful = [r for r in self.results if r['success']]
failed = [r for r in self.results if not r['success']]
latencies = [r['latency'] for r in successful if r['latency']]
print(f"\nResults:")
print(f"Total time: {total_time:.2f}s")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")
print(f"Success rate: {len(successful)/num_requests*100:.1f}%")
if latencies:
print(f"\nLatency Statistics:")
print(f"Mean: {np.mean(latencies)*1000:.2f}ms")
print(f"Median: {np.median(latencies)*1000:.2f}ms")
print(f"P95: {np.percentile(latencies, 95)*1000:.2f}ms")
print(f"P99: {np.percentile(latencies, 99)*1000:.2f}ms")
print(f"Min: {np.min(latencies)*1000:.2f}ms")
print(f"Max: {np.max(latencies)*1000:.2f}ms")
throughput = len(successful) / total_time
print(f"\nThroughput: {throughput:.2f} req/s")
if __name__ == '__main__':
tester = LoadTester(
server_url='http://localhost:3000',
test_image_path='test.jpg'
)
# Test different concurrency levels
for concurrency in [1, 5, 10, 20, 30]:
print(f"\n{'='*60}")
tester.run_load_test(num_requests=100, concurrency=concurrency)
Expected Performance Metrics: Single request latency of 15-22ms end-to-end on Jetson Orin Nano, 10-18ms on Jetson AGX Orin. Concurrent throughput of 30-40 req/s with 4 workers on Orin Nano, 60-80 req/s on AGX Orin. Success rate above 99.5% under sustained load with proper error handling and retries.
Key Takeaways
Multi-language inference servers enable flexible integration of edge AI capabilities into diverse application ecosystems. Node.js/Express provides excellent asynchronous I/O performance for high-concurrency scenarios while C#/ASP.NET Core offers strong typing and enterprise-grade tooling for maintainable production systems. Both approaches achieve comparable performance when properly architected with asynchronous request handling and efficient worker management.
Camera integration requires careful frame buffering and queue management to maintain target frame rates while avoiding dropped frames. Threading separation between capture, inference, and display prevents blocking and enables smooth real-time performance. WebSocket connections provide efficient bidirectional communication for streaming inference results with minimal overhead.
Production reliability demands comprehensive error handling including retry logic with exponential backoff for transient failures, circuit breaker patterns preventing cascade failures, health monitoring enabling proactive issue detection, and graceful degradation maintaining partial service during component failures. Load testing under realistic concurrent workloads validates performance characteristics and identifies bottlenecks before production deployment.
Part 5 continues with advanced optimization patterns covering memory-aware scheduling for multiple concurrent models, GPU resource pooling and contention management, KV cache optimization for sequence processing, adaptive batching strategies, SLA enforcement mechanisms, and demonstrating 50-70% latency reduction through intelligent resource coordination.
References
- Express.js Documentation (https://expressjs.com/en/guide/routing.html)
- ASP.NET Core Documentation (https://docs.microsoft.com/en-us/aspnet/core/)
- OpenCV VideoCapture Documentation (https://docs.opencv.org/4.x/d8/dfe/classcv_1_1VideoCapture.html)
- WebSocket Implementation (ws) (https://github.com/websockets/ws)
- Prometheus Client Libraries (https://prometheus.io/docs/instrumenting/clientlibs/)
- Winston Logging Library (https://github.com/winstonjs/winston)
- Sharp Image Processing (https://sharp.pixelplumbing.com/)
- ImageSharp Documentation (https://docs.sixlabors.com/api/ImageSharp/)
