Idempotency in Distributed APIs — Part 3: Building Idempotent Endpoints in Rust with Axum

In Part 2, we built the key storage layer. Now we wire it into an Axum middleware so that every POST and PATCH endpoint gets idempotency protection automatically, without duplicating the logic in every handler.

The goal is a Tower middleware layer that intercepts requests, checks for an Idempotency-Key header, and either short-circuits with a cached response or lets the request through and captures the response for storage.

The Architecture

flowchart TD
    A[Incoming Request] --> B{Has Idempotency-Key header?}
    B -->|No| C[Pass through normally]
    B -->|Yes| D{Method is POST or PATCH?}
    D -->|No| C
    D -->|Yes| E{Key in Redis?}
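    E -->|Yes, complete| M{Fingerprint matches?}
    M -->|Yes| F[Return cached response]
    M -->|No| N[Return 422 Unprocessable Entity]
    E -->|Yes, processing| G[Return 409 Conflict]
    E -->|No| H[SET NX key as processing]
    H -->|Key already taken| G
    H -->|Reserved| I[Call inner handler]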
    I --> J[Capture response body + status]
    J --> K[Store result in Redis]
    K --> L[Return response to client]
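To make the branching concrete, here is a std-only model of the decision tree. It also includes the fingerprint comparison that the middleware performs on completed records. `Status`, `Record`, `Decision`, and `decide` are hypothetical names that exist only for this sketch. Redis, HTTP, and concurrency are left out entirely, so the model only maps what the middleware sees to the outcome it should produce.

```rust
// Std-only model of the idempotency decision tree (hypothetical types, no Redis/HTTP).

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Processing,
    Complete,
}

#[derive(Debug, Clone)]
struct Record {
    status: Status,
    status_code: u16,
    response_body: String,
    fingerprint: String,
}

#[derive(Debug, PartialEq)]
enum Decision {
    PassThrough,         // non-mutating method, or no Idempotency-Key header
    Reserve,             // key unseen: SET NX it as "processing", then run the handler
    Conflict,            // 409: an earlier request with this key is still running
    FingerprintMismatch, // 422: same key reused with a different request
    Replay(u16, String), // return the stored status code and body
}

fn decide(method: &str, key: Option<&str>, existing: Option<&Record>, fingerprint: &str) -> Decision {
    if !matches!(method, "POST" | "PATCH") || key.is_none() {
        return Decision::PassThrough;
    }
    match existing {
        None => Decision::Reserve,
        Some(r) if r.status == Status::Processing => Decision::Conflict,
        Some(r) if r.fingerprint != fingerprint => Decision::FingerprintMismatch,
        Some(r) => Decision::Replay(r.status_code, r.response_body.clone()),
    }
}

fn main() {
    let done = Record {
        status: Status::Complete,
        status_code: 201,
        response_body: "Order created".to_string(),
        fingerprint: "abc123".to_string(),
    };
    // A retry with the same key and the same body replays the stored response
    println!("{:?}", decide("POST", Some("key-1"), Some(&done), "abc123"));
    // prints: Replay(201, "Order created")
}
```

Ordering matters: a record that is still processing yields 409 before the fingerprint is compared, which mirrors the middleware code that follows.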

Project Setup

[dependencies]
axum = { version = "0.7", features = ["macros"] }
tower = "0.4"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.25", features = ["tokio-comp"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4"] }
sha2 = "0.10"
bytes = "1"
http-body-util = "0.1"

The Middleware Implementation

use axum::{
    body::Body,
    extract::Request,
    http::{Method, StatusCode},
    middleware::Next,
    response::{IntoResponse, Response},
    Extension,
};
use bytes::Bytes;
use http_body_util::BodyExt;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone, Serialize, Deserialize)]
struct IdempotencyRecord {
    status: String,
    status_code: u16,
    response_body: String,
    request_fingerprint: String,
}

#[derive(Clone)]
pub struct IdempotencyConfig {
    pub redis: Arc<Mutex<redis::aio::Connection>>,
    pub ttl_seconds: u64,
}

pub async fn idempotency_middleware(
    Extension(config): Extension<IdempotencyConfig>,
    req: Request,
    next: Next,
) -> Response {
    // Only apply to mutating methods
    if !matches!(req.method(), &Method::POST | &Method::PATCH) {
        return next.run(req).await;
    }

    let idempotency_key = match req.headers().get("Idempotency-Key") {
        Some(v) => match v.to_str() {
            Ok(s) => s.to_string(),
            Err(_) => return StatusCode::BAD_REQUEST.into_response(),
        },
        None => return next.run(req).await,
    };

    // Hard-coded user_id for illustration; extract from auth context in real code
    let user_id = "user-placeholder";
    let namespaced_key = format!("idempotency:{}:{}", user_id, idempotency_key);

    // Read and buffer the body so we can fingerprint it and still pass it to the handler
    let (parts, body) = req.into_parts();
    let body_bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
    };

    let fingerprint = compute_fingerprint(&body_bytes);

    // Check Redis for existing record
    {
        let mut redis = config.redis.lock().await;
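        let lookup: redis::RedisResult<Option<String>> = redis.get(&namespaced_key).await;
        let existing = match lookup {
            Ok(value) => value,
            // Fail closed: without Redis we cannot tell a retry from a first attempt
            Err(_) => return StatusCode::SERVICE_UNAVAILABLE.into_response(),
        };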

        if let Some(raw) = existing {
            if let Ok(record) = serde_json::from_str::<IdempotencyRecord>(&raw) {
                if record.status == "processing" {
                    return (
                        StatusCode::CONFLICT,
                        "A request with this idempotency key is still being processed",
                    )
                        .into_response();
                }

                if record.request_fingerprint != fingerprint {
                    return (
                        StatusCode::UNPROCESSABLE_ENTITY,
                        "Idempotency key reused with different request body",
                    )
                        .into_response();
                }

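                // Replay the cached status and body. Response headers (including
                // Content-Type) are not stored, so the replay goes out as text/plain;
                // persist them in IdempotencyRecord if clients depend on them.
                let status = StatusCode::from_u16(record.status_code)
                    .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
                return (status, record.response_body).into_response();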
            }
        }

        // Reserve the key
        let placeholder = IdempotencyRecord {
            status: "processing".to_string(),
            status_code: 0,
            response_body: String::new(),
            request_fingerprint: fingerprint.clone(),
        };

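        let reply: redis::RedisResult<bool> = redis::cmd("SET")
            .arg(&namespaced_key)
            .arg(serde_json::to_string(&placeholder).unwrap())
            .arg("NX")
            .arg("EX")
            .arg(config.ttl_seconds)
            .query_async(&mut *redis)
            .await;

        match reply {
            // SET ... NX replies OK when it stored the key and nil when the key already existed
            Ok(true) => {}
            // Another request took the key between our GET and this SET NX
            Ok(false) => {
                return (StatusCode::CONFLICT, "Concurrent duplicate request").into_response();
            }
            // Fail closed rather than reporting a Redis outage as a 409
            Err(_) => return StatusCode::SERVICE_UNAVAILABLE.into_response(),
        }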
    }

    // Rebuild request with buffered body and pass to handler
    let req = Request::from_parts(parts, Body::from(body_bytes));
    let response = next.run(req).await;

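    // Capture the response
    let status_code = response.status().as_u16();
    let (resp_parts, resp_body) = response.into_parts();

    let resp_bytes = match resp_body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            // Release the reservation; otherwise every retry gets 409 until the TTL expires
            let mut redis = config.redis.lock().await;
            let _: redis::RedisResult<()> = redis.del(&namespaced_key).await;
            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
        }
    };

    // Lossy conversion: non-UTF-8 (binary) bodies will not replay byte-for-byte
    let resp_body_str = String::from_utf8_lossy(&resp_bytes).into_owned();

    // Store the completed result, overwriting the "processing" placeholder
    {
        let mut redis = config.redis.lock().await;
        let record = IdempotencyRecord {
            status: "complete".to_string(),
            status_code,
            response_body: resp_body_str,
            request_fingerprint: fingerprint,
        };

        // A failed write is ignored here, which leaves the "processing" placeholder
        // in place until its TTL expires; log the error in real code
        let _: redis::RedisResult<()> = redis
            .set_ex(
                &namespaced_key,
                serde_json::to_string(&record).unwrap(),
                config.ttl_seconds,
            )
            .await;
    }

    Response::from_parts(resp_parts, Body::from(resp_bytes))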
}

fn compute_fingerprint(body: &Bytes) -> String {
    let mut hasher = Sha256::new();
    hasher.update(body);
    format!("{:x}", hasher.finalize())
}
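One gap worth noting: `compute_fingerprint` hashes only the body, and the Redis key is scoped by user and key alone. A retry that reuses a key against `/payments` with the same body as an earlier `/orders` call would therefore be treated as a match and get the `/orders` response replayed. Below is a sketch of one possible fix, assuming you want method and path to be part of the fingerprint. `fingerprint_input` is a hypothetical helper. In the middleware you would pass `parts.method.as_str()`, `parts.uri.path()`, and `&body_bytes`, then feed the result to `Sha256` the same way `compute_fingerprint` does now.

```rust
/// Builds the bytes to hash for a request fingerprint (hypothetical helper).
/// Each field is prefixed with its length as a big-endian u64 so field
/// boundaries are unambiguous: ("/a", "bc") and ("/ab", "c") encode differently.
fn fingerprint_input(method: &str, path: &str, body: &[u8]) -> Vec<u8> {
    let fields: [&[u8]; 3] = [method.as_bytes(), path.as_bytes(), body];
    let mut buf = Vec::with_capacity(3 * 8 + method.len() + path.len() + body.len());
    for field in fields {
        buf.extend_from_slice(&(field.len() as u64).to_be_bytes());
        buf.extend_from_slice(field);
    }
    buf
}

fn main() {
    let body = br#"{"sku":"A1","qty":1}"#;
    let orders = fingerprint_input("POST", "/orders", body);
    let payments = fingerprint_input("POST", "/payments", body);
    // Same body on a different route no longer yields the same fingerprint input
    assert_ne!(orders, payments);
    println!("orders input: {} bytes", orders.len());
}
```

The length prefixes cost a few bytes but remove any ambiguity about where one field ends and the next begins. That matters once several variable-length values go into a single hash.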

Wiring It Into the Router

use axum::{middleware, routing::post, Extension, Router};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let redis_client = redis::Client::open("redis://127.0.0.1/").unwrap();
    let redis_conn = redis_client.get_async_connection().await.unwrap();

    let idempotency_config = IdempotencyConfig {
        redis: Arc::new(Mutex::new(redis_conn)),
        ttl_seconds: 86_400, // 24 hours
    };

    let app = Router::new()
        .route("/orders", post(create_order_handler))
        .route("/payments", post(create_payment_handler))
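        // Layers added later wrap the earlier ones, so the Extension layer runs first
        // and inserts the config before idempotency_middleware tries to extract it
        .layer(middleware::from_fn(idempotency_middleware))
        .layer(Extension(idempotency_config));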

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

async fn create_order_handler() -> &'static str {
    // Handler code here -- idempotency is handled by the middleware layer
    "Order created"
}

async fn create_payment_handler() -> &'static str {
    "Payment processed"
}

What the Handler Does Not Need to Know

Notice that create_order_handler and create_payment_handler have zero idempotency code. They just do their job. The middleware intercepts before them, checks the key store, and either returns a cached response or lets the request through and captures the result. This separation of concerns is the main benefit of the middleware approach.
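The same separation shows up in a toy, single-process model. `InMemoryIdempotency` is hypothetical. It swaps Redis for a `HashMap` and ignores TTLs, fingerprints, and concurrent requests, but it demonstrates the property the middleware gives the handlers above: the handler closure contains no key logic, yet it runs at most once per key.

```rust
use std::collections::HashMap;

/// Toy stand-in for the Redis-backed middleware (hypothetical; no TTLs,
/// fingerprints, or concurrency control).
struct InMemoryIdempotency {
    completed: HashMap<String, (u16, String)>,
}

impl InMemoryIdempotency {
    fn new() -> Self {
        Self { completed: HashMap::new() }
    }

    /// Replays a stored (status, body) for a known key; otherwise runs the
    /// handler once and stores its result.
    fn call<F>(&mut self, key: &str, handler: F) -> (u16, String)
    where
        F: FnOnce() -> (u16, String),
    {
        if let Some(cached) = self.completed.get(key) {
            return cached.clone();
        }
        let response = handler();
        self.completed.insert(key.to_string(), response.clone());
        response
    }
}

fn main() {
    let mut store = InMemoryIdempotency::new();
    let mut orders_created = 0;
    // The client retries the same request three times with one key
    for _ in 0..3 {
        let (status, body) = store.call("key-123", || {
            orders_created += 1; // the handler's side effect
            (201, "Order created".to_string())
        });
        println!("{status} {body}");
    }
    println!("handler ran {orders_created} time(s)"); // prints "handler ran 1 time(s)"
}
```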

Production Considerations

  • Extract user ID from auth context. The placeholder user ID in the example above must come from your JWT or session middleware in production. Never trust a user-supplied user ID.
  • Use a Redis connection pool. The single Mutex-wrapped connection in the example will bottleneck under load. Use deadpool-redis or bb8-redis for a proper connection pool.
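  • Limit key length. Reject Idempotency-Key headers longer than 256 characters with 400 Bad Request before touching Redis, so clients cannot inflate memory usage with oversized keys.
  • Scope the layer deliberately. The middleware already skips methods other than POST and PATCH, but .layer() on the Router also runs it for requests that match no route, so a POST to an unknown path would reserve a key and cache the 404. Use Router::route_layer, or attach the middleware only to a sub-router holding the routes that need it.
  • Give the processing placeholder a shorter TTL. The example reserves the key for the full ttl_seconds, so if the server crashes mid-request, retries get 409 Conflict until the key expires. A lease close to your handler timeout, with the 24-hour TTL applied only to the completed record, lets clients recover sooner.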
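The user-ID and key-length points above can be combined into one small helper. This is a sketch under assumptions. `namespaced_key`, `KeyError`, and `MAX_KEY_LEN` are hypothetical names, and `user_id` is assumed to come from your auth layer rather than from the request. The middleware would map any `Err` to `400 Bad Request` before it touches Redis.

```rust
/// Longest Idempotency-Key accepted (the 256 limit suggested in the notes above).
const MAX_KEY_LEN: usize = 256;

#[derive(Debug, PartialEq)]
enum KeyError {
    Empty,
    TooLong(usize),
}

/// Validates a client-supplied key and scopes it to the authenticated user.
/// `user_id` must come from verified auth context, never from the request itself.
fn namespaced_key(user_id: &str, raw_key: &str) -> Result<String, KeyError> {
    // HeaderValue::to_str only succeeds for ASCII input, so by this point
    // byte length and character count are the same.
    if raw_key.is_empty() {
        return Err(KeyError::Empty);
    }
    if raw_key.len() > MAX_KEY_LEN {
        return Err(KeyError::TooLong(raw_key.len()));
    }
    Ok(format!("idempotency:{}:{}", user_id, raw_key))
}

fn main() {
    match namespaced_key("user-42", "order-7f3a") {
        Ok(key) => println!("{key}"), // prints "idempotency:user-42:order-7f3a"
        Err(e) => println!("reject with 400: {e:?}"),
    }
}
```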

Next Up

The server side is now solid. In Part 4, we look at the client side — how to implement retry logic with exponential backoff and jitter so that retries are safe, bounded, and do not amplify load on a struggling service.
