When running Apache Kafka in production, it’s crucial to monitor whether messages are actually being consumed. Unused or unread messages can point to consumer issues, processing bottlenecks, or misconfigured applications. This guide covers multiple approaches to identifying unused messages in Kafka, whether you’re running in KRaft mode or traditional ZooKeeper mode.
Understanding Kafka Message States
flowchart TD
    A[Producer] --> B[Kafka Topic/Partition]
    B --> C{Message States}
    C --> D[Unread Messages]
    C --> E[Uncommitted Messages]
    C --> F[Lag Messages]
    D --> G[No consumer groups reading]
    E --> H[Consumed but not committed]
    F --> I[Available but not processed]
- Unread Messages: Messages that haven’t been consumed by any consumer group
- Uncommitted Messages: Messages consumed but not committed (offset not updated)
- Lag Messages: Messages available but not yet processed due to consumer lag (a worked offset example follows this list)
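In offset terms the gap is simple arithmetic: a partition’s unused messages are the difference between its log-end offset (the high watermark) and the highest offset any consumer group has committed, bounded below by the earliest offset still retained. A minimal sketch of that calculation, using made-up offset values:

# Hypothetical offsets for a single partition
earliest_offset = 100     # oldest message still retained (low watermark)
log_end_offset = 1500     # next offset to be written (high watermark)
committed_offset = 1200   # highest offset committed by any consumer group

# If no group has ever committed, fall back to the earliest retained offset
effective_consumed = max(committed_offset, earliest_offset)
unused_messages = log_end_offset - effective_consumed
print(unused_messages)  # 300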
Method 1: Command-Line Tools
# Check consumer group lag (the LAG column shows unconsumed messages per partition)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--all-groups --describe
# Get topic offsets (--time -1 returns the latest offset, i.e. the high watermark,
# for each partition; newer Kafka releases also ship kafka-get-offsets.sh, which
# uses --bootstrap-server instead of --broker-list)
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic your-topic-name \
  --time -1
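If you want a single number out of the CLI, the describe output can be summed directly. The sketch below assumes the standard column layout of kafka-consumer-groups.sh (GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, ...), that the script is on your PATH, and a broker at localhost:9092:

#!/usr/bin/env python3
import subprocess

# Run the same describe command shown above and capture its output
result = subprocess.run(
    ["kafka-consumer-groups.sh", "--bootstrap-server", "localhost:9092",
     "--all-groups", "--describe"],
    capture_output=True, text=True, check=True
)

total_lag = 0
for line in result.stdout.splitlines():
    fields = line.split()
    # Data rows carry LAG in the sixth column; headers, blanks, and "-" are skipped
    if len(fields) >= 6 and fields[5].isdigit():
        total_lag += int(fields[5])

print(f"Total lag across all consumer groups: {total_lag}")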
Method 2: Python Monitor
#!/usr/bin/env python3
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.structs import TopicPartition
class KafkaMessageMonitor:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='unused_message_monitor'
        )

    def get_partition_offsets(self, topic, partition):
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            enable_auto_commit=False
        )
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])
        end_offsets = consumer.end_offsets([tp])
        beginning_offsets = consumer.beginning_offsets([tp])
        consumer.close()
        return beginning_offsets[tp], end_offsets[tp]

    def calculate_unused_messages(self):
        # describe_topics() returns a list of dicts, one per topic
        topics = self.admin_client.describe_topics()
        # list_consumer_groups() returns (group_id, protocol_type) tuples
        groups = self.admin_client.list_consumer_groups()
        unused_total = 0
        for topic_meta in topics:
            topic_name = topic_meta['topic']
            if topic_name.startswith('__'):
                continue  # skip internal topics
            for partition_meta in topic_meta['partitions']:
                partition_id = partition_meta['partition']
                earliest, latest = self.get_partition_offsets(
                    topic_name, partition_id
                )
                # Highest committed offset across all consumer groups
                max_consumed = earliest
                for group_id, _protocol in groups:
                    try:
                        offsets = self.admin_client.list_consumer_group_offsets(
                            group_id
                        )
                        for tp, offset_meta in offsets.items():
                            if (tp.topic == topic_name and
                                    tp.partition == partition_id):
                                max_consumed = max(max_consumed, offset_meta.offset)
                    except Exception:
                        continue
                unused = latest - max_consumed
                unused_total += unused
                if unused > 0:
                    print(f"Topic: {topic_name}, Partition: {partition_id}")
                    print(f"  Unused messages: {unused}")
        return unused_total
# Usage
monitor = KafkaMessageMonitor(['localhost:9092'])
total = monitor.calculate_unused_messages()
print(f"Total unused messages: {total}")
Method 3: Node.js Monitor
const { Kafka } = require('kafkajs');
class KafkaUnusedMonitor {
  constructor(brokers = ['localhost:9092']) {
    this.kafka = new Kafka({
      clientId: 'unused-monitor',
      brokers: brokers
    });
    this.admin = this.kafka.admin();
  }

  async connect() {
    await this.admin.connect();
  }

  async disconnect() {
    await this.admin.disconnect();
  }

  async getUnusedMessages() {
    const metadata = await this.admin.fetchTopicMetadata();
    // listGroups() returns { groups: [{ groupId, protocolType }] }
    const { groups } = await this.admin.listGroups();
    let totalUnused = 0;

    for (const topic of metadata.topics) {
      if (topic.name.startsWith('__')) continue;

      // Low/high watermarks for every partition of this topic
      const topicOffsets = await this.admin.fetchTopicOffsets(topic.name);

      for (const partition of topic.partitions) {
        const watermark = topicOffsets.find(
          (o) => o.partition === partition.partitionId
        );
        if (!watermark) continue;

        const earliestOffset = parseInt(watermark.low, 10);
        const latestOffset = parseInt(watermark.high, 10);

        // Highest committed offset across all consumer groups
        let maxConsumed = earliestOffset;
        for (const group of groups) {
          try {
            const groupOffsets = await this.admin.fetchOffsets({
              groupId: group.groupId,
              topics: [topic.name]
            });
            for (const topicOffset of groupOffsets) {
              for (const partOffset of topicOffset.partitions) {
                if (partOffset.partition !== partition.partitionId) continue;
                const committed = parseInt(partOffset.offset, 10);
                // kafkajs reports -1 when the group has no committed offset
                if (committed >= 0) {
                  maxConsumed = Math.max(maxConsumed, committed);
                }
              }
            }
          } catch (error) {
            // Skip groups whose offsets cannot be fetched
          }
        }

        const unused = latestOffset - maxConsumed;
        totalUnused += unused;
        if (unused > 0) {
          console.log(`${topic.name}:${partition.partitionId} - ${unused} unused`);
        }
      }
    }
    return totalUnused;
  }
}
// Usage
async function main() {
  const monitor = new KafkaUnusedMonitor(['localhost:9092']);
  await monitor.connect();
  const total = await monitor.getUnusedMessages();
  console.log(`Total unused messages: ${total}`);
  await monitor.disconnect();
}
main().catch(console.error);
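A few kafkajs-specific details worth keeping in mind: the library is installed with npm install kafkajs, offsets come back as strings (hence the parseInt calls), and a group that has never committed anything for a partition reports an offset of -1, which is why the sketch above guards with committed >= 0. The fetchOffsets({ groupId, topics }) shape used here matches kafkajs 2.x; older 1.x releases took a single topic instead.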
These methods provide effective ways to identify unused messages in your Kafka cluster. The next part will cover advanced enterprise monitoring with Prometheus and alerting systems.