Advanced Kafka Message Monitoring: Enterprise Solutions with Prometheus and Grafana

Continuing from our previous guide on identifying unused messages in Kafka, this article focuses on advanced monitoring techniques, automated alerting systems, and C# and Python implementations for enterprise environments. We’ll also explore how to visualize message usage patterns and create comprehensive monitoring dashboards.

Advanced Monitoring Architecture

A robust Kafka monitoring system requires multiple components working together to provide real-time insight into message usage patterns, as illustrated in the Mermaid diagram below.

graph TB
    A[Kafka Cluster] --> B[Metrics Collector]
    B --> C[Time Series DB]
    B --> D[Alert Manager]
    C --> E[Dashboard]
    D --> F[Notification System]
    
    subgraph "Monitoring Components"
        B --> G[JMX Metrics]
        B --> H[Admin API]
        B --> I[Consumer Group Monitor]
    end
    
    subgraph "Storage & Analysis"
        C --> J[Prometheus]
        C --> K[InfluxDB]
        E --> L[Grafana]
        E --> M[Custom Dashboard]
    end
    
    subgraph "Alerting"
        F --> N[Slack/Teams]
        F --> O[Email]
        F --> P[PagerDuty]
    end

Method 4: C# .NET Monitoring Solution

For enterprise environments using .NET, here’s a comprehensive C# implementation built on the Confluent.Kafka client:

// KafkaMessageMonitor.cs
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;

namespace KafkaMonitoring
{
    public class UnusedMessageReport
    {
        public DateTime Timestamp { get; set; }
        public Dictionary<string, TopicReport> Topics { get; set; } = new();
        public long TotalUnusedMessages { get; set; }
    }

    public class TopicReport
    {
        public Dictionary<int, PartitionReport> Partitions { get; set; } = new();
        public long TopicTotalUnused { get; set; }
    }

    public class PartitionReport
    {
        public long EarliestOffset { get; set; }
        public long LatestOffset { get; set; }
        public long TotalMessages { get; set; }
        public long MaxConsumedOffset { get; set; }
        public long UnusedMessages { get; set; }
        public List<string> ConsumingGroups { get; set; } = new();
    }

    public class KafkaUnusedMessagesMonitor
    {
        private readonly ILogger<KafkaUnusedMessagesMonitor> _logger;
        private readonly AdminClientConfig _adminConfig;
        private readonly ConsumerConfig _consumerConfig;

        public KafkaUnusedMessagesMonitor(
            ILogger<KafkaUnusedMessagesMonitor> logger,
            IConfiguration configuration)
        {
            _logger = logger;
            
            _adminConfig = new AdminClientConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"] ?? "localhost:9092"
            };
            
            _consumerConfig = new ConsumerConfig
            {
                BootstrapServers = _adminConfig.BootstrapServers,
                GroupId = "unused-messages-monitor",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };
        }

        public async Task<UnusedMessageReport> GenerateReportAsync()
        {
            _logger.LogInformation("Starting unused messages analysis...");
            
            var report = new UnusedMessageReport
            {
                Timestamp = DateTime.UtcNow
            };

            try
            {
                using var adminClient = new AdminClientBuilder(_adminConfig).Build();
                
                // Get all topics and their metadata
                var topicMetadata = await GetTopicMetadataAsync(adminClient);
                
                // Get consumer group information
                var consumerGroupOffsets = await GetConsumerGroupOffsetsAsync(adminClient);
                
                // Process each topic
                foreach (var (topicName, partitions) in topicMetadata)
                {
                    _logger.LogDebug($"Processing topic: {topicName}");
                    
                    var topicReport = new TopicReport();
                    report.Topics[topicName] = topicReport;
                    
                    foreach (var partition in partitions)
                    {
                        try
                        {
                            var partitionReport = await ProcessPartitionAsync(
                                topicName, partition, consumerGroupOffsets);
                            
                            topicReport.Partitions[partition] = partitionReport;
                            topicReport.TopicTotalUnused += partitionReport.UnusedMessages;
                        }
                        catch (Exception ex)
                        {
                            _logger.LogWarning(ex, 
                                $"Failed to process partition {topicName}:{partition}");
                        }
                    }
                    
                    report.TotalUnusedMessages += topicReport.TopicTotalUnused;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to generate unused messages report");
                throw;
            }

            _logger.LogInformation($"Analysis complete. Total unused messages: {report.TotalUnusedMessages:N0}");
            return report;
        }

        private Task<Dictionary<string, List<int>>> GetTopicMetadataAsync(IAdminClient adminClient)
        {
            // GetMetadata is a synchronous call, so the result is wrapped in a Task
            // to keep the method composable with the async callers above.
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
            var topicPartitions = new Dictionary<string, List<int>>();
            
            foreach (var topic in metadata.Topics)
            {
                if (topic.Error.IsError)
                {
                    _logger.LogWarning($"Error in topic {topic.Topic}: {topic.Error}");
                    continue;
                }
                
                topicPartitions[topic.Topic] = topic.Partitions
                    .Select(p => p.PartitionId)
                    .ToList();
            }
            
            return Task.FromResult(topicPartitions);
        }

        private async Task<Dictionary<string, Dictionary<string, Dictionary<int, long>>>> 
            GetConsumerGroupOffsetsAsync(IAdminClient adminClient)
        {
            // Map: groupId -> topic -> partition -> committed offset
            var consumerGroupOffsets = new Dictionary<string, Dictionary<string, Dictionary<int, long>>>();
            
            try
            {
                // ListConsumerGroupsAsync / ListConsumerGroupOffsetsAsync require a recent
                // Confluent.Kafka Admin API (1.9 or later).
                var groupsResult = await adminClient.ListConsumerGroupsAsync();
                
                foreach (var group in groupsResult.Valid)
                {
                    // Skip transient helper groups, including this monitor's own group
                    if (group.GroupId.Contains("temp") || group.GroupId.Contains("monitor"))
                        continue;
                    
                    try
                    {
                        // A null partition list requests all committed offsets for the group
                        var offsetsResults = await adminClient.ListConsumerGroupOffsetsAsync(
                            new[] { new ConsumerGroupTopicPartitions(group.GroupId, null) });
                        
                        var groupMap = new Dictionary<string, Dictionary<int, long>>();
                        consumerGroupOffsets[group.GroupId] = groupMap;
                        
                        foreach (var result in offsetsResults)
                        {
                            foreach (var tpo in result.Partitions)
                            {
                                // Ignore partitions with no committed offset
                                if (tpo.Offset == Offset.Unset)
                                    continue;
                                
                                if (!groupMap.TryGetValue(tpo.Topic, out var partitionMap))
                                {
                                    partitionMap = new Dictionary<int, long>();
                                    groupMap[tpo.Topic] = partitionMap;
                                }
                                
                                partitionMap[tpo.Partition.Value] = tpo.Offset.Value;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogWarning(ex, $"Failed to get offsets for group {group.GroupId}");
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to get consumer group offsets");
            }
            
            return consumerGroupOffsets;
        }

        private Task<PartitionReport> ProcessPartitionAsync(
            string topicName, 
            int partition,
            Dictionary<string, Dictionary<string, Dictionary<int, long>>> consumerGroupOffsets)
        {
            // A short-lived consumer is created only to query watermark offsets;
            // no messages are consumed or committed here.
            using var consumer = new ConsumerBuilder<Ignore, Ignore>(_consumerConfig).Build();
            
            var topicPartition = new TopicPartition(topicName, partition);
            
            // Get watermark offsets
            var watermarkOffsets = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
            
            var earliestOffset = watermarkOffsets.Low.Value;
            var latestOffset = watermarkOffsets.High.Value;
            var totalMessages = latestOffset - earliestOffset;
            
            // Find maximum consumed offset across all consumer groups
            long maxConsumedOffset = earliestOffset;
            var consumingGroups = new List<string>();
            
            foreach (var (groupId, groupOffsets) in consumerGroupOffsets)
            {
                if (groupOffsets.ContainsKey(topicName) && 
                    groupOffsets[topicName].ContainsKey(partition))
                {
                    var groupOffset = groupOffsets[topicName][partition];
                    maxConsumedOffset = Math.Max(maxConsumedOffset, groupOffset);
                    consumingGroups.Add(groupId);
                }
            }
            
            // Messages beyond the highest committed offset of any group are counted as unused
            var unusedMessages = Math.Max(0, latestOffset - maxConsumedOffset);
            
            return Task.FromResult(new PartitionReport
            {
                EarliestOffset = earliestOffset,
                LatestOffset = latestOffset,
                TotalMessages = totalMessages,
                MaxConsumedOffset = maxConsumedOffset,
                UnusedMessages = unusedMessages,
                ConsumingGroups = consumingGroups
            });
        }

        public void PrintReport(UnusedMessageReport report)
        {
            Console.WriteLine(new string('=', 60));
            Console.WriteLine("KAFKA UNUSED MESSAGES REPORT");
            Console.WriteLine($"Generated: {report.Timestamp:yyyy-MM-dd HH:mm:ss} UTC");
            Console.WriteLine(new string('=', 60));
            
            Console.WriteLine($"\nTOTAL UNUSED MESSAGES: {report.TotalUnusedMessages:N0}");
            Console.WriteLine("\nDETAILED BREAKDOWN:");
            Console.WriteLine(new string('-', 60));
            
            foreach (var (topicName, topicData) in report.Topics)
            {
                if (topicData.TopicTotalUnused > 0)
                {
                    Console.WriteLine($"\nTopic: {topicName}");
                    Console.WriteLine($"  Unused Messages: {topicData.TopicTotalUnused:N0}");
                    
                    foreach (var (partition, partitionData) in topicData.Partitions)
                    {
                        if (partitionData.UnusedMessages > 0)
                        {
                            Console.WriteLine($"  Partition {partition}:");
                            Console.WriteLine($"    Latest Offset: {partitionData.LatestOffset:N0}");
                            Console.WriteLine($"    Max Consumed: {partitionData.MaxConsumedOffset:N0}");
                            Console.WriteLine($"    Unused: {partitionData.UnusedMessages:N0}");
                            Console.WriteLine($"    Consumer Groups: {string.Join(", ", partitionData.ConsumingGroups)}");
                        }
                    }
                }
            }
        }

        public async Task SaveReportAsync(UnusedMessageReport report, string filePath = null)
        {
            filePath ??= $"kafka-unused-messages-{DateTime.Now:yyyyMMdd_HHmmss}.json";
            
            var json = JsonSerializer.Serialize(report, new JsonSerializerOptions
            {
                WriteIndented = true,
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase
            });
            
            await File.WriteAllTextAsync(filePath, json);
            _logger.LogInformation($"Report saved to: {filePath}");
        }
    }

    // Usage example in Program.cs
    class Program
    {
        static async Task Main(string[] args)
        {
            var configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json")
                .Build();

            var loggerFactory = LoggerFactory.Create(builder => 
                builder.AddConsole().SetMinimumLevel(LogLevel.Information));
            var logger = loggerFactory.CreateLogger<KafkaUnusedMessagesMonitor>();

            var monitor = new KafkaUnusedMessagesMonitor(logger, configuration);
            
            try
            {
                var report = await monitor.GenerateReportAsync();
                monitor.PrintReport(report);
                await monitor.SaveReportAsync(report);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to generate report");
            }
        }
    }
}
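
For completeness, Program.cs above loads an appsettings.json from the working directory, and the monitor reads only the Kafka:BootstrapServers key. A minimal sketch of that file could look like this:

{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  }
}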

Method 5: Real-time Monitoring with Prometheus Integration

For production environments, integrate with Prometheus for continuous monitoring:

# kafka-prometheus-exporter.py
import time
import logging
from prometheus_client import start_http_server, Gauge, Info
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.structs import TopicPartition
import threading
from datetime import datetime

# Prometheus metrics
unused_messages_gauge = Gauge(
    'kafka_unused_messages_total',
    'Total unused messages per topic partition',
    ['topic', 'partition', 'cluster']
)

consumer_lag_gauge = Gauge(
    'kafka_consumer_lag_sum',
    'Consumer lag per topic partition per group',
    ['topic', 'partition', 'consumer_group', 'cluster']
)

kafka_info = Info(
    'kafka_cluster_info',
    'Kafka cluster information'
)

class KafkaPrometheusExporter:
    def __init__(self, bootstrap_servers, cluster_name='default'):
        self.bootstrap_servers = bootstrap_servers
        self.cluster_name = cluster_name
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id=f'prometheus_exporter_{cluster_name}'
        )
        self.running = False
        
    def collect_metrics(self):
        """Collect and update Prometheus metrics"""
        consumer = None
        try:
            # describe_topics() returns a list of topic metadata dicts
            # (each with 'topic' and 'partitions' keys) in kafka-python
            topics = self.admin_client.describe_topics()

            # Fetch committed offsets for every consumer group once per cycle;
            # list_consumer_groups() yields (group_id, protocol_type) tuples
            group_offsets = {}
            for group_id, _protocol in self.admin_client.list_consumer_groups():
                try:
                    group_offsets[group_id] = self.admin_client.list_consumer_group_offsets(group_id)
                except Exception as e:
                    logging.warning(f"Failed to get offsets for group {group_id}: {e}")

            # A single consumer is reused to query watermark offsets for all partitions
            consumer = KafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                enable_auto_commit=False
            )

            for topic_metadata in topics:
                topic_name = topic_metadata['topic']
                for partition_metadata in topic_metadata['partitions']:
                    partition_id = partition_metadata['partition']
                    tp = TopicPartition(topic_name, partition_id)

                    try:
                        # High and low watermarks for the partition
                        latest_offset = consumer.end_offsets([tp])[tp]
                        earliest_offset = consumer.beginning_offsets([tp])[tp]

                        # Highest committed offset across all consumer groups
                        max_consumed_offset = earliest_offset
                        for group_id, offsets in group_offsets.items():
                            offset_metadata = offsets.get(tp)
                            if offset_metadata is None or offset_metadata.offset < 0:
                                continue
                            max_consumed_offset = max(max_consumed_offset, offset_metadata.offset)

                            # Update consumer lag metric
                            lag = max(0, latest_offset - offset_metadata.offset)
                            consumer_lag_gauge.labels(
                                topic=topic_name,
                                partition=str(partition_id),
                                consumer_group=group_id,
                                cluster=self.cluster_name
                            ).set(lag)

                        # Messages beyond the highest committed offset are "unused"
                        unused_messages = max(0, latest_offset - max_consumed_offset)

                        # Update unused messages metric
                        unused_messages_gauge.labels(
                            topic=topic_name,
                            partition=str(partition_id),
                            cluster=self.cluster_name
                        ).set(unused_messages)

                    except Exception as e:
                        logging.error(f"Error processing {topic_name}:{partition_id} - {e}")

        except Exception as e:
            logging.error(f"Error collecting metrics: {e}")
        finally:
            if consumer is not None:
                consumer.close()
    
    def start_monitoring(self, interval=60):
        """Start continuous monitoring"""
        self.running = True
        
        def monitor_loop():
            while self.running:
                try:
                    self.collect_metrics()
                    time.sleep(interval)
                except Exception as e:
                    logging.error(f"Monitoring error: {e}")
                    time.sleep(5)  # Short retry interval
        
        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()
        logging.info(f"Started Kafka monitoring for cluster: {self.cluster_name}")
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.running = False
        logging.info("Stopped Kafka monitoring")

# Main execution
if __name__ == "__main__":
    import os
    
    # Configuration
    BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
    CLUSTER_NAME = os.getenv('KAFKA_CLUSTER_NAME', 'default')
    METRICS_PORT = int(os.getenv('METRICS_PORT', 8000))
    COLLECTION_INTERVAL = int(os.getenv('COLLECTION_INTERVAL', 60))
    
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    # Start Prometheus metrics server
    start_http_server(METRICS_PORT)
    logging.info(f"Prometheus metrics server started on port {METRICS_PORT}")
    
    # Initialize and start monitoring
    exporter = KafkaPrometheusExporter(BOOTSTRAP_SERVERS, CLUSTER_NAME)
    exporter.start_monitoring(COLLECTION_INTERVAL)
    
    try:
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        exporter.stop_monitoring()
        logging.info("Exporter stopped")

Docker Configuration for Monitoring Stack

Deploy the complete monitoring solution using Docker Compose:

# docker-compose.monitoring.yml
version: '3.8'

services:
  kafka-exporter:
    build: .
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
      - KAFKA_CLUSTER_NAME=production
      - METRICS_PORT=8000
      - COLLECTION_INTERVAL=30
    ports:
      - "8000:8000"
    depends_on:
      # The kafka-1/2/3 broker services are assumed to be defined elsewhere,
      # e.g. in the base docker-compose.yml that this file overlays
      - kafka-1
      - kafka-2
      - kafka-3
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=90d'
      - '--web.enable-lifecycle'
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/var/lib/grafana/dashboards
      - ./grafana/provisioning:/etc/grafana/provisioning
    restart: unless-stopped

  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    restart: unless-stopped

volumes:
  prometheus_data:
  grafana_data:
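
The compose file mounts a ./prometheus.yml that isn’t shown above. A minimal sketch that scrapes the exporter, loads the alert rules from the next section, and forwards alerts to Alertmanager (service names and ports taken from the compose file) could be:

# prometheus.yml (minimal sketch matching the compose service names above)
global:
  scrape_interval: 30s
  evaluation_interval: 30s

rule_files:
  # Requires an extra volume mount, e.g. ./kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
  - /etc/prometheus/kafka-alerts.yml

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'kafka-exporter'
    static_configs:
      - targets: ['kafka-exporter:8000']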

Alerting Rules for Unused Messages

Configure Prometheus alerting rules to notify when unused messages exceed thresholds:

# kafka-alerts.yml
groups:
  - name: kafka.unused.messages
    rules:
      - alert: KafkaHighUnusedMessages
        expr: kafka_unused_messages_total > 10000
        for: 5m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "High number of unused messages in Kafka topic {{ $labels.topic }}"
          description: "Topic {{ $labels.topic }} partition {{ $labels.partition }} has {{ $value }} unused messages for more than 5 minutes."

      - alert: KafkaCriticalUnusedMessages
        expr: kafka_unused_messages_total > 100000
        for: 2m
        labels:
          severity: critical
          component: kafka
        annotations:
          summary: "Critical number of unused messages in Kafka"
          description: "Topic {{ $labels.topic }} partition {{ $labels.partition }} has {{ $value }} unused messages. Immediate attention required."

      - alert: KafkaNoConsumersForTopic
        expr: kafka_unused_messages_total > 1000 unless on(topic, partition, cluster) kafka_consumer_lag_sum
        for: 10m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "Kafka topic has no active consumers"
          description: "Topic {{ $labels.topic }} has unused messages but no consumer lag, indicating no active consumers."

      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_lag_sum > 50000
        for: 5m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "High consumer lag detected"
          description: "Consumer group {{ $labels.consumer_group }} for topic {{ $labels.topic }} has lag of {{ $value }} messages."

Grafana Dashboard Configuration

Create comprehensive Grafana dashboards for visualizing unused message trends:

{
  "dashboard": {
    "title": "Kafka Unused Messages Dashboard",
    "panels": [
      {
        "title": "Total Unused Messages",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(kafka_unused_messages_total)",
            "legendFormat": "Total Unused Messages"
          }
        ]
      },
      {
        "title": "Unused Messages by Topic",
        "type": "bargauge",
        "targets": [
          {
            "expr": "sum(kafka_unused_messages_total) by (topic)",
            "legendFormat": "{{ topic }}"
          }
        ]
      },
      {
        "title": "Consumer Lag Trends",
        "type": "graph",
        "targets": [
          {
            "expr": "kafka_consumer_lag_sum",
            "legendFormat": "{{ consumer_group }} - {{ topic }}"
          }
        ]
      },
      {
        "title": "Topics with No Consumers",
        "type": "table",
        "targets": [
          {
            "expr": "kafka_unused_messages_total > 0 unless kafka_consumer_lag_sum",
            "legendFormat": "{{ topic }}:{{ partition }}"
          }
        ]
      }
    ]
  }
}
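
The compose file mounts ./grafana/provisioning, which lets Grafana register the Prometheus data source automatically at startup. A minimal provisioning sketch (the file path and data source name are assumptions) could be:

# grafana/provisioning/datasources/prometheus.yml (assumed path)
apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true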

This comprehensive monitoring solution provides real-time visibility into Kafka message usage patterns, enabling proactive identification and resolution of unused message issues. The combination of multiple programming languages and monitoring tools ensures flexibility for different enterprise environments.
