In Part 2, we built the key storage layer. Now we wire it into an Axum middleware so every POST and PATCH endpoint gets idempotency protection automatically, without duplicating the logic in every handler.
The goal is a Tower middleware layer that intercepts requests, checks for an Idempotency-Key header, and either short-circuits with a cached response or lets the request through and captures the response for storage.
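Before touching Axum or Redis, it can help to see that decision as a pure function. The sketch below is illustrative only: `Action`, `Stored`, and `decide` are hypothetical names that do not appear in the middleware itself, and the stored record is reduced to the two facts the decision needs (its state and the fingerprint of the original body).

```rust
/// What the middleware should do with an incoming request.
#[derive(Debug, PartialEq)]
enum Action {
    PassThrough,    // no key, or a method other than POST/PATCH
    ReserveAndRun,  // new key: reserve it, then call the handler
    ReplayCached,   // key already completed with the same body
    RejectInFlight, // first request with this key is still running (409)
    RejectMismatch, // key reused with a different body (422)
}

/// Hypothetical stand-in for the record kept in the key store.
#[derive(Debug)]
enum Stored<'a> {
    Processing,
    Complete { fingerprint: &'a str },
}

/// Pure version of the routing decision; no I/O happens here.
fn decide(
    method: &str,
    key: Option<&str>,
    stored: Option<Stored<'_>>,
    fingerprint: &str,
) -> Action {
    // Requests without a key, or with a non-mutating method, are not tracked.
    if key.is_none() || !matches!(method, "POST" | "PATCH") {
        return Action::PassThrough;
    }
    match stored {
        None => Action::ReserveAndRun,
        Some(Stored::Processing) => Action::RejectInFlight,
        Some(Stored::Complete { fingerprint: seen }) if seen != fingerprint => {
            Action::RejectMismatch
        }
        Some(Stored::Complete { .. }) => Action::ReplayCached,
    }
}
```

Keeping the decision separate from storage like this makes each branch easy to unit test; the real middleware interleaves the same checks with Redis calls.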
The Architecture
flowchart TD
A[Incoming Request] --> B{Has Idempotency-Key header?}
B -->|No| C[Pass through normally]
B -->|Yes| D{Method is POST or PATCH?}
D -->|No| C
D -->|Yes| E{Key in Redis?}
E -->|Yes, complete| F[Return cached response]
E -->|Yes, processing| G[Return 409 Conflict]
E -->|No| H[SET NX key as processing]
H --> I[Call inner handler]
I --> J[Capture response body + status]
J --> K[Store result in Redis]
K --> L[Return response to client]
Project Setup
[dependencies]
axum = { version = "0.7", features = ["macros"] }
tower = "0.4"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.25", features = ["tokio-comp"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
uuid = { version = "1", features = ["v4"] }
sha2 = "0.10"
bytes = "1"
http-body-util = "0.1"
The Middleware Implementation
use axum::{
    body::Body,
    extract::Request,
    http::{Method, StatusCode},
    middleware::Next,
    response::{IntoResponse, Response},
    Extension,
};
use bytes::Bytes;
use http_body_util::BodyExt;
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tokio::sync::Mutex;
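/// Record stored in Redis for each idempotency key.
#[derive(Clone, Serialize, Deserialize)]
struct IdempotencyRecord {
    status: String, // "processing" or "complete"
    status_code: u16,
    response_body: String,
    request_fingerprint: String, // SHA-256 hex digest of the request body
}

/// Shared settings handed to the middleware through an Extension.
#[derive(Clone)]
pub struct IdempotencyConfig {
    pub redis: Arc<Mutex<redis::aio::Connection>>,
    pub ttl_seconds: u64,
}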
pub async fn idempotency_middleware(
    Extension(config): Extension<IdempotencyConfig>,
    req: Request,
    next: Next,
) -> Response {
    // Only apply to mutating methods
    if !matches!(req.method(), &Method::POST | &Method::PATCH) {
        return next.run(req).await;
    }

    let idempotency_key = match req.headers().get("Idempotency-Key") {
        Some(v) => match v.to_str() {
            Ok(s) => s.to_string(),
            Err(_) => return StatusCode::BAD_REQUEST.into_response(),
        },
        None => return next.run(req).await,
    };

    // Hard-coded user_id for illustration; extract from auth context (e.g. JWT claims) in real code
    let user_id = "user-placeholder";
    let namespaced_key = format!("idempotency:{}:{}", user_id, idempotency_key);

    // Read and buffer the body so we can fingerprint it and still pass it to the handler
    let (parts, body) = req.into_parts();
    let body_bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
    };
    let fingerprint = compute_fingerprint(&body_bytes);

    // Check Redis for an existing record. The guard is named `conn` so it does not
    // shadow the `redis` crate, and it is dropped at the end of this block.
    {
        let mut conn = config.redis.lock().await;
        let existing: Option<String> = conn.get(&namespaced_key).await.unwrap_or(None);
        if let Some(raw) = existing {
            if let Ok(record) = serde_json::from_str::<IdempotencyRecord>(&raw) {
                if record.status == "processing" {
                    return (
                        StatusCode::CONFLICT,
                        "A request with this idempotency key is still being processed",
                    )
                        .into_response();
                }
                if record.request_fingerprint != fingerprint {
                    return (
                        StatusCode::UNPROCESSABLE_ENTITY,
                        "Idempotency key reused with different request body",
                    )
                        .into_response();
                }
                // Return cached response
                let status = StatusCode::from_u16(record.status_code)
                    .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
                return (status, record.response_body).into_response();
            }
        }

        // Reserve the key
        let placeholder = IdempotencyRecord {
            status: "processing".to_string(),
            status_code: 0,
            response_body: String::new(),
            request_fingerprint: fingerprint.clone(),
        };
        let set: bool = redis::cmd("SET")
            .arg(&namespaced_key)
            .arg(serde_json::to_string(&placeholder).unwrap())
            .arg("NX")
            .arg("EX")
            .arg(config.ttl_seconds)
            .query_async(&mut *conn)
            .await
            .unwrap_or(false);
        if !set {
            return (StatusCode::CONFLICT, "Concurrent duplicate request").into_response();
        }
    }

    // Rebuild request with buffered body and pass to handler
    let req = Request::from_parts(parts, Body::from(body_bytes));
    let response = next.run(req).await;

    // Capture the response
    let status_code = response.status().as_u16();
    let (resp_parts, resp_body) = response.into_parts();
    let resp_bytes = match resp_body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(_) => {
            // Release the reservation; otherwise retries get 409 until the TTL expires
            let mut conn = config.redis.lock().await;
            let _: redis::RedisResult<i64> = conn.del(&namespaced_key).await;
            return StatusCode::INTERNAL_SERVER_ERROR.into_response();
        }
    };
    let resp_body_str = String::from_utf8_lossy(&resp_bytes).to_string();

    // Store completed result in Redis
    {
        let mut conn = config.redis.lock().await;
        let record = IdempotencyRecord {
            status: "complete".to_string(),
            status_code,
            response_body: resp_body_str,
            request_fingerprint: fingerprint,
        };
        // A failed write is ignored here: the client still gets its response
        let _: () = conn
            .set_ex(
                &namespaced_key,
                serde_json::to_string(&record).unwrap(),
                config.ttl_seconds,
            )
            .await
            .unwrap_or(());
    }

    Response::from_parts(resp_parts, Body::from(resp_bytes))
}
fn compute_fingerprint(body: &Bytes) -> String {
    let mut hasher = Sha256::new();
    hasher.update(body);
    format!("{:x}", hasher.finalize())
}
Wiring It Into the Router
use axum::{middleware, routing::post, Extension, Router};
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let redis_client = redis::Client::open("redis://127.0.0.1/").unwrap();
    let redis_conn = redis_client.get_async_connection().await.unwrap();
    let idempotency_config = IdempotencyConfig {
        redis: Arc::new(Mutex::new(redis_conn)),
        ttl_seconds: 86_400, // 24 hours
    };
    let app = Router::new()
        .route("/orders", post(create_order_handler))
        .route("/payments", post(create_payment_handler))
        .layer(middleware::from_fn(idempotency_middleware))
        // Added last so it wraps the middleware, which lets the Extension extractor find the config
        .layer(Extension(idempotency_config));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

async fn create_order_handler() -> &'static str {
    // Handler code here -- idempotency is handled by the middleware layer
    "Order created"
}

async fn create_payment_handler() -> &'static str {
    "Payment processed"
}
What the Handler Does Not Need to Know
Notice that create_order_handler and create_payment_handler have zero idempotency code. They just do their job. The middleware intercepts before them, checks the key store, and either returns a cached response or lets the request through and captures the result. This separation of concerns is the main benefit of the middleware approach.
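To make that separation concrete, here is a minimal in-memory sketch of the same reserve, run, store cycle. It is a model, not the middleware: `MemoryStore`, `Entry`, and `run_idempotent` are hypothetical names, a `HashMap` stands in for Redis, and fingerprint checks are left out to keep it short. The point is that the handler is a plain closure with no idempotency code in it.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a record in the Redis key store.
#[derive(Clone, Debug, PartialEq)]
enum Entry {
    Processing,
    Complete { status: u16, body: String },
}

#[derive(Default)]
struct MemoryStore {
    entries: HashMap<String, Entry>,
}

/// Runs `handler` at most once per key and replays the stored result afterwards.
fn run_idempotent<F>(store: &mut MemoryStore, key: &str, handler: F) -> (u16, String)
where
    F: FnOnce() -> (u16, String),
{
    match store.entries.get(key) {
        // Completed earlier: replay the stored result without calling the handler.
        Some(Entry::Complete { status, body }) => return (*status, body.clone()),
        // Still running elsewhere: mirror the middleware's 409 response.
        Some(Entry::Processing) => return (409, "still processing".to_string()),
        None => {}
    }
    // Reserve the key before doing any work (the SET NX step in the middleware).
    store.entries.insert(key.to_string(), Entry::Processing);
    let (status, body) = handler();
    // Replace the reservation with the finished result.
    store.entries.insert(
        key.to_string(),
        Entry::Complete { status, body: body.clone() },
    );
    (status, body)
}
```

Calling `run_idempotent` twice with the same key invokes the closure only on the first call; the second call returns the stored status and body.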
Production Considerations
- Extract user ID from auth context. The placeholder user ID in the example above must come from your JWT or session middleware in production. Never trust a user-supplied user ID.
- Use a Redis connection pool. The single Mutex-wrapped connection in the example will bottleneck under load. Use deadpool-redis or bb8-redis for a proper connection pool.
- Limit key length. Reject Idempotency-Key headers over 256 characters to prevent abuse.
- Do not apply to GET routes. The middleware checks the method, but be deliberate about which routes are in scope.
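The key-length and user-scoping points above could be sketched as two small helpers that run before any Redis call. This is one possible shape, not part of the middleware shown earlier: `validate_key`, `KeyError`, and `MAX_KEY_LEN` are hypothetical names, and rejecting empty keys and non-printable characters is an extra assumption beyond the 256-character limit.

```rust
/// Longest accepted key, following the 256-character limit suggested above.
const MAX_KEY_LEN: usize = 256;

#[derive(Debug, PartialEq)]
enum KeyError {
    Empty,
    TooLong,
    InvalidCharacter,
}

/// Validates a client-supplied Idempotency-Key value.
fn validate_key(raw: &str) -> Result<&str, KeyError> {
    if raw.is_empty() {
        return Err(KeyError::Empty);
    }
    // Length is measured in bytes; for the ASCII keys accepted below that equals characters.
    if raw.len() > MAX_KEY_LEN {
        return Err(KeyError::TooLong);
    }
    // Assumption: only visible ASCII (no spaces or control characters) is allowed.
    if !raw.chars().all(|c| c.is_ascii_graphic()) {
        return Err(KeyError::InvalidCharacter);
    }
    Ok(raw)
}

/// Builds the per-user Redis key. `user_id` must come from the authenticated
/// context (JWT or session), never from a client-supplied field.
fn namespaced_key(user_id: &str, key: &str) -> String {
    format!("idempotency:{}:{}", user_id, key)
}
```

In the middleware, a failed validation would map naturally to a 400 Bad Request, the same status it already returns for a header that is not valid text.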
Next Up
The server side is now solid. In Part 4, we look at the client side — how to implement retry logic with exponential backoff and jitter so that retries are safe, bounded, and do not amplify load on a struggling service.
References
- Axum Middleware Documentation (https://docs.rs/axum/latest/axum/middleware/index.html)
- Tower Middleware Documentation (https://docs.rs/tower/latest/tower/)
- Redis SET NX Pattern (https://redis.io/docs/manual/patterns/distributed-locks/)
- deadpool-redis Connection Pool (https://docs.rs/deadpool-redis/latest/deadpool_redis/)
