Continuing from our previous guide on identifying unused messages in Kafka, this article focuses on advanced monitoring techniques, automated alerting systems, and C# implementations for enterprise environments. We’ll also explore how to visualize message usage patterns and create comprehensive monitoring dashboards.
Advanced Monitoring Architecture
A robust Kafka monitoring system requires multiple components working together to provide real-time insights into message usage patterns.
graph TB
    A[Kafka Cluster] --> B[Metrics Collector]
    B --> C[Time Series DB]
    B --> D[Alert Manager]
    C --> E[Dashboard]
    D --> F[Notification System]

    subgraph "Monitoring Components"
        B --> G[JMX Metrics]
        B --> H[Admin API]
        B --> I[Consumer Group Monitor]
    end

    subgraph "Storage & Analysis"
        C --> J[Prometheus]
        C --> K[InfluxDB]
        E --> L[Grafana]
        E --> M[Custom Dashboard]
    end

    subgraph "Alerting"
        F --> N[Slack/Teams]
        F --> O[Email]
        F --> P[PagerDuty]
    end
Method 4: C# .NET Monitoring Solution
For enterprise environments using .NET, here’s a comprehensive C# implementation:
// KafkaMessageMonitor.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace KafkaMonitoring
{
/// <summary>Top-level result of one unused-message scan of the cluster.</summary>
public class UnusedMessageReport
{
    /// <summary>UTC time at which the scan ran.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Per-topic breakdown, keyed by topic name.</summary>
    public Dictionary<string, TopicReport> Topics { get; set; } = new Dictionary<string, TopicReport>();

    /// <summary>Sum of unused messages across every topic in <see cref="Topics"/>.</summary>
    public long TotalUnusedMessages { get; set; }
}
/// <summary>Per-topic aggregation of unused-message counts.</summary>
public class TopicReport
{
    /// <summary>Per-partition detail, keyed by partition id.</summary>
    public Dictionary<int, PartitionReport> Partitions { get; set; } = new Dictionary<int, PartitionReport>();

    /// <summary>Sum of unused messages over all partitions of this topic.</summary>
    public long TopicTotalUnused { get; set; }
}
/// <summary>Offset bookkeeping for a single topic partition.</summary>
public class PartitionReport
{
    /// <summary>Low watermark (first retained offset).</summary>
    public long EarliestOffset { get; set; }

    /// <summary>High watermark (next offset to be produced).</summary>
    public long LatestOffset { get; set; }

    /// <summary>Messages currently retained (latest minus earliest).</summary>
    public long TotalMessages { get; set; }

    /// <summary>Furthest committed offset across all consumer groups.</summary>
    public long MaxConsumedOffset { get; set; }

    /// <summary>Retained messages past <see cref="MaxConsumedOffset"/>.</summary>
    public long UnusedMessages { get; set; }

    /// <summary>Groups that have committed an offset for this partition.</summary>
    public List<string> ConsumingGroups { get; set; } = new List<string>();
}
/// <summary>
/// Scans every topic/partition in a Kafka cluster, compares the high watermark
/// against the furthest committed offset of any consumer group, and reports how
/// many retained messages have never been consumed ("unused").
/// </summary>
public class KafkaUnusedMessagesMonitor
{
    private readonly ILogger<KafkaUnusedMessagesMonitor> _logger;
    private readonly AdminClientConfig _adminConfig;
    private readonly ConsumerConfig _consumerConfig;

    /// <summary>
    /// Creates a monitor reading "Kafka:BootstrapServers" from configuration and
    /// falling back to localhost:9092 when the key is absent.
    /// </summary>
    public KafkaUnusedMessagesMonitor(
        ILogger<KafkaUnusedMessagesMonitor> logger,
        IConfiguration configuration)
    {
        _logger = logger;
        _adminConfig = new AdminClientConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"] ?? "localhost:9092"
        };
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _adminConfig.BootstrapServers,
            GroupId = "unused-messages-monitor",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // This consumer only queries watermarks; it must never commit offsets.
            EnableAutoCommit = false
        };
    }

    /// <summary>
    /// Builds a complete <see cref="UnusedMessageReport"/> for the cluster.
    /// Per-partition failures are logged and skipped; a failure to enumerate
    /// topics or consumer groups is logged and rethrown.
    /// </summary>
    /// <returns>The populated report, timestamped in UTC.</returns>
    public async Task<UnusedMessageReport> GenerateReportAsync()
    {
        _logger.LogInformation("Starting unused messages analysis...");
        var report = new UnusedMessageReport
        {
            Timestamp = DateTime.UtcNow
        };
        try
        {
            using var adminClient = new AdminClientBuilder(_adminConfig).Build();

            // Topic -> partition ids for the whole cluster.
            var topicMetadata = GetTopicMetadata(adminClient);

            // Snapshot committed offsets for every group once, up front.
            var consumerGroupOffsets = await GetConsumerGroupOffsetsAsync(adminClient);

            foreach (var (topicName, partitions) in topicMetadata)
            {
                _logger.LogDebug($"Processing topic: {topicName}");
                var topicReport = new TopicReport();
                report.Topics[topicName] = topicReport;
                foreach (var partition in partitions)
                {
                    try
                    {
                        var partitionReport = ProcessPartition(
                            topicName, partition, consumerGroupOffsets);
                        topicReport.Partitions[partition] = partitionReport;
                        topicReport.TopicTotalUnused += partitionReport.UnusedMessages;
                    }
                    catch (Exception ex)
                    {
                        // One unreachable partition must not abort the whole scan.
                        _logger.LogWarning(ex,
                            $"Failed to process partition {topicName}:{partition}");
                    }
                }
                report.TotalUnusedMessages += topicReport.TopicTotalUnused;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to generate unused messages report");
            throw;
        }
        _logger.LogInformation($"Analysis complete. Total unused messages: {report.TotalUnusedMessages:N0}");
        return report;
    }

    /// <summary>
    /// Maps each topic name to its partition ids. Topics reporting a metadata
    /// error are logged and omitted.
    /// </summary>
    /// <remarks>
    /// Synchronous on purpose: GetMetadata is a blocking call. The previous
    /// version was declared async without a single await (compiler warning CS1998).
    /// </remarks>
    private Dictionary<string, List<int>> GetTopicMetadata(IAdminClient adminClient)
    {
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
        var topicPartitions = new Dictionary<string, List<int>>();
        foreach (var topic in metadata.Topics)
        {
            if (topic.Error != ErrorCode.NoError)
            {
                _logger.LogWarning($"Error in topic {topic.Topic}: {topic.Error}");
                continue;
            }
            topicPartitions[topic.Topic] = topic.Partitions
                .Select(p => p.PartitionId)
                .ToList();
        }
        return topicPartitions;
    }

    /// <summary>
    /// Returns committed offsets organized as group -> topic -> partition -> offset.
    /// Failures are logged; the method always returns (possibly partial) data.
    /// </summary>
    private async Task<Dictionary<string, Dictionary<string, Dictionary<int, long>>>>
        GetConsumerGroupOffsetsAsync(IAdminClient adminClient)
    {
        var consumerGroupOffsets = new Dictionary<string, Dictionary<string, Dictionary<int, long>>>();
        try
        {
            // NOTE(review): verify these admin calls against the installed
            // Confluent.Kafka version — the group-listing and group-offsets
            // API signatures have changed across releases.
            var groups = await adminClient.ListGroupsAsync(TimeSpan.FromSeconds(10));
            foreach (var group in groups)
            {
                // NOTE(review): this substring filter also skips any production
                // group whose name merely contains "temp" or "monitor" — confirm intent.
                if (group.GroupId.Contains("temp") || group.GroupId.Contains("monitor"))
                    continue;
                try
                {
                    var groupOffsets = await adminClient.ListConsumerGroupOffsetsAsync(
                        new[] { group.GroupId }, TimeSpan.FromSeconds(10));
                    if (!groupOffsets.TryGetValue(group.GroupId, out var offsets))
                        continue;

                    var topics = new Dictionary<string, Dictionary<int, long>>();
                    consumerGroupOffsets[group.GroupId] = topics;
                    foreach (var offset in offsets)
                    {
                        // TryGetValue avoids the ContainsKey + indexer double lookup.
                        if (!topics.TryGetValue(offset.Topic, out var partitionOffsets))
                        {
                            partitionOffsets = new Dictionary<int, long>();
                            topics[offset.Topic] = partitionOffsets;
                        }
                        partitionOffsets[offset.Partition.Value] = offset.Offset.Value;
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, $"Failed to get offsets for group {group.GroupId}");
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to get consumer group offsets");
        }
        return consumerGroupOffsets;
    }

    /// <summary>
    /// Computes watermark offsets for one partition and how far the high
    /// watermark sits past the most advanced consumer group.
    /// </summary>
    /// <remarks>
    /// Synchronous: QueryWatermarkOffsets blocks, and the original async method
    /// contained no awaits (CS1998). Also drops an unused local the original kept.
    /// </remarks>
    private PartitionReport ProcessPartition(
        string topicName,
        int partition,
        Dictionary<string, Dictionary<string, Dictionary<int, long>>> consumerGroupOffsets)
    {
        using var consumer = new ConsumerBuilder<Ignore, Ignore>(_consumerConfig).Build();
        var topicPartition = new TopicPartition(topicName, partition);

        var watermarkOffsets = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
        var earliestOffset = watermarkOffsets.Low.Value;
        var latestOffset = watermarkOffsets.High.Value;

        // Messages currently retained; earlier ones may have been deleted by retention.
        var totalMessages = latestOffset - earliestOffset;

        // Start at the low watermark so a partition no group ever consumed
        // reports every retained message as unused.
        long maxConsumedOffset = earliestOffset;
        var consumingGroups = new List<string>();
        foreach (var (groupId, groupOffsets) in consumerGroupOffsets)
        {
            if (groupOffsets.TryGetValue(topicName, out var partitionOffsets) &&
                partitionOffsets.TryGetValue(partition, out var groupOffset))
            {
                maxConsumedOffset = Math.Max(maxConsumedOffset, groupOffset);
                consumingGroups.Add(groupId);
            }
        }

        // Clamp at zero: a committed offset can briefly exceed a cached watermark.
        var unusedMessages = Math.Max(0, latestOffset - maxConsumedOffset);
        return new PartitionReport
        {
            EarliestOffset = earliestOffset,
            LatestOffset = latestOffset,
            TotalMessages = totalMessages,
            MaxConsumedOffset = maxConsumedOffset,
            UnusedMessages = unusedMessages,
            ConsumingGroups = consumingGroups
        };
    }

    /// <summary>
    /// Writes a human-readable summary to the console, listing only topics and
    /// partitions that actually have unused messages.
    /// </summary>
    public void PrintReport(UnusedMessageReport report)
    {
        Console.WriteLine(new string('=', 60));
        Console.WriteLine("KAFKA UNUSED MESSAGES REPORT");
        Console.WriteLine($"Generated: {report.Timestamp:yyyy-MM-dd HH:mm:ss} UTC");
        Console.WriteLine(new string('=', 60));
        Console.WriteLine($"\nTOTAL UNUSED MESSAGES: {report.TotalUnusedMessages:N0}");
        Console.WriteLine("\nDETAILED BREAKDOWN:");
        Console.WriteLine(new string('-', 60));
        foreach (var (topicName, topicData) in report.Topics)
        {
            if (topicData.TopicTotalUnused <= 0)
                continue;
            Console.WriteLine($"\nTopic: {topicName}");
            Console.WriteLine($"  Unused Messages: {topicData.TopicTotalUnused:N0}");
            foreach (var (partition, partitionData) in topicData.Partitions)
            {
                if (partitionData.UnusedMessages <= 0)
                    continue;
                Console.WriteLine($"  Partition {partition}:");
                Console.WriteLine($"    Latest Offset: {partitionData.LatestOffset:N0}");
                Console.WriteLine($"    Max Consumed: {partitionData.MaxConsumedOffset:N0}");
                Console.WriteLine($"    Unused: {partitionData.UnusedMessages:N0}");
                Console.WriteLine($"    Consumer Groups: {string.Join(", ", partitionData.ConsumingGroups)}");
            }
        }
    }

    /// <summary>
    /// Serializes the report to camelCase JSON (uses System.IO's File API).
    /// When <paramref name="filePath"/> is null a timestamped file name is
    /// generated in the working directory.
    /// </summary>
    public async Task SaveReportAsync(UnusedMessageReport report, string filePath = null)
    {
        filePath ??= $"kafka-unused-messages-{DateTime.Now:yyyyMMdd_HHmmss}.json";
        var json = JsonSerializer.Serialize(report, new JsonSerializerOptions
        {
            WriteIndented = true,
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        });
        await File.WriteAllTextAsync(filePath, json);
        _logger.LogInformation($"Report saved to: {filePath}");
    }
}
// Usage example in Program.cs
class Program
{
    /// <summary>
    /// Generates, prints, and saves an unused-messages report; sets a non-zero
    /// exit code on failure so calling scripts can detect it.
    /// </summary>
    static async Task Main(string[] args)
    {
        // optional: true — without it ConfigurationBuilder throws when
        // appsettings.json is missing, defeating the monitor's
        // localhost:9092 fallback.
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json", optional: true)
            .Build();

        // Dispose the factory on exit so buffered console log output is flushed.
        using var loggerFactory = LoggerFactory.Create(builder =>
            builder.AddConsole().SetMinimumLevel(LogLevel.Information));
        var logger = loggerFactory.CreateLogger<KafkaUnusedMessagesMonitor>();

        var monitor = new KafkaUnusedMessagesMonitor(logger, configuration);
        try
        {
            var report = await monitor.GenerateReportAsync();
            monitor.PrintReport(report);
            await monitor.SaveReportAsync(report);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to generate report");
            Environment.ExitCode = 1; // signal failure to the caller
        }
    }
}
}
Method 5: Real-time Monitoring with Prometheus Integration
For production environments, integrate with Prometheus for continuous monitoring:
# kafka-prometheus-exporter.py
import time
import logging
from prometheus_client import start_http_server, Gauge, Info
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.structs import TopicPartition
import threading
from datetime import datetime
# Prometheus metrics
# Messages past the furthest committed consumer offset, per topic partition.
unused_messages_gauge = Gauge(
    'kafka_unused_messages_total',
    'Total unused messages per topic partition',
    ['topic', 'partition', 'cluster']
)
# High watermark minus committed offset, per consumer group and partition.
consumer_lag_gauge = Gauge(
    'kafka_consumer_lag_sum',
    'Consumer lag per topic partition per group',
    ['topic', 'partition', 'consumer_group', 'cluster']
)
# NOTE(review): declared but never updated anywhere in this module — confirm
# whether cluster info should be populated somewhere or the metric dropped.
kafka_info = Info(
    'kafka_cluster_info',
    'Kafka cluster information'
)
class KafkaPrometheusExporter:
    """Periodically exports per-partition unused-message counts and per-group
    consumer lag from a Kafka cluster as Prometheus gauges."""

    def __init__(self, bootstrap_servers, cluster_name='default'):
        self.bootstrap_servers = bootstrap_servers
        self.cluster_name = cluster_name
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id=f'prometheus_exporter_{cluster_name}'
        )
        self.running = False

    def _fetch_group_offsets(self):
        """Return {group_id: {TopicPartition: OffsetAndMetadata}} for every
        consumer group, fetched once per cycle — the previous version re-listed
        all groups and their offsets for every single partition."""
        group_offsets = {}
        # kafka-python's list_consumer_groups() yields (group_id, protocol_type)
        # tuples, not objects with a .group_id attribute.
        for group_id, _protocol_type in self.admin_client.list_consumer_groups():
            try:
                group_offsets[group_id] = self.admin_client.list_consumer_group_offsets(group_id)
            except Exception as e:
                logging.warning(f"Failed to get offsets for group {group_id}: {e}")
        return group_offsets

    def collect_metrics(self):
        """Collect and update Prometheus metrics for one scrape cycle."""
        consumer = None
        try:
            # One shared consumer for all offset queries — the previous version
            # opened and closed a new KafkaConsumer per partition.
            consumer = KafkaConsumer(
                bootstrap_servers=self.bootstrap_servers,
                enable_auto_commit=False
            )
            group_offsets = self._fetch_group_offsets()

            # kafka-python's describe_topics() returns a list of metadata dicts,
            # not a mapping — the original iterated .items() on it.
            for topic_metadata in self.admin_client.describe_topics():
                topic_name = topic_metadata['topic']
                for partition_metadata in topic_metadata['partitions']:
                    partition_id = partition_metadata['partition']
                    try:
                        tp = TopicPartition(topic_name, partition_id)
                        latest_offset = consumer.end_offsets([tp])[tp]
                        earliest_offset = consumer.beginning_offsets([tp])[tp]

                        # Start at the low watermark so a never-consumed
                        # partition reports every retained message as unused.
                        max_consumed_offset = earliest_offset
                        for group_id, offsets in group_offsets.items():
                            offset_metadata = offsets.get(tp)
                            if offset_metadata is None:
                                continue
                            max_consumed_offset = max(max_consumed_offset, offset_metadata.offset)
                            # Update consumer lag metric
                            consumer_lag_gauge.labels(
                                topic=topic_name,
                                partition=str(partition_id),
                                consumer_group=group_id,
                                cluster=self.cluster_name
                            ).set(latest_offset - offset_metadata.offset)

                        # Calculate and publish unused messages
                        unused_messages = max(0, latest_offset - max_consumed_offset)
                        unused_messages_gauge.labels(
                            topic=topic_name,
                            partition=str(partition_id),
                            cluster=self.cluster_name
                        ).set(unused_messages)
                    except Exception as e:
                        logging.error(f"Error processing {topic_name}:{partition_id} - {e}")
        except Exception as e:
            logging.error(f"Error collecting metrics: {e}")
        finally:
            # Always release the shared consumer's sockets.
            if consumer is not None:
                consumer.close()

    def start_monitoring(self, interval=60):
        """Start continuous monitoring on a daemon thread.

        interval: seconds between collection cycles."""
        self.running = True

        def monitor_loop():
            while self.running:
                try:
                    self.collect_metrics()
                    time.sleep(interval)
                except Exception as e:
                    logging.error(f"Monitoring error: {e}")
                    time.sleep(5)  # Short retry interval

        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()
        logging.info(f"Started Kafka monitoring for cluster: {self.cluster_name}")

    def stop_monitoring(self):
        """Ask the monitor loop to exit after its current cycle finishes."""
        self.running = False
        logging.info("Stopped Kafka monitoring")
