Kafka producers are the entry point for data flowing into your event streaming platform. While writing data to Kafka might seem straightforward, production scenarios demand careful attention to reliability, performance, and error handling. A poorly configured producer can cause duplicate messages, data loss, or performance bottlenecks that ripple through your entire pipeline.
This comprehensive guide explores production-ready Kafka producer patterns, from basic configuration through advanced features like idempotent producers, transactions, and performance tuning. We will implement working examples across C#, Node.js, and Python to demonstrate these patterns in real-world scenarios.
Understanding Producer Fundamentals
Producers publish records to Kafka topics. Each record consists of a key (optional), value, headers (metadata), and timestamp. The producer API abstracts the complexity of determining which broker and partition should receive each message, batching multiple messages for efficiency, and retrying failed sends.
When a producer sends a record, it makes several critical decisions. First, it determines the target partition using the partitioning strategy. If a key is provided, the default partitioner hashes the key and takes the result modulo the partition count, so records with the same key always go to the same partition as long as the partition count does not change. If no key is provided, records are distributed across partitions using a round-robin or sticky partitioning strategy.
Next, the producer serializes both key and value into byte arrays. The serialization format depends on your configured serializers – common options include String, JSON, Avro, or Protobuf. Finally, the producer buffers records in memory, batches them together, optionally compresses them, and sends batches to the appropriate broker.
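The keyed-partitioning rule can be sketched in a few lines of Python. This is an illustration only: `partition_for` is a hypothetical helper, CRC32 stands in for the hash a real client uses (the Java client uses murmur2), and the simple counter stands in for the keyless strategy, so partition numbers will not match a real cluster.

```python
import zlib
from typing import Optional

_round_robin_counter = 0  # hypothetical stand-in for the client's keyless strategy


def partition_for(key: Optional[bytes], num_partitions: int) -> int:
    """Pick a partition: hash the key if present, otherwise rotate."""
    global _round_robin_counter
    if key is not None:
        # Same key bytes -> same hash -> same partition, while the count is unchanged
        return zlib.crc32(key) % num_partitions
    partition = _round_robin_counter % num_partitions
    _round_robin_counter += 1
    return partition


first = partition_for(b"order-123", 6)
second = partition_for(b"order-123", 6)
print(first == second)  # True: a given key always maps to the same partition
```

Because the mapping depends on the partition count, adding partitions to a topic changes where existing keys land.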
flowchart TB
APP[Application] -->|Create Record| SER[Serialization]
SER --> PART[Partitioner]
PART --> BUF[Buffer/Accumulator]
BUF --> BATCH[Batch Formation]
BATCH --> COMP[Compression]
COMP --> SEND[Network Send]
SEND --> BROKER{Broker}
BROKER -->|Success| ACK[Acknowledgment]
BROKER -->|Failure| RETRY[Retry Logic]
RETRY -->|Retriable| SEND
RETRY -->|Fatal| ERR[Error Callback]
ACK --> SUCCESS[Success Callback]
style APP fill:#E1F5FE
style BROKER fill:#FFF9C4
style SUCCESS fill:#C8E6C9
style ERR fill:#FFCDD2
Basic Producer Configuration
Every producer requires a minimal set of configuration properties. The bootstrap servers property tells the producer where to initially connect to discover the full cluster topology. Key and value serializers determine how to convert your objects into bytes for transmission. A client ID helps identify your producer in logs and metrics.
Beyond these basics, several configuration choices significantly impact reliability and performance. The acknowledgment configuration controls durability guarantees. Compression improves throughput by reducing network transfer sizes. Batch settings tune the tradeoff between latency and throughput. Retry settings determine how the producer handles transient failures.
C# Producer Configuration
using Confluent.Kafka;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "order-service-producer",
// Serialization
// Key/Value serializers are set on the ProducerBuilder below
// Idempotence is on by default in the Java client since Kafka 3.0;
// librdkafka-based clients such as this one default to false, so enable it explicitly
EnableIdempotence = true,
// Acknowledgment settings
Acks = Acks.All, // Wait for all in-sync replicas
// Performance tuning
CompressionType = CompressionType.Snappy,
LingerMs = 10, // Wait up to 10ms to batch messages
BatchSize = 16384, // 16KB batch size
// Reliability settings
MessageTimeoutMs = 300000, // 5 minutes total timeout
RequestTimeoutMs = 30000, // 30 seconds per request
// Retry configuration (default is int.MaxValue with idempotence)
MessageSendMaxRetries = int.MaxValue,
RetryBackoffMs = 100
};
using var producer = new ProducerBuilder<string, string>(config)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.Build();
// Producer is now ready to send messages
Node.js Producer Configuration
const { Kafka, CompressionTypes } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service-producer',
brokers: ['localhost:9092'],
// Connection settings
connectionTimeout: 30000,
requestTimeout: 30000
});
const producer = kafka.producer({
// Idempotence is opt-in in kafkajs (default false) and requires acks=-1 on every send
idempotent: true,
// Retry configuration
retry: {
initialRetryTime: 100,
retries: Number.MAX_SAFE_INTEGER,
maxRetryTime: 30000,
multiplier: 2,
factor: 0.2
}
});
// Acknowledgment and compression are options of send() in kafkajs, not of producer()
const sendOptions = {
acks: -1, // Wait for all in-sync replicas (equivalent to acks=all)
compression: CompressionTypes.GZIP // Built in; Snappy needs a separately registered codec
};
// kafkajs has no batch-size or linger settings; batch by passing several messages to one send() call
// Connect to Kafka
await producer.connect();
// Producer is now ready: await producer.send({ topic, messages, ...sendOptions })
Python Producer Configuration
from confluent_kafka import Producer
config = {
'bootstrap.servers': 'localhost:9092',
'client.id': 'order-service-producer',
# Idempotence is off by default in librdkafka-based clients, so enable it explicitly
'enable.idempotence': True,
# Acknowledgment settings
'acks': 'all', # Wait for all in-sync replicas
# Performance tuning
'compression.type': 'snappy',
'linger.ms': 10, # Wait up to 10ms to batch messages
'batch.size': 16384, # 16KB batch size
# Reliability settings
'message.timeout.ms': 300000, # 5 minutes total timeout
'request.timeout.ms': 30000, # 30 seconds per request
# Retry configuration
'retries': 2147483647, # Maximum retries with idempotence
'retry.backoff.ms': 100
}
producer = Producer(config)
# Producer is now ready to send messages
Idempotent Producers: Preventing Duplicates
In distributed systems, network failures and broker crashes are inevitable. When a producer sends a message and the broker successfully writes it but fails to send an acknowledgment back, the producer cannot know if the write succeeded. The safe choice is to retry, but this creates a risk of duplicate messages.
Idempotent producers solve this problem by ensuring that retries never create duplicates. When idempotence is enabled (the default in the Java client since Kafka 3.0), each producer receives a unique Producer ID during initialization. The producer assigns a monotonically increasing sequence number to every message sent to each partition.
Brokers track the highest sequence number received from each producer for each partition. If a broker receives a message with a sequence number it has already processed, it acknowledges the message but does not write it again. If a broker receives a message with a sequence number higher than expected, it rejects the message with an OutOfOrderSequenceException.
Enabling idempotence requires specific configuration. The producer must use acks=all to ensure messages are replicated before acknowledgment. The maximum in-flight requests per connection must be 5 or fewer so the broker can preserve message ordering. Retries must be greater than zero and are normally left at a high value (the default is Integer.MAX_VALUE) to handle transient failures.
sequenceDiagram
participant P as Producer<br/>(PID: 123)
participant B as Broker
Note over P,B: Normal Flow
P->>B: Message (PID:123, Seq:1)
B->>P: ACK
Note over P,B: Network Failure Scenario
P->>B: Message (PID:123, Seq:2)
Note over B: Message written<br/>to log
B--xP: ACK lost
Note over P: Timeout, retry
P->>B: Message (PID:123, Seq:2)
Note over B: Duplicate detected<br/>Seq 2 already written
B->>P: ACK (message not re-written)
Note over P,B: Normal Flow Continues
P->>B: Message (PID:123, Seq:3)
B->>P: ACK
Idempotence provides exactly-once semantics within a single producer session for a single partition. However, it has important limitations. Idempotence only works within a single producer session – if the producer restarts, it receives a new Producer ID and cannot detect duplicates from the previous session. Idempotence guarantees are per-partition – messages sent to different partitions have independent sequence numbers. For guarantees across partitions or across producer sessions, you need transactional producers.
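The broker-side bookkeeping described above can be modeled with a toy class. This is a simplified sketch, not the broker's actual implementation: `PartitionLog` and `OutOfOrderSequence` are hypothetical names, only one partition is modeled, and sequence numbers start at 0 here.

```python
from typing import Dict, List


class OutOfOrderSequence(Exception):
    """Raised when a sequence number skips ahead of what the broker expects."""


class PartitionLog:
    """Toy model of one partition's broker-side duplicate detection."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self._last_seq: Dict[int, int] = {}  # producer id -> highest sequence written

    def append(self, producer_id: int, seq: int, value: str) -> str:
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"  # acknowledged, but not written again
        if seq != last + 1:
            raise OutOfOrderSequence(f"expected {last + 1}, got {seq}")
        self.records.append(value)
        self._last_seq[producer_id] = seq
        return "written"


log = PartitionLog()
print(log.append(123, 0, "order-created"))  # written
print(log.append(123, 0, "order-created"))  # duplicate (retry after a lost ACK)
print(log.append(456, 0, "order-created"))  # written: a new producer ID is not deduplicated
print(len(log.records))  # 2
```

The third call shows the session limitation: a restarted producer arrives with a new Producer ID, so the same payload is written a second time.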
Transactional Producers: Atomic Multi-Partition Writes
Transactional producers extend idempotence to provide atomic writes across multiple partitions and topics. Transactions ensure that a group of messages either all commit successfully or none of them do, enabling exactly-once semantics for complex workflows.
Transactions require a transactional ID, which must be unique for each producer instance but consistent across restarts. This allows Kafka to fence out old producer instances when a new one starts with the same transactional ID, preventing zombie producers from writing conflicting data.
C# Transactional Producer
using Confluent.Kafka;
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
TransactionalId = "order-processor-1",
EnableIdempotence = true,
Acks = Acks.All
};
using var producer = new ProducerBuilder<string, string>(config)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.Build();
// Initialize transactions (required before first use)
producer.InitTransactions(TimeSpan.FromSeconds(30));
try
{
// Begin transaction
producer.BeginTransaction();
// Send messages to multiple topics/partitions
producer.Produce("orders",
new Message<string, string>
{
Key = "order-123",
Value = "{\"amount\":100}"
});
producer.Produce("payments",
new Message<string, string>
{
Key = "order-123",
Value = "{\"status\":\"pending\"}"
});
producer.Produce("inventory",
new Message<string, string>
{
Key = "item-456",
Value = "{\"reserved\":1}"
});
// Commit transaction - all messages become visible atomically
producer.CommitTransaction();
}
catch (KafkaException ex) when (ex.Error.IsFatal)
{
// Fatal error, for example this producer was fenced by a newer instance with the same transactional.id
// Cannot recover - rethrow and let the using declaration dispose this producer
throw;
}
catch (KafkaException ex)
{
// Abort transaction on any other error
producer.AbortTransaction();
// Handle error or retry entire transaction
Console.WriteLine($"Transaction failed: {ex.Message}");
}
Node.js Transactional Producer
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-processor',
brokers: ['localhost:9092']
});
const producer = kafka.producer({
transactionalId: 'order-processor-1',
idempotent: true,
maxInFlightRequests: 1 // Required for transactions
});
await producer.connect();
try {
// Begin transaction
const transaction = await producer.transaction();
try {
// Send messages to multiple topics
await transaction.send({
topic: 'orders',
messages: [{
key: 'order-123',
value: JSON.stringify({ amount: 100 })
}]
});
await transaction.send({
topic: 'payments',
messages: [{
key: 'order-123',
value: JSON.stringify({ status: 'pending' })
}]
});
await transaction.send({
topic: 'inventory',
messages: [{
key: 'item-456',
value: JSON.stringify({ reserved: 1 })
}]
});
// Commit transaction
await transaction.commit();
} catch (error) {
// Abort transaction on any error
await transaction.abort();
throw error;
}
} catch (error) {
console.error('Transaction failed:', error);
}
Python Transactional Producer
from confluent_kafka import Producer
import json
config = {
'bootstrap.servers': 'localhost:9092',
'transactional.id': 'order-processor-1',
'enable.idempotence': True,
'acks': 'all'
}
producer = Producer(config)
# Initialize transactions
producer.init_transactions()
try:
    # Begin transaction
    producer.begin_transaction()
    # Send messages to multiple topics
    producer.produce(
        'orders',
        key='order-123',
        value=json.dumps({'amount': 100})
    )
    producer.produce(
        'payments',
        key='order-123',
        value=json.dumps({'status': 'pending'})
    )
    producer.produce(
        'inventory',
        key='item-456',
        value=json.dumps({'reserved': 1})
    )
    # Commit transaction
    producer.commit_transaction()
except Exception as e:
    # Abort transaction on any error
    producer.abort_transaction()
    print(f"Transaction failed: {e}")
Transactions provide powerful guarantees but come with tradeoffs. Transaction commits involve a two-phase commit protocol, adding latency compared to non-transactional sends. Throughput may be reduced due to coordination overhead. Only one transaction can be active per producer at a time. Consumers must be configured to read only committed messages (isolation.level=read_committed) to benefit from transactional guarantees.
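On the consuming side, the isolation setting mentioned above is a single configuration property. The fragment below is a sketch of a configuration dictionary that could be passed to a `confluent_kafka` consumer; the group name is an assumption made for illustration. Setting the property explicitly avoids relying on client defaults, which differ between clients.

```python
# Hypothetical consumer settings for reading the transactional topics above
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-projection",       # assumed group name, not from the article
    "isolation.level": "read_committed",  # skip records from open or aborted transactions
}
print(consumer_config["isolation.level"])  # read_committed
```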
Asynchronous vs Synchronous Sends
Producers can send messages asynchronously or synchronously. Asynchronous sends return immediately and invoke a callback when the message is acknowledged or fails. Synchronous sends block until the message is acknowledged or an error occurs. The choice significantly impacts throughput, latency, and error handling complexity.
Asynchronous sends provide maximum throughput by allowing the application to continue working while messages are in flight. However, error handling becomes more complex since errors arrive asynchronously in callbacks. Applications must carefully manage state to correlate callbacks with their originating requests.
Synchronous sends simplify error handling since errors are returned directly to the caller. However, they dramatically reduce throughput since the application blocks waiting for each acknowledgment. For high-volume scenarios, asynchronous sends with proper error handling are essential.
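A synchronous send can be built on top of an asynchronous client by producing one message and then blocking until its delivery report arrives. The sketch below assumes a producer object with `produce(topic, key=..., value=..., callback=...)` and `flush(timeout)` methods, as `confluent_kafka.Producer` offers; `send_sync`, `DeliveryError`, and `StubProducer` are hypothetical names, and the stub exists only so the example runs without a broker.

```python
from typing import Any, Optional


class DeliveryError(Exception):
    """Raised when the broker reports a failure or no report arrives in time."""


def send_sync(producer: Any, topic: str, key: str, value: str, timeout: float = 10.0) -> Any:
    """Produce one message and block until its delivery report arrives."""
    outcome: dict = {}

    def on_delivery(err: Optional[Any], msg: Any) -> None:
        outcome["err"] = err
        outcome["msg"] = msg

    producer.produce(topic, key=key, value=value, callback=on_delivery)
    producer.flush(timeout)  # serves callbacks until the queue drains or the timeout expires
    if "msg" not in outcome:
        raise DeliveryError(f"no delivery report within {timeout}s")
    if outcome["err"] is not None:
        raise DeliveryError(str(outcome["err"]))
    return outcome["msg"]


class StubProducer:
    """Test double that reports success on flush; not part of any Kafka client."""

    def __init__(self) -> None:
        self._pending = []

    def produce(self, topic, key=None, value=None, callback=None):
        self._pending.append((callback, {"topic": topic, "key": key, "value": value}))

    def flush(self, timeout=None):
        for callback, msg in self._pending:
            callback(None, msg)
        self._pending.clear()
        return 0


report = send_sync(StubProducer(), "orders", "order-123", '{"amount": 100}')
print(report["topic"])  # orders
```

Flushing after every message defeats batching, which is why this pattern trades throughput for simpler error handling.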
Asynchronous Send Patterns
// C# Asynchronous Send
producer.Produce("orders",
new Message<string, string> { Key = "order-123", Value = orderJson },
deliveryReport =>
{
if (deliveryReport.Error.IsError)
{
Console.WriteLine($"Failed: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"Delivered to {deliveryReport.TopicPartitionOffset}");
}
});
// Node.js Asynchronous Send
producer.send({
topic: 'orders',
messages: [{ key: 'order-123', value: orderJson }]
})
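.then(recordMetadata => {
// send() resolves to an array with one entry per topic-partition written
const { partition, baseOffset } = recordMetadata[0];
console.log(`Delivered to ${partition} at offset ${baseOffset}`);
})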
.catch(error => {
console.error(`Failed: ${error.message}`);
});
# Python Asynchronous Send
def delivery_callback(err, msg):
    if err:
        print(f'Failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
producer.produce('orders', key='order-123', value=order_json, callback=delivery_callback)
producer.poll(0)  # Serve delivery callbacks; without poll() or flush() the callback never runs
Performance Optimization Strategies
Producer performance depends on several configurable parameters. Batching groups multiple messages into a single request, amortizing network overhead. The batch size parameter controls the maximum bytes per batch, while linger time adds a small delay to collect more messages before sending. Compression reduces network transfer sizes at the cost of CPU cycles. Appropriate partitioning distributes load evenly across brokers.
For high-throughput scenarios, increase batch size to 32KB or 64KB and set linger time to 10-20ms. Enable compression using Snappy or LZ4 for good compression ratios with low CPU cost. Use meaningful keys to ensure related messages go to the same partition while distributing load evenly.
For low-latency scenarios, reduce batch size to 1KB or less and set linger time to 0 (send immediately). Disable compression to avoid CPU overhead. Monitor producer metrics like request latency, batch size, and compression ratio to identify optimization opportunities.
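The two tuning profiles above can be written down as configuration dictionaries. This is a sketch using librdkafka-style property names as in the Python configuration earlier; the profile names are hypothetical and the values are starting points taken from the ranges in the text, not measured optima.

```python
BASE = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    "enable.idempotence": True,
}

HIGH_THROUGHPUT = {
    **BASE,
    "batch.size": 65536,         # 64 KB batches
    "linger.ms": 20,             # wait up to 20 ms to fill a batch
    "compression.type": "lz4",   # low CPU cost, good ratio
}

LOW_LATENCY = {
    **BASE,
    "batch.size": 1024,          # 1 KB batches
    "linger.ms": 0,              # send as soon as possible
    "compression.type": "none",  # avoid compression CPU overhead
}

print(HIGH_THROUGHPUT["linger.ms"], LOW_LATENCY["linger.ms"])  # 20 0
```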
Error Handling and Retry Strategies
Kafka producers encounter two types of errors: retriable and non-retriable. Retriable errors include temporary conditions like leader elections, not enough replicas, or network timeouts. The producer automatically retries these errors based on retry configuration. Non-retriable errors include invalid topics, serialization failures, or message size violations. These errors cannot be fixed by retrying and require application-level handling.
With idempotence enabled, the producer can safely retry without creating duplicates. Configure a high retry count (default is Integer.MAX_VALUE) and use message timeout to limit total retry duration. Implement exponential backoff between retries to avoid overwhelming brokers during outages.
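To see how a total timeout bounds retries, the sketch below computes the waits an exponential backoff would produce. `backoff_schedule` is a hypothetical helper for reasoning about settings, not part of any Kafka client, and it ignores the time each request itself takes.

```python
from typing import List


def backoff_schedule(initial_ms: int, multiplier: float, max_backoff_ms: int,
                     total_timeout_ms: int) -> List[int]:
    """Return the waits (ms) before each retry that fit inside the total timeout."""
    if initial_ms <= 0 or multiplier < 1 or max_backoff_ms < initial_ms:
        raise ValueError("need initial_ms > 0, multiplier >= 1, max_backoff_ms >= initial_ms")
    waits: List[int] = []
    elapsed = 0
    wait = initial_ms
    while elapsed + wait <= total_timeout_ms:
        waits.append(wait)
        elapsed += wait
        # Grow the wait geometrically, capped at the maximum backoff
        wait = min(int(wait * multiplier), max_backoff_ms)
    return waits


print(backoff_schedule(100, 2, 1000, 3000))  # [100, 200, 400, 800, 1000]
```

With a 3-second budget only five retries fit, however high the retry count is set, which is why the message timeout is the effective limit.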
For critical data, implement application-level retry logic with durable storage. Persist failed messages to a database or file system and retry sending them later. Consider implementing a dead-letter queue pattern for messages that fail after exhausting retries.
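One way to sketch the durable-storage idea is a delivery callback that appends failed messages to a JSON Lines file for later replay. Everything here is an assumption made for illustration: `FailedMessageStore` is a hypothetical class, the file lives in a temporary directory, and the message is a plain dict rather than a client's message object.

```python
import json
import os
import tempfile
from typing import Any, Dict, Iterator, Optional


class FailedMessageStore:
    """Append-only JSON Lines file holding messages that could not be delivered."""

    def __init__(self, path: str) -> None:
        self.path = path

    def save(self, topic: str, key: str, value: str, reason: str) -> None:
        record = {"topic": topic, "key": key, "value": value, "reason": reason}
        with open(self.path, "a", encoding="utf-8") as handle:
            handle.write(json.dumps(record) + "\n")
            handle.flush()
            os.fsync(handle.fileno())  # ask the OS to write the record to disk

    def pending(self) -> Iterator[Dict[str, Any]]:
        """Yield stored messages in the order they failed."""
        if not os.path.exists(self.path):
            return
        with open(self.path, encoding="utf-8") as handle:
            for line in handle:
                yield json.loads(line)


store = FailedMessageStore(os.path.join(tempfile.mkdtemp(), "failed-messages.jsonl"))


def on_delivery(err: Optional[Any], msg: Dict[str, str]) -> None:
    """Delivery callback: park the message on disk when the client gives up."""
    if err is not None:
        store.save(msg["topic"], msg["key"], msg["value"], str(err))


on_delivery("message timed out", {"topic": "orders", "key": "order-123", "value": "{}"})
print([m["key"] for m in store.pending()])  # ['order-123']
```

A replay job would read `pending()` and send each record again, or forward it to a dead-letter topic once it has failed too many times.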
Best Practices for Production Deployments
Enable idempotence for all producers unless you have specific reasons not to. Use acks=all for critical data to ensure durability. Set appropriate timeouts – message timeout should account for retry backoff duration. Implement proper error handling with logging and monitoring. Use meaningful client IDs to identify producers in logs and metrics.
Avoid creating producer instances for each message – producers are thread-safe and expensive to create. Reuse producer instances across multiple threads when possible. Close producers gracefully during application shutdown to flush pending messages. Monitor producer metrics including request rate, error rate, batch size, and compression ratio.
For mission-critical applications, consider implementing producer pooling to handle high concurrency. Use circuit breakers to stop sending during extended outages. Implement health checks that verify producer connectivity before accepting application traffic.
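A circuit breaker for sends can be as small as a failure counter and a timestamp. The sketch below is one possible shape, with hypothetical names and thresholds; the injectable clock exists only so the example runs instantly.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after consecutive failures and allows a trial send after a cool-down."""

    def __init__(self, failure_threshold: int = 3, reset_after_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_send(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cool-down, let a trial send through (half-open state)
        return self._clock() - self._opened_at >= self.reset_after_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=2, reset_after_s=30.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
print(breaker.allow_send())  # False: circuit is open
now[0] = 31.0
print(breaker.allow_send())  # True: cool-down elapsed, trial send allowed
```

In use, the application would call `allow_send()` before producing and report the outcome from the delivery callback through `record_success()` or `record_failure()`.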
Coming Up Next
Understanding producer patterns provides the foundation for reliable data ingestion into Kafka. Part 3 of this series explores Kafka consumers in depth, covering consumer groups, offset management, rebalancing strategies, and building resilient consumption patterns across multiple programming languages.
