Multi-Language Edge Inference Servers: Building REST APIs for Real-Time Object Detection

Production edge AI deployments require robust inference servers that expose optimized models through standardized APIs. This post provides implementation guidance for building multi-language edge inference servers: Node.js/Express and C#/ASP.NET Core architectures, camera integration patterns for live video streams, asynchronous request handling for concurrent inference, and error recovery mechanisms that keep the service reliable, with a target of 15-22 ms end-to-end latency while supporting 30+ concurrent inference requests on Jetson platforms.

Part 3 covered deploying optimized TensorRT models on NVIDIA Jetson hardware. This post focuses on application layer implementation: designing RESTful inference APIs, integrating camera inputs for continuous monitoring, implementing efficient request queuing and batching, handling errors gracefully with automatic recovery, and benchmarking real-world server performance under concurrent load.

Inference Server Architecture Overview

Edge inference servers bridge hardware-accelerated models with application-level APIs, managing resources efficiently while providing reliable service interfaces.

flowchart TD
    A[Client Applications] --> B[Load Balancer]
    B --> C[API Server 1]
    B --> D[API Server 2]
    
    C --> E[Request Queue]
    D --> E
    
    E --> F[Inference Engine Manager]
    F --> G[TensorRT Engine Pool]
    
    H[Camera Streams] --> I[Frame Buffer]
    I --> E
    
    G --> J[GPU Memory Pool]
    J --> K[Inference Results]
    
    K --> L[Post-Processing]
    L --> M[Response Cache]
    M --> C
    M --> D
    
    N[Health Monitor] --> F
    N --> O[Metrics Collector]
    O --> P[Prometheus/Grafana]

Key architectural components:

- API layer: handles HTTP/WebSocket requests and response formatting
- Request queue: manages the inference workload with priority scheduling and rate limiting (a minimal sketch follows this list)
- Inference engine pool: maintains multiple TensorRT contexts for concurrent processing
- Camera integration layer: manages video streams with frame buffering and synchronization
- Resource manager: monitors GPU memory, thermal state, and system health
- Observability layer: collects metrics, logs, and distributed traces
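
Most of the scheduling policy lives in the request queue, so it is worth seeing in isolation. The sketch below shows priority scheduling combined with token-bucket rate limiting in Python; the class names and parameters are illustrative rather than part of any framework, and the servers later in this post use simpler FIFO queues.

# request_queue_sketch.py - priority scheduling with token-bucket rate limiting (illustrative)
import heapq
import threading
import time

class TokenBucket:
    """Token-bucket rate limiter: refuse new work when the bucket is empty."""
    def __init__(self, rate_per_s, burst):
        self.rate, self.capacity = rate_per_s, burst
        self.tokens, self.updated = float(burst), time.monotonic()
        self.lock = threading.Lock()

    def allow(self):
        with self.lock:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

class PriorityRequestQueue:
    """Lower priority value is served first; ties are broken by arrival order."""
    def __init__(self, limiter):
        self.heap, self.counter = [], 0
        self.limiter = limiter
        self.lock = threading.Lock()

    def submit(self, request, priority=10):
        if not self.limiter.allow():
            raise RuntimeError("rate limit exceeded")  # surface as HTTP 429 in the API layer
        with self.lock:
            heapq.heappush(self.heap, (priority, self.counter, request))
            self.counter += 1

    def next_request(self):
        with self.lock:
            return heapq.heappop(self.heap)[2] if self.heap else None

# Usage: alert frames jump the queue ahead of routine frames
queue = PriorityRequestQueue(TokenBucket(rate_per_s=40, burst=10))
queue.submit({"frame": "routine"}, priority=10)
queue.submit({"frame": "alert"}, priority=1)
print(queue.next_request())  # {'frame': 'alert'}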

Node.js/Express Inference Server Implementation

Node.js offers excellent asynchronous I/O, making it well suited to handling many concurrent API requests while integrating with TensorRT through native addons or child processes.

Server Setup and Dependencies:

// package.json
{
  "name": "edge-inference-server",
  "version": "1.0.0",
  "dependencies": {
    "express": "^4.18.2",
    "multer": "^1.4.5-lts.1",
    "sharp": "^0.33.0",
    "ws": "^8.14.2",
    "prom-client": "^15.0.0",
    "winston": "^3.11.0"
  }
}

// Install dependencies
npm install

Core Inference Server (Node.js):

// server.js
const express = require('express');
const multer = require('multer');
const sharp = require('sharp');
const { spawn } = require('child_process');
const WebSocket = require('ws');
const promClient = require('prom-client');
const winston = require('winston');

// Configure logging
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' }),
    new winston.transports.Console({ format: winston.format.simple() })
  ]
});

// Configure Prometheus metrics
const register = new promClient.Registry();
promClient.collectDefaultMetrics({ register });

const inferenceLatency = new promClient.Histogram({
  name: 'inference_latency_seconds',
  help: 'Inference latency in seconds',
  buckets: [0.01, 0.02, 0.03, 0.05, 0.1, 0.2, 0.5]
});

const inferenceCounter = new promClient.Counter({
  name: 'inference_requests_total',
  help: 'Total inference requests'
});

const inferenceErrors = new promClient.Counter({
  name: 'inference_errors_total',
  help: 'Total inference errors'
});

register.registerMetric(inferenceLatency);
register.registerMetric(inferenceCounter);
register.registerMetric(inferenceErrors);

// Inference Engine Manager
class InferenceEngineManager {
  constructor(enginePath, maxWorkers = 4) {
    this.enginePath = enginePath;
    this.maxWorkers = maxWorkers;
    this.workers = [];
    this.queue = [];
    this.processing = 0;
    
    this.initializeWorkers();
  }
  
  initializeWorkers() {
    for (let i = 0; i < this.maxWorkers; i++) {
      const worker = {
        id: i,
        busy: false,
        process: null
      };
      this.workers.push(worker);
    }
    
    logger.info(`Initialized ${this.maxWorkers} inference workers`);
  }
  
  async infer(imageBuffer) {
    return new Promise((resolve, reject) => {
      const request = { imageBuffer, resolve, reject, timestamp: Date.now() };
      this.queue.push(request);
      this.processQueue();
    });
  }
  
  async processQueue() {
    if (this.queue.length === 0) return;
    
    const availableWorker = this.workers.find(w => !w.busy);
    if (!availableWorker) return;
    
    const request = this.queue.shift();
    availableWorker.busy = true;
    this.processing++;
    
    try {
      const result = await this.executeInference(availableWorker, request.imageBuffer);
      request.resolve(result);
    } catch (error) {
      logger.error(`Inference error: ${error.message}`);
      request.reject(error);
    } finally {
      availableWorker.busy = false;
      this.processing--;
      this.processQueue();
    }
  }
  
  async executeInference(worker, imageBuffer) {
    return new Promise((resolve, reject) => {
      const pythonProcess = spawn('python3', [
        'inference.py',
        '--engine', this.enginePath,
        '--input', '-'
      ]);
      
      let output = '';
      let errorOutput = '';
      
      pythonProcess.stdout.on('data', (data) => {
        output += data.toString();
      });
      
      pythonProcess.stderr.on('data', (data) => {
        errorOutput += data.toString();
      });
      
      pythonProcess.on('close', (code) => {
        if (code === 0) {
          try {
            const result = JSON.parse(output);
            resolve(result);
          } catch (error) {
            reject(new Error(`Failed to parse inference output: ${error.message}`));
          }
        } else {
          reject(new Error(`Inference failed with code ${code}: ${errorOutput}`));
        }
      });
      
      pythonProcess.stdin.write(imageBuffer);
      pythonProcess.stdin.end();
    });
  }
  
  getStatus() {
    return {
      workers: this.workers.length,
      busy: this.workers.filter(w => w.busy).length,
      queue: this.queue.length,
      processing: this.processing
    };
  }
}

// Initialize Express app
const app = express();
const inferenceManager = new InferenceEngineManager('yolov8n_int8.engine', 4);

// Configure multer for file uploads
const upload = multer({
  storage: multer.memoryStorage(),
  limits: { fileSize: 10 * 1024 * 1024 } // 10MB limit
});

// Middleware
app.use(express.json());
app.use((req, res, next) => {
  logger.info(`${req.method} ${req.path}`);
  next();
});

// Health check endpoint
app.get('/health', (req, res) => {
  const status = inferenceManager.getStatus();
  res.json({
    status: 'healthy',
    timestamp: new Date().toISOString(),
    inference: status
  });
});

// Metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

// Single image inference endpoint
app.post('/infer', upload.single('image'), async (req, res) => {
  const startTime = Date.now();
  inferenceCounter.inc();
  
  try {
    if (!req.file) {
      return res.status(400).json({ error: 'No image provided' });
    }
    
    // Preprocess image: resize and strip any alpha channel so the worker
    // always receives raw 640x640x3 RGB bytes
    const processedImage = await sharp(req.file.buffer)
      .resize(640, 640, { fit: 'fill' })
      .removeAlpha()
      .raw()
      .toBuffer();
    
    // Run inference
    const result = await inferenceManager.infer(processedImage);
    
    // Record metrics
    const latency = (Date.now() - startTime) / 1000;
    inferenceLatency.observe(latency);
    
    res.json({
      detections: result.detections,
      latency: latency,
      timestamp: new Date().toISOString()
    });
    
    logger.info(`Inference completed in ${latency.toFixed(3)}s`);
    
  } catch (error) {
    inferenceErrors.inc();
    logger.error(`Inference request failed: ${error.message}`);
    res.status(500).json({ error: error.message });
  }
});

// Batch inference endpoint
app.post('/infer/batch', upload.array('images', 10), async (req, res) => {
  const startTime = Date.now();
  
  try {
    if (!req.files || req.files.length === 0) {
      return res.status(400).json({ error: 'No images provided' });
    }
    
    // Process all images concurrently
    const inferencePromises = req.files.map(async (file) => {
      const processedImage = await sharp(file.buffer)
        .resize(640, 640, { fit: 'fill' })
        .removeAlpha() // keep 3-channel RGB for the worker
        .raw()
        .toBuffer();
      
      return inferenceManager.infer(processedImage);
    });
    
    const results = await Promise.all(inferencePromises);
    
    const latency = (Date.now() - startTime) / 1000;
    
    res.json({
      results: results,
      count: results.length,
      latency: latency,
      timestamp: new Date().toISOString()
    });
    
  } catch (error) {
    logger.error(`Batch inference failed: ${error.message}`);
    res.status(500).json({ error: error.message });
  }
});

// WebSocket server for real-time inference
const wss = new WebSocket.Server({ noServer: true });

wss.on('connection', (ws) => {
  logger.info('WebSocket client connected');
  
  ws.on('message', async (message) => {
    try {
      const imageBuffer = Buffer.from(message);
      
      // Preprocess and infer
      const processedImage = await sharp(imageBuffer)
        .resize(640, 640, { fit: 'fill' })
        .removeAlpha() // keep 3-channel RGB for the worker
        .raw()
        .toBuffer();
      
      const result = await inferenceManager.infer(processedImage);
      
      ws.send(JSON.stringify({
        type: 'detection',
        data: result,
        timestamp: new Date().toISOString()
      }));
      
    } catch (error) {
      logger.error(`WebSocket inference error: ${error.message}`);
      ws.send(JSON.stringify({
        type: 'error',
        message: error.message
      }));
    }
  });
  
  ws.on('close', () => {
    logger.info('WebSocket client disconnected');
  });
});

// HTTP server
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
  logger.info(`Inference server listening on port ${PORT}`);
});

// WebSocket upgrade handler
server.on('upgrade', (request, socket, head) => {
  wss.handleUpgrade(request, socket, head, (ws) => {
    wss.emit('connection', ws, request);
  });
});

// Graceful shutdown
process.on('SIGTERM', () => {
  logger.info('SIGTERM received, shutting down gracefully');
  server.close(() => {
    logger.info('Server closed');
    process.exit(0);
  });
});
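
Before wiring up cameras or load tests, it helps to smoke-test the HTTP endpoints directly. A minimal Python client sketch, assuming the server above is running on localhost:3000 and a local test.jpg exists:

# client_example.py - quick smoke test against the Node.js server (assumed at localhost:3000)
import requests

BASE_URL = "http://localhost:3000"

# Health and queue status
print(requests.get(f"{BASE_URL}/health", timeout=2).json())

# Single-image inference
with open("test.jpg", "rb") as f:
    resp = requests.post(f"{BASE_URL}/infer", files={"image": f}, timeout=5)
resp.raise_for_status()
body = resp.json()
print(f"{len(body['detections'])} detections in {body['latency'] * 1000:.1f} ms")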

Python Inference Worker (inference.py):

#!/usr/bin/env python3
"""
TensorRT inference worker for Node.js server
"""

import sys
import json
import argparse
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit

class TensorRTInference:
    def __init__(self, engine_path):
        # Uses the TensorRT 8.x binding API; TensorRT 10+ replaces
        # get_binding_* / execute_async_v2 with the tensor-name I/O API.
        self.logger = trt.Logger(trt.Logger.WARNING)
        
        with open(engine_path, 'rb') as f:
            self.runtime = trt.Runtime(self.logger)
            self.engine = self.runtime.deserialize_cuda_engine(f.read())
        
        self.context = self.engine.create_execution_context()
        
        # Allocate buffers
        self.inputs = []
        self.outputs = []
        self.bindings = []
        self.stream = cuda.Stream()
        
        for binding in self.engine:
            size = trt.volume(self.engine.get_binding_shape(binding))
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            
            self.bindings.append(int(device_mem))
            
            if self.engine.binding_is_input(binding):
                self.inputs.append({'host': host_mem, 'device': device_mem})
            else:
                self.outputs.append({'host': host_mem, 'device': device_mem})
    
    def infer(self, image_data):
        # Image already preprocessed as 640x640 RGB
        input_data = np.frombuffer(image_data, dtype=np.uint8)
        input_data = input_data.reshape(640, 640, 3)
        input_data = input_data.transpose(2, 0, 1).astype(np.float32) / 255.0
        input_data = np.expand_dims(input_data, axis=0)
        
        # Copy to device
        np.copyto(self.inputs[0]['host'], input_data.ravel())
        cuda.memcpy_htod_async(self.inputs[0]['device'], 
                               self.inputs[0]['host'], 
                               self.stream)
        
        # Execute
        self.context.execute_async_v2(bindings=self.bindings,
                                      stream_handle=self.stream.handle)
        
        # Copy output
        cuda.memcpy_dtoh_async(self.outputs[0]['host'],
                              self.outputs[0]['device'],
                              self.stream)
        
        self.stream.synchronize()
        
        output = self.outputs[0]['host'].reshape(self.engine.get_binding_shape(1))
        
        return self.postprocess(output)
    
    def postprocess(self, output, conf_threshold=0.25, iou_threshold=0.45):
        output = output[0].T
        
        boxes = output[:, :4]
        scores = output[:, 4:].max(axis=1)
        class_ids = output[:, 4:].argmax(axis=1)
        
        mask = scores > conf_threshold
        boxes = boxes[mask]
        scores = scores[mask]
        class_ids = class_ids[mask]
        
        # Convert to corner format
        x_center, y_center, width, height = boxes.T
        x1 = x_center - width / 2
        y1 = y_center - height / 2
        x2 = x_center + width / 2
        y2 = y_center + height / 2
        boxes = np.stack([x1, y1, x2, y2], axis=1)
        
        # NMS
        indices = self.nms(boxes, scores, iou_threshold)
        
        detections = []
        for idx in indices:
            detections.append({
                'bbox': boxes[idx].tolist(),
                'score': float(scores[idx]),
                'class_id': int(class_ids[idx])
            })
        
        return {'detections': detections}
    
    def nms(self, boxes, scores, threshold):
        x1, y1, x2, y2 = boxes.T
        areas = (x2 - x1) * (y2 - y1)
        order = scores.argsort()[::-1]
        
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])
            
            w = np.maximum(0, xx2 - xx1)
            h = np.maximum(0, yy2 - yy1)
            inter = w * h
            
            iou = inter / (areas[i] + areas[order[1:]] - inter)
            
            inds = np.where(iou <= threshold)[0]
            order = order[inds + 1]
        
        return np.array(keep)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--engine', required=True)
    parser.add_argument('--input', default='-')
    args = parser.parse_args()
    
    # Initialize inference
    inference = TensorRTInference(args.engine)
    
    # Read image from stdin
    image_data = sys.stdin.buffer.read()
    
    # Run inference
    result = inference.infer(image_data)
    
    # Output JSON
    print(json.dumps(result))
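
The worker's contract with both servers is deliberately simple: raw 640x640x3 RGB bytes on stdin, a single JSON document on stdout. A standalone smoke test sketch (the engine filename matches the earlier examples; the black frame is just a placeholder input):

# worker_smoke_test.py - pipe a synthetic frame through inference.py and parse the JSON reply
import json
import subprocess

import numpy as np

raw_frame = np.zeros((640, 640, 3), dtype=np.uint8).tobytes()  # synthetic black frame

proc = subprocess.run(
    ["python3", "inference.py", "--engine", "yolov8n_int8.engine", "--input", "-"],
    input=raw_frame,
    capture_output=True,
    check=True,
)

result = json.loads(proc.stdout)
print(f"{len(result['detections'])} detections")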

C#/ASP.NET Core Inference Server Implementation

C# provides strong typing and excellent performance for production enterprise deployments requiring robust error handling and maintainability.

ASP.NET Core Server (C#):

// Program.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Diagnostics;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Register inference service as singleton
builder.Services.AddSingleton<InferenceService>(sp => 
    new InferenceService("yolov8n_int8.engine", maxWorkers: 4));

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseAuthorization();
app.MapControllers();

app.Run();

// InferenceService.cs
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

public class InferenceService
{
    private readonly string _enginePath;
    private readonly int _maxWorkers;
    private readonly ConcurrentQueue<InferenceRequest> _queue;
    private readonly SemaphoreSlim _workerSemaphore;
    private int _processing;

    public InferenceService(string enginePath, int maxWorkers = 4)
    {
        _enginePath = enginePath;
        _maxWorkers = maxWorkers;
        _queue = new ConcurrentQueue<InferenceRequest>();
        _workerSemaphore = new SemaphoreSlim(maxWorkers, maxWorkers);
        _processing = 0;
    }

    // virtual so resilience decorators (e.g. the circuit breaker shown later) can override it
    public virtual async Task<InferenceResult> InferAsync(byte[] imageData)
    {
        var tcs = new TaskCompletionSource<InferenceResult>();
        var request = new InferenceRequest
        {
            ImageData = imageData,
            CompletionSource = tcs,
            Timestamp = DateTime.UtcNow
        };

        _queue.Enqueue(request);
        _ = ProcessQueueAsync();

        return await tcs.Task;
    }

    private async Task ProcessQueueAsync()
    {
        if (!_queue.TryDequeue(out var request))
            return;

        await _workerSemaphore.WaitAsync();
        Interlocked.Increment(ref _processing);

        try
        {
            var result = await ExecuteInferenceAsync(request.ImageData);
            request.CompletionSource.SetResult(result);
        }
        catch (Exception ex)
        {
            request.CompletionSource.SetException(ex);
        }
        finally
        {
            _workerSemaphore.Release();
            Interlocked.Decrement(ref _processing);
            _ = ProcessQueueAsync();
        }
    }

    private async Task<InferenceResult> ExecuteInferenceAsync(byte[] imageData)
    {
        var startInfo = new ProcessStartInfo
        {
            FileName = "python3",
            Arguments = $"inference.py --engine {_enginePath} --input -",
            RedirectStandardInput = true,
            RedirectStandardOutput = true,
            RedirectStandardError = true,
            UseShellExecute = false,
            CreateNoWindow = true
        };

        using var process = new Process { StartInfo = startInfo };
        process.Start();

        // Write image data to stdin
        await process.StandardInput.BaseStream.WriteAsync(imageData);
        process.StandardInput.Close();

        // Read stdout and stderr concurrently so neither pipe can block the other
        var outputTask = process.StandardOutput.ReadToEndAsync();
        var errorTask = process.StandardError.ReadToEndAsync();

        await process.WaitForExitAsync();

        var output = await outputTask;
        var errorOutput = await errorTask;

        if (process.ExitCode != 0)
        {
            throw new Exception($"Inference failed: {errorOutput}");
        }

        var result = JsonSerializer.Deserialize<InferenceResult>(output);
        return result ?? throw new Exception("Failed to deserialize inference result");
    }

    public ServiceStatus GetStatus()
    {
        return new ServiceStatus
        {
            MaxWorkers = _maxWorkers,
            Processing = _processing,
            QueueDepth = _queue.Count,
            AvailableWorkers = _workerSemaphore.CurrentCount
        };
    }
}

public class InferenceRequest
{
    public byte[] ImageData { get; set; } = Array.Empty<byte>();
    public TaskCompletionSource<InferenceResult> CompletionSource { get; set; } = new();
    public DateTime Timestamp { get; set; }
}

public class InferenceResult
{
    // The Python worker emits snake_case JSON keys, so map them explicitly
    [JsonPropertyName("detections")]
    public Detection[] Detections { get; set; } = Array.Empty<Detection>();
}

public class Detection
{
    [JsonPropertyName("bbox")]
    public float[] Bbox { get; set; } = Array.Empty<float>();

    [JsonPropertyName("score")]
    public float Score { get; set; }

    [JsonPropertyName("class_id")]
    public int ClassId { get; set; }
}

public class ServiceStatus
{
    public int MaxWorkers { get; set; }
    public int Processing { get; set; }
    public int QueueDepth { get; set; }
    public int AvailableWorkers { get; set; }
}

// InferenceController.cs
using Microsoft.AspNetCore.Mvc;
using SixLabors.ImageSharp;
using SixLabors.ImageSharp.PixelFormats;
using SixLabors.ImageSharp.Processing;
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

[ApiController]
[Route("api")]
public class InferenceController : ControllerBase
{
    private readonly InferenceService _inferenceService;

    public InferenceController(InferenceService inferenceService)
    {
        _inferenceService = inferenceService;
    }

    [HttpGet("health")]
    public IActionResult GetHealth()
    {
        var status = _inferenceService.GetStatus();
        return Ok(new
        {
            Status = "healthy",
            Timestamp = DateTime.UtcNow,
            Inference = status
        });
    }

    [HttpPost("infer")]
    public async Task<IActionResult> Infer([FromForm] IFormFile image)
    {
        var sw = Stopwatch.StartNew();

        try
        {
            if (image == null || image.Length == 0)
            {
                return BadRequest(new { Error = "No image provided" });
            }

            // Read image data
            using var memoryStream = new MemoryStream();
            await image.CopyToAsync(memoryStream);
            var imageData = memoryStream.ToArray();

            // Preprocess with ImageSharp (install SixLabors.ImageSharp package)
            var processedImage = await PreprocessImageAsync(imageData);

            // Run inference
            var result = await _inferenceService.InferAsync(processedImage);

            sw.Stop();

            return Ok(new
            {
                Detections = result.Detections,
                Latency = sw.Elapsed.TotalSeconds,
                Timestamp = DateTime.UtcNow
            });
        }
        catch (Exception ex)
        {
            return StatusCode(500, new { Error = ex.Message });
        }
    }

    private Task<byte[]> PreprocessImageAsync(byte[] imageData)
    {
        // Resize with ImageSharp and return the raw 640x640x3 RGB bytes that
        // the Python worker expects (it reads raw pixels, not an encoded image)
        using var image = Image.Load<Rgb24>(imageData);
        image.Mutate(x => x.Resize(640, 640));

        var pixelBytes = new byte[640 * 640 * 3];
        image.CopyPixelDataTo(pixelBytes);

        return Task.FromResult(pixelBytes);
    }

    [HttpPost("infer/batch")]
    public async Task<IActionResult> InferBatch([FromForm] IFormFileCollection images)
    {
        var sw = Stopwatch.StartNew();

        try
        {
            if (images == null || images.Count == 0)
            {
                return BadRequest(new { Error = "No images provided" });
            }

            var tasks = new Task<InferenceResult>[images.Count];
            
            for (int i = 0; i < images.Count; i++)
            {
                using var memoryStream = new MemoryStream();
                await images[i].CopyToAsync(memoryStream);
                var imageData = memoryStream.ToArray();
                var processedImage = await PreprocessImageAsync(imageData);
                
                tasks[i] = _inferenceService.InferAsync(processedImage);
            }

            var results = await Task.WhenAll(tasks);

            sw.Stop();

            return Ok(new
            {
                Results = results,
                Count = results.Length,
                Latency = sw.Elapsed.TotalSeconds,
                Timestamp = DateTime.UtcNow
            });
        }
        catch (Exception ex)
        {
            return StatusCode(500, new { Error = ex.Message });
        }
    }
}
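
From the client's perspective the ASP.NET Core server behaves like the Node.js one, just under the /api route prefix. A batch-request sketch in Python, assuming Kestrel listens on localhost:5000 and the listed files exist:

# batch_client_example.py - post several images to the ASP.NET Core batch endpoint
import requests

BASE_URL = "http://localhost:5000"  # assumed Kestrel port
paths = ["frame1.jpg", "frame2.jpg", "frame3.jpg"]

files = [("images", (p, open(p, "rb"), "image/jpeg")) for p in paths]
try:
    resp = requests.post(f"{BASE_URL}/api/infer/batch", files=files, timeout=10)
    resp.raise_for_status()
    body = resp.json()
    print(f"{body['count']} results in {body['latency'] * 1000:.1f} ms")
finally:
    for _, (_, handle, _) in files:
        handle.close()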

Camera Integration for Live Video Streams

Integrating camera inputs enables continuous monitoring applications with frame-by-frame inference and real-time alerting.

Camera Integration (Python with OpenCV):

#!/usr/bin/env python3
"""
Camera integration with inference server
"""

import cv2
import requests
import time
import threading
from queue import Queue
from io import BytesIO

class CameraInferenceClient:
    def __init__(self, camera_id, inference_url, fps=30):
        self.camera_id = camera_id
        self.inference_url = inference_url
        self.target_fps = fps
        self.frame_queue = Queue(maxsize=5)
        self.result_queue = Queue()
        self.running = False
        
    def start(self):
        self.running = True
        
        # Start capture thread
        capture_thread = threading.Thread(target=self._capture_loop)
        capture_thread.start()
        
        # Start inference thread
        inference_thread = threading.Thread(target=self._inference_loop)
        inference_thread.start()
        
        # Start display thread
        display_thread = threading.Thread(target=self._display_loop)
        display_thread.start()
    
    def _capture_loop(self):
        cap = cv2.VideoCapture(self.camera_id)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        cap.set(cv2.CAP_PROP_FPS, self.target_fps)
        
        frame_interval = 1.0 / self.target_fps
        
        while self.running:
            start_time = time.time()
            
            ret, frame = cap.read()
            if not ret:
                print("Failed to capture frame")
                continue
            
            # Add to queue if not full
            if not self.frame_queue.full():
                self.frame_queue.put((frame, time.time()))
            
            # Maintain target FPS
            elapsed = time.time() - start_time
            sleep_time = max(0, frame_interval - elapsed)
            time.sleep(sleep_time)
        
        cap.release()
    
    def _inference_loop(self):
        while self.running:
            if self.frame_queue.empty():
                time.sleep(0.001)
                continue
            
            frame, capture_time = self.frame_queue.get()
            
            try:
                # Encode frame
                _, buffer = cv2.imencode('.jpg', frame)
                
                # Send to inference server
                response = requests.post(
                    self.inference_url,
                    files={'image': BytesIO(buffer)},
                    timeout=1.0
                )
                
                if response.status_code == 200:
                    result = response.json()
                    result['frame'] = frame
                    result['capture_time'] = capture_time
                    
                    self.result_queue.put(result)
            
            except Exception as e:
                print(f"Inference error: {e}")
    
    def _display_loop(self):
        while self.running:
            if self.result_queue.empty():
                time.sleep(0.001)
                continue
            
            result = self.result_queue.get()
            frame = result['frame']
            detections = result.get('detections', [])
            
            # The server returns boxes in 640x640 model space; scale them to the display frame
            frame_h, frame_w = frame.shape[:2]
            scale_x, scale_y = frame_w / 640.0, frame_h / 640.0

            # Draw detections
            for det in detections:
                bbox = det['bbox']
                score = det['score']
                class_id = det['class_id']

                x1 = int(bbox[0] * scale_x)
                y1 = int(bbox[1] * scale_y)
                x2 = int(bbox[2] * scale_x)
                y2 = int(bbox[3] * scale_y)

                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Draw label
                label = f"Class {class_id}: {score:.2f}"
                cv2.putText(frame, label, (x1, y1 - 10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            
            # Display FPS
            latency = result.get('latency', 0)
            fps_text = f"Latency: {latency*1000:.1f}ms"
            cv2.putText(frame, fps_text, (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            
            cv2.imshow('Camera Feed', frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.running = False
        
        cv2.destroyAllWindows()
    
    def stop(self):
        self.running = False

# Usage
if __name__ == '__main__':
    client = CameraInferenceClient(
        camera_id=0,
        inference_url='http://localhost:3000/infer',
        fps=30
    )
    
    client.start()
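
On Jetson devices with CSI cameras, cv2.VideoCapture(camera_id) alone is often not enough; the usual approach is a GStreamer pipeline through nvarguscamerasrc. A sketch, assuming OpenCV was built with GStreamer support (sensor ID, resolution, and frame rate are illustrative):

# jetson_csi_capture.py - open a CSI camera on Jetson via a GStreamer pipeline
import cv2

def gstreamer_pipeline(sensor_id=0, width=1280, height=720, fps=30):
    return (
        f"nvarguscamerasrc sensor-id={sensor_id} ! "
        f"video/x-raw(memory:NVMM), width={width}, height={height}, framerate={fps}/1 ! "
        "nvvidconv ! video/x-raw, format=BGRx ! "
        "videoconvert ! video/x-raw, format=BGR ! "
        "appsink drop=true max-buffers=1"
    )

cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)
ret, frame = cap.read()
print("captured:", ret, frame.shape if ret else None)
cap.release()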

Error Recovery and Reliability Patterns

Production servers require robust error handling with automatic recovery from transient failures.

Retry Logic with Exponential Backoff (Node.js):

class ResilientInferenceManager extends InferenceEngineManager {
  async inferWithRetry(imageBuffer, maxRetries = 3) {
    let lastError;
    
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        return await this.infer(imageBuffer);
      } catch (error) {
        lastError = error;
        logger.warn(`Inference attempt ${attempt + 1} failed: ${error.message}`);
        
        if (attempt < maxRetries - 1) {
          const backoff = Math.pow(2, attempt) * 100; // 100ms, 200ms, 400ms
          await new Promise(resolve => setTimeout(resolve, backoff));
        }
      }
    }
    
    throw new Error(`Inference failed after ${maxRetries} attempts: ${lastError.message}`);
  }
  
  async healthCheck() {
    try {
      const testImage = Buffer.alloc(640 * 640 * 3);
      await this.infer(testImage);
      return true;
    } catch (error) {
      logger.error(`Health check failed: ${error.message}`);
      return false;
    }
  }
}

Circuit Breaker Pattern (C#):

public class CircuitBreakerInferenceService : InferenceService
{
    private int _failureCount = 0;
    private DateTime _lastFailureTime;
    private bool _isOpen = false;
    private readonly int _failureThreshold = 5;
    private readonly TimeSpan _timeout = TimeSpan.FromSeconds(30);

    public CircuitBreakerInferenceService(string enginePath, int maxWorkers = 4)
        : base(enginePath, maxWorkers)
    {
    }

    public override async Task<InferenceResult> InferAsync(byte[] imageData)
    {
        if (_isOpen)
        {
            if (DateTime.UtcNow - _lastFailureTime > _timeout)
            {
                // Try half-open state
                _isOpen = false;
                _failureCount = 0;
            }
            else
            {
                throw new Exception("Circuit breaker is open");
            }
        }

        try
        {
            var result = await base.InferAsync(imageData);
            _failureCount = 0; // Reset on success
            return result;
        }
        catch (Exception ex)
        {
            _failureCount++;
            _lastFailureTime = DateTime.UtcNow;

            if (_failureCount >= _failureThreshold)
            {
                _isOpen = true;
                Console.WriteLine("Circuit breaker opened");
            }

            throw;
        }
    }
}
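
The circuit breaker protects callers inside the process; a separate watchdog polling the health endpoint catches failures from the outside and can alert or restart the service. A minimal sketch (endpoint, thresholds, and the restart action are assumptions):

# health_watchdog.py - poll /health and flag the server as degraded after repeated failures
import time

import requests

HEALTH_URL = "http://localhost:3000/health"  # assumed server address
FAILURE_THRESHOLD = 3
POLL_INTERVAL_S = 10

failures = 0
while True:
    try:
        resp = requests.get(HEALTH_URL, timeout=2)
        resp.raise_for_status()
        status = resp.json()
        failures = 0
        print(f"healthy, queue depth = {status['inference']['queue']}")
    except Exception as exc:
        failures += 1
        print(f"health check failed ({failures}/{FAILURE_THRESHOLD}): {exc}")
        if failures >= FAILURE_THRESHOLD:
            print("server degraded: raise an alert or restart the service here")
            failures = 0
    time.sleep(POLL_INTERVAL_S)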

Performance Benchmarking

Systematic load testing validates server performance under realistic concurrent workloads.

Load Test Script (Python):

#!/usr/bin/env python3
"""
Load testing for inference server
"""

import requests
import time
import threading
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed

class LoadTester:
    def __init__(self, server_url, test_image_path):
        self.server_url = server_url
        self.test_image_path = test_image_path
        self.results = []
        self.lock = threading.Lock()
    
    def single_request(self):
        start = time.perf_counter()
        
        try:
            with open(self.test_image_path, 'rb') as f:
                response = requests.post(
                    f"{self.server_url}/infer",
                    files={'image': f},
                    timeout=5.0
                )
            
            latency = time.perf_counter() - start
            success = response.status_code == 200
            
            with self.lock:
                self.results.append({
                    'latency': latency,
                    'success': success,
                    'status_code': response.status_code
                })
            
            return latency, success
        
        except Exception as e:
            with self.lock:
                self.results.append({
                    'latency': None,
                    'success': False,
                    'error': str(e)
                })
            return None, False
    
    def run_load_test(self, num_requests, concurrency):
        print(f"Running load test: {num_requests} requests, {concurrency} concurrent")
        
        self.results = []
        start_time = time.perf_counter()
        
        with ThreadPoolExecutor(max_workers=concurrency) as executor:
            futures = [executor.submit(self.single_request) 
                      for _ in range(num_requests)]
            
            for future in as_completed(futures):
                pass
        
        total_time = time.perf_counter() - start_time
        
        # Analyze results
        successful = [r for r in self.results if r['success']]
        failed = [r for r in self.results if not r['success']]
        
        latencies = [r['latency'] for r in successful if r['latency']]
        
        print(f"\nResults:")
        print(f"Total time: {total_time:.2f}s")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")
        print(f"Success rate: {len(successful)/num_requests*100:.1f}%")
        
        if latencies:
            print(f"\nLatency Statistics:")
            print(f"Mean: {np.mean(latencies)*1000:.2f}ms")
            print(f"Median: {np.median(latencies)*1000:.2f}ms")
            print(f"P95: {np.percentile(latencies, 95)*1000:.2f}ms")
            print(f"P99: {np.percentile(latencies, 99)*1000:.2f}ms")
            print(f"Min: {np.min(latencies)*1000:.2f}ms")
            print(f"Max: {np.max(latencies)*1000:.2f}ms")
            
            throughput = len(successful) / total_time
            print(f"\nThroughput: {throughput:.2f} req/s")

if __name__ == '__main__':
    tester = LoadTester(
        server_url='http://localhost:3000',
        test_image_path='test.jpg'
    )
    
    # Test different concurrency levels
    for concurrency in [1, 5, 10, 20, 30]:
        print(f"\n{'='*60}")
        tester.run_load_test(num_requests=100, concurrency=concurrency)

Expected Performance Metrics: Single request latency of 15-22ms end-to-end on Jetson Orin Nano, 10-18ms on Jetson AGX Orin. Concurrent throughput of 30-40 req/s with 4 workers on Orin Nano, 60-80 req/s on AGX Orin. Success rate above 99.5% under sustained load with proper error handling and retries.

Key Takeaways

Multi-language inference servers enable flexible integration of edge AI capabilities into diverse application ecosystems. Node.js/Express provides excellent asynchronous I/O performance for high-concurrency scenarios while C#/ASP.NET Core offers strong typing and enterprise-grade tooling for maintainable production systems. Both approaches achieve comparable performance when properly architected with asynchronous request handling and efficient worker management.

Camera integration requires careful frame buffering and queue management to maintain target frame rates while avoiding dropped frames. Threading separation between capture, inference, and display prevents blocking and enables smooth real-time performance. WebSocket connections provide efficient bidirectional communication for streaming inference results with minimal overhead.

Production reliability demands comprehensive error handling including retry logic with exponential backoff for transient failures, circuit breaker patterns preventing cascade failures, health monitoring enabling proactive issue detection, and graceful degradation maintaining partial service during component failures. Load testing under realistic concurrent workloads validates performance characteristics and identifies bottlenecks before production deployment.

Part 5 continues with advanced optimization patterns covering memory-aware scheduling for multiple concurrent models, GPU resource pooling and contention management, KV cache optimization for sequence processing, adaptive batching strategies, SLA enforcement mechanisms, and demonstrating 50-70% latency reduction through intelligent resource coordination.