# Main execution
if __name__ == "__main__":
import os
# Configuration
BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
CLUSTER_NAME = os.getenv('KAFKA_CLUSTER_NAME', 'default')
METRICS_PORT = int(os.getenv('METRICS_PORT', 8000))
COLLECTION_INTERVAL = int(os.getenv('COLLECTION_INTERVAL', 60))
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Start Prometheus metrics server
start_http_server(METRICS_PORT)
logging.info(f"Prometheus metrics server started on port {METRICS_PORT}")
# Initialize and start monitoring
exporter = KafkaPrometheusExporter(BOOTSTRAP_SERVERS, CLUSTER_NAME)
exporter.start_monitoring(COLLECTION_INTERVAL)
try:
# Keep the main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
exporter.stop_monitoring()
logging.info("Exporter stopped")
Docker Configuration for Monitoring Stack
Deploy the complete monitoring solution using Docker Compose:
# docker-compose.monitoring.yml
# NOTE: the top-level `version` key is obsolete with Docker Compose v2+ and is
# kept only for compatibility with legacy docker-compose installs.
version: '3.8'

services:
  # Custom exporter (the Python script above) scraping the Kafka cluster.
  kafka-exporter:
    build: .
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
      - KAFKA_CLUSTER_NAME=production
      - METRICS_PORT=8000
      - COLLECTION_INTERVAL=30
    ports:
      - "8000:8000"
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=90d'
      - '--web.enable-lifecycle'
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      # NOTE(review): default admin password — change before production use.
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/var/lib/grafana/dashboards
      - ./grafana/provisioning:/etc/grafana/provisioning
    restart: unless-stopped

  # NOTE(review): Alertmanager only receives alerts if prometheus.yml contains a
  # matching `alerting:` target — confirm that configuration exists.
  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    restart: unless-stopped

