How to Identify Unused Messages in Apache Kafka: Complete Monitoring Guide

When running Apache Kafka in production, it’s crucial to monitor whether messages are being consumed effectively. Unused or unread messages can indicate consumer issues, processing bottlenecks, or misconfigured applications. This guide covers multiple approaches to identify unused messages in Kafka, regardless of whether you’re running in KRaft mode or traditional ZooKeeper mode.

Understanding Kafka Message States

flowchart TD
    A[Producer] --> B[Kafka Topic/Partition]
    B --> C{Message States}
    C --> D[Unread Messages]
    C --> E[Uncommitted Messages] 
    C --> F[Lag Messages]
    
    D --> G[No consumer groups reading]
    E --> H[Consumed but not committed]
    F --> I[Available but not processed]
  • Unread Messages: Messages that no consumer group has consumed yet
  • Uncommitted Messages: Messages a consumer has fetched but whose offset has not been committed, so the broker still counts them as pending
  • Lag Messages: Messages a consumer group has not yet reached, measured as the partition's log-end offset minus the group's committed offset
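
To make the three states concrete, here is a minimal sketch of the offset arithmetic for a single partition. The function name `partition_backlog`, the group names, and the offsets are hypothetical and chosen only for illustration. Note that the broker only sees committed offsets, so consumed-but-uncommitted messages show up as part of a group's lag in this calculation.

```python
from typing import Dict, Optional


def partition_backlog(earliest: int, latest: int,
                      committed: Dict[str, Optional[int]]) -> dict:
    """Summarise one partition's backlog from its offsets.

    earliest/latest are the log-start and log-end offsets. committed maps a
    (hypothetical) group name to its committed offset, or None when the
    group has never committed for this partition.
    """
    # Lag per group: messages between the committed position and the log end.
    lag = {}
    for group, offset in committed.items():
        position = earliest if offset is None else max(offset, earliest)
        lag[group] = max(latest - position, 0)

    # Unread by every group: messages past the furthest committed position.
    furthest = max(
        (o for o in committed.values() if o is not None), default=earliest
    )
    furthest = max(furthest, earliest)
    unread = max(latest - furthest, 0)

    return {"retained": latest - earliest, "unread": unread, "lag": lag}


# Illustrative offsets, not taken from a real cluster.
summary = partition_backlog(100, 500, {"billing": 450, "audit": 300, "idle": None})
print(summary["unread"])  # 50
print(summary["lag"])     # {'billing': 50, 'audit': 200, 'idle': 400}
```

Taking the maximum committed offset across groups matches the "unread by any group" definition above; a per-group view (the `lag` dictionary) is usually more useful for spotting a single stalled consumer.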

Method 1: Command-Line Tools

# Check consumer group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --all-groups --describe

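# Get the latest offset of every partition (--time -1 = latest, -2 = earliest).
# Newer Kafka releases accept --bootstrap-server in place of the deprecated --broker-list.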
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic your-topic-name \
  --time -1
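
If you want to post-process the `--describe` output rather than read it by eye, the following sketch sums the LAG column per group and topic. It assumes the column headers printed by recent Kafka releases (GROUP, TOPIC, LAG among them); the function name `parse_consumer_group_lag` and the sample text are hypothetical, written for illustration rather than captured from a real cluster, so check the header line of your own version before relying on it.

```python
def parse_consumer_group_lag(describe_output: str) -> dict:
    """Sum the LAG column per (group, topic) from --describe output."""
    totals = {}
    columns = None
    for line in describe_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        # A header row names the columns; locate them by name, not position.
        if {"GROUP", "TOPIC", "LAG"} <= set(fields):
            columns = {name: index for index, name in enumerate(fields)}
            continue
        if columns is None or len(fields) <= columns["LAG"]:
            continue
        lag = fields[columns["LAG"]]
        if not lag.isdigit():
            continue  # "-" means the group has no committed offset here
        key = (fields[columns["GROUP"]], fields[columns["TOPIC"]])
        totals[key] = totals.get(key, 0) + int(lag)
    return totals


# Hypothetical sample in the shape of the tool's output.
SAMPLE = """
GROUP    TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
billing  orders  0          120             150             30   -            -     -
billing  orders  1          80              95              15   -            -     -
audit    orders  0          -               150             -    -            -     -
"""

print(parse_consumer_group_lag(SAMPLE))  # {('billing', 'orders'): 45}
```

Rows whose LAG is `-` are skipped rather than counted as zero, since a group without a committed offset has an undefined lag, not an empty backlog.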

Method 2: Python Monitor

#!/usr/bin/env python3
# Requires the kafka-python package
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.structs import TopicPartition

class KafkaMessageMonitor:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='unused_message_monitor'
        )
        
    def get_partition_offsets(self, topic, partition):
        consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            enable_auto_commit=False
        )
        
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])
        
        end_offsets = consumer.end_offsets([tp])
        beginning_offsets = consumer.beginning_offsets([tp])
        
        consumer.close()
        return beginning_offsets[tp], end_offsets[tp]
    
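    def calculate_unused_messages(self):
        # In kafka-python, describe_topics() returns a list of dicts and
        # list_consumer_groups() returns (group_id, protocol_type) tuples.
        topics = self.admin_client.describe_topics()
        groups = self.admin_client.list_consumer_groups()

        # Fetch each group's committed offsets once, instead of per partition
        committed = []
        for group_id, _protocol_type in groups:
            try:
                committed.append(
                    self.admin_client.list_consumer_group_offsets(group_id)
                )
            except Exception as exc:
                print(f"Skipping group {group_id}: {exc}")

        unused_total = 0
        for topic_metadata in topics:
            topic_name = topic_metadata['topic']
            if topic_name.startswith('__'):
                continue  # skip internal topics such as __consumer_offsets

            for partition_metadata in topic_metadata['partitions']:
                partition = partition_metadata['partition']
                earliest, latest = self.get_partition_offsets(
                    topic_name, partition
                )
                tp = TopicPartition(topic_name, partition)

                # Highest committed offset across all groups; stays at the
                # earliest retained offset when no group has committed
                max_consumed = earliest
                for offsets in committed:
                    if tp in offsets:
                        max_consumed = max(max_consumed, offsets[tp].offset)

                unused = latest - max_consumed
                unused_total += unused

                if unused > 0:
                    print(f"Topic: {topic_name}, Partition: {partition}")
                    print(f"  Unused messages: {unused}")

        return unused_total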

# Usage
monitor = KafkaMessageMonitor(['localhost:9092'])
total = monitor.calculate_unused_messages()
print(f"Total unused messages: {total}")

Method 3: Node.js Monitor

const { Kafka } = require('kafkajs');

class KafkaUnusedMonitor {
    constructor(brokers = ['localhost:9092']) {
        this.kafka = new Kafka({
            clientId: 'unused-monitor',
            brokers: brokers
        });
        this.admin = this.kafka.admin();
    }
    
    async connect() {
        await this.admin.connect();
    }
    
    async disconnect() {
        await this.admin.disconnect();
    }
    
    async getUnusedMessages() {
        // Omitting `topics` asks kafkajs for metadata on every topic
        const metadata = await this.admin.fetchTopicMetadata();
        const { groups } = await this.admin.listGroups();
        let totalUnused = 0;

        for (const topic of metadata.topics) {
            if (topic.name.startsWith('__')) continue; // skip internal topics

            // admin.fetchTopicOffsets returns { partition, offset, high, low }
            // for each partition, so no temporary consumer is needed
            const topicOffsets = await this.admin.fetchTopicOffsets(topic.name);

            // Highest committed offset per partition across all groups,
            // starting from the earliest retained offset
            const maxConsumed = new Map(
                topicOffsets.map(p => [p.partition, parseInt(p.low, 10)])
            );

            for (const group of groups) {
                try {
                    // kafkajs 2.x signature; 1.x takes { groupId, topic } instead
                    const groupOffsets = await this.admin.fetchOffsets({
                        groupId: group.groupId,
                        topics: [topic.name]
                    });

                    for (const topicOffset of groupOffsets) {
                        for (const partOffset of topicOffset.partitions) {
                            // An offset of -1 means the group has no commit here
                            const committed = parseInt(partOffset.offset, 10);
                            const current = maxConsumed.get(partOffset.partition);
                            if (current !== undefined && committed > current) {
                                maxConsumed.set(partOffset.partition, committed);
                            }
                        }
                    }
                } catch (error) {
                    console.warn(`Skipping group ${group.groupId}: ${error.message}`);
                }
            }

            for (const p of topicOffsets) {
                const unused = parseInt(p.high, 10) - maxConsumed.get(p.partition);
                totalUnused += unused;

                if (unused > 0) {
                    console.log(`${topic.name}:${p.partition} - ${unused} unused`);
                }
            }
        }

        return totalUnused;
    }
}

// Usage
async function main() {
    const monitor = new KafkaUnusedMonitor(['localhost:9092']);
    await monitor.connect();

    try {
        const total = await monitor.getUnusedMessages();
        console.log(`Total unused messages: ${total}`);
    } finally {
        // Always release the admin connection, even if the scan fails
        await monitor.disconnect();
    }
}

main().catch(console.error);

These methods provide effective ways to identify unused messages in your Kafka cluster. The next part will cover advanced enterprise monitoring with Prometheus and alerting systems.