volumes:
  prometheus_data:
  grafana_data:
Alerting Rules for Unused Messages
Configure Prometheus alerting rules to notify when unused messages exceed thresholds:
# kafka-alerts.yml
groups:
  - name: kafka.unused.messages
    rules:
      # Sustained backlog of messages no consumer group has read.
      - alert: KafkaHighUnusedMessages
        expr: kafka_unused_messages_total > 10000
        for: 5m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "High number of unused messages in Kafka topic {{ $labels.topic }}"
          description: "Topic {{ $labels.topic }} partition {{ $labels.partition }} has {{ $value }} unused messages for more than 5 minutes."

      - alert: KafkaCriticalUnusedMessages
        expr: kafka_unused_messages_total > 100000
        for: 2m
        labels:
          severity: critical
          component: kafka
        annotations:
          summary: "Critical number of unused messages in Kafka"
          description: "Topic {{ $labels.topic }} partition {{ $labels.partition }} has {{ $value }} unused messages. Immediate attention required."

      # Fires when a partition has a backlog but no group reports lag for it.
      # FIX: the original expression
      #   kafka_unused_messages_total > 1000 and kafka_consumer_lag_sum == 0
      # could never fire: `and` matches on ALL labels, but the lag metric
      # carries an extra consumer_group label, and a partition with no
      # consumers has no lag series at all (absent, not zero).
      # `unless on(...)` handles both cases.
      - alert: KafkaNoConsumersForTopic
        expr: kafka_unused_messages_total > 1000 unless on(topic, partition, cluster) kafka_consumer_lag_sum
        for: 10m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "Kafka topic has no active consumers"
          description: "Topic {{ $labels.topic }} has unused messages but no consumer lag, indicating no active consumers."

      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_lag_sum > 50000
        for: 5m
        labels:
          severity: warning
          component: kafka
        annotations:
          summary: "High consumer lag detected"
          description: "Consumer group {{ $labels.consumer_group }} for topic {{ $labels.topic }} has lag of {{ $value }} messages."
Grafana Dashboard Configuration
Create comprehensive Grafana dashboards for visualizing unused message trends:
{
"dashboard": {
"title": "Kafka Unused Messages Dashboard",
"panels": [
{
"title": "Total Unused Messages",
"type": "stat",
"targets": [
{
"expr": "sum(kafka_unused_messages_total)",
"legendFormat": "Total Unused Messages"
}
]
},
{
"title": "Unused Messages by Topic",
"type": "bargauge",
"targets": [
{
"expr": "sum(kafka_unused_messages_total) by (topic)",
"legendFormat": "{{ topic }}"
}
]
},
{
"title": "Consumer Lag Trends",
"type": "graph",
"targets": [
{
"expr": "kafka_consumer_lag_sum",
"legendFormat": "{{ consumer_group }} - {{ topic }}"
}
]
},
{
"title": "Topics with No Consumers",
"type": "table",
"targets": [
{
"expr": "kafka_unused_messages_total > 0 unless on(topic, partition, cluster) kafka_consumer_lag_sum",
"legendFormat": "{{ topic }}:{{ partition }}"
}
]
}
]
}
}
This comprehensive monitoring solution provides real-time visibility into Kafka message usage patterns, enabling proactive identification and resolution of unused message issues. The combination of multiple programming languages and monitoring tools ensures flexibility for different enterprise environments.