Kafka Streams and ksqlDB: Building Real-Time Stream Processing Applications

Stream processing transforms raw event data into actionable insights in real time. Kafka Streams and ksqlDB provide complementary approaches to building stream processing applications: Kafka Streams offers a powerful Java library for programmatic control, while ksqlDB brings SQL to stream processing for rapid development and accessibility.

This comprehensive guide explores both technologies, from fundamental concepts through production patterns. We will examine stateless and stateful operations, windowing strategies, joins, aggregations, and deployment patterns that enable building real-time data pipelines at scale.

Understanding Stream Processing Fundamentals

Stream processing operates on unbounded datasets that arrive continuously over time. Unlike batch processing, which operates on finite datasets, stream processing must handle infinite streams while maintaining low latency and providing timely results.

Two core abstractions define stream processing in Kafka. Streams represent unbounded sequences of immutable records. Each record flows through the system once, arriving in order within a partition. Tables represent mutable state derived from streams. Tables maintain the latest value for each key, updated as new records arrive.

This stream-table duality enables powerful processing patterns. A table can be viewed as a materialized view of a stream, maintaining the latest state. A stream can be viewed as a changelog of a table, capturing every update. Understanding this relationship is fundamental to effective stream processing design.

flowchart LR
    subgraph Input["Input Stream"]
        E1[Event: user=A, value=10]
        E2[Event: user=B, value=20]
        E3[Event: user=A, value=30]
        E4[Event: user=B, value=15]
    end
    
    subgraph Processing["Stream Processing"]
        AGG[Aggregation by User]
    end
    
    subgraph Output["Output Table"]
        T1[user=A: 40]
        T2[user=B: 35]
    end
    
    E1 --> AGG
    E2 --> AGG
    E3 --> AGG
    E4 --> AGG
    
    AGG --> T1
    AGG --> T2
    
    style Input fill:#E1F5FE
    style Processing fill:#C8E6C9
    style Output fill:#FFF9C4
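
This duality is visible directly in the Kafka Streams API. A minimal sketch (the topic names are illustrative):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// A topic read as a stream: an append-only sequence of events
KStream<String, String> stream = builder.stream("events");

// A (typically compacted) topic read as a table: the latest value per key
KTable<String, String> table = builder.table("user-state");

// Converting back: every table update becomes a record in a changelog stream
KStream<String, String> changelog = table.toStream();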

Kafka Streams: Java Library for Stream Processing

Kafka Streams is a client library for building stream processing applications in Java or Scala. Applications using Kafka Streams are standard Java applications that can run anywhere Java runs, with no external dependencies beyond Kafka itself.

Key characteristics make Kafka Streams compelling for production use. It is lightweight with no separate cluster to manage, just a library embedded in your application. It is elastic, automatically distributing work across application instances. It is fault-tolerant with exactly-once processing semantics. It provides stateful processing using local state stores backed by Kafka topics. It integrates naturally with Kafka’s security, monitoring, and operational tooling.

Basic Kafka Streams Application

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Enable exactly-once processing
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read from input topic
        KStream<String, String> textLines = builder.stream("text-input");
        
        // Process: split into words, group by word, count
        textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.as("word-counts-store"))
            .toStream()
            .to("word-counts-output", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Graceful shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
    }
}

This application reads text from an input topic, splits each line into words, groups by word, counts occurrences, and writes results to an output topic. The count is maintained in a local state store that is automatically backed up to Kafka for fault tolerance.
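
Because the count is materialized in a named store, it can also be queried in-process through Kafka Streams interactive queries. A minimal sketch, assuming the streams instance above is running and hosts the queried key locally:

import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Query the local "word-counts-store" materialized by count() above
ReadOnlyKeyValueStore<String, Long> counts = streams.store(
    StoreQueryParameters.fromNameAndType(
        "word-counts-store", QueryableStoreTypes.keyValueStore()));

Long count = counts.get("kafka");  // current count, or null if the word was never seen locally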

Stateless Transformations

Stateless operations process each record independently without maintaining state between records. Common stateless operations include filtering records based on predicates, mapping records to new values or keys, flatMapping to produce multiple output records from a single input, branching to route records to different streams based on predicates, and merging multiple streams into one.

// Filtering
KStream<String, Transaction> largeTransactions = transactions
    .filter((key, transaction) -> transaction.getAmount() > 1000);

// Mapping values
KStream<String, String> upperCaseValues = textStream
    .mapValues(value -> value.toUpperCase());

// Mapping keys
KStream<String, Transaction> reKeyedStream = transactions
    .selectKey((key, transaction) -> transaction.getUserId());

// FlatMap
KStream<String, String> words = sentences
    .flatMapValues(sentence -> Arrays.asList(sentence.split(" ")));

// Branch via split(): the modern replacement for the deprecated branch() varargs API
Map<String, KStream<String, Order>> branches = orders
    .split(Named.as("orders-"))
    .branch((key, order) -> order.getStatus() == OrderStatus.PENDING, Branched.as("pending"))
    .branch((key, order) -> order.getStatus() == OrderStatus.COMPLETED, Branched.as("completed"))
    .defaultBranch(Branched.as("other"));  // catch-all branch

Stateful Operations: Aggregations

Stateful operations maintain state across multiple records. Aggregations are the most common stateful operation, combining multiple records into a summary result.

// Count aggregation
KTable<String, Long> wordCounts = words
    .groupBy((key, word) -> word)
    .count(Materialized.as("counts-store"));

// Sum aggregation
KTable<String, Double> totalByUser = transactions
    .groupByKey()
    .aggregate(
        () -> 0.0,  // Initializer
        (key, transaction, total) -> total + transaction.getAmount(),  // Adder
        Materialized.with(Serdes.String(), Serdes.Double())
    );

// Custom aggregation with reduce
KTable<String, Transaction> latestByUser = transactions
    .groupByKey()
    .reduce(
        (aggValue, newValue) -> 
            newValue.getTimestamp() > aggValue.getTimestamp() ? newValue : aggValue
    );

Windowed Aggregations

Windowing groups records into finite time buckets for aggregation. Kafka Streams supports several windowing strategies.

Tumbling windows are fixed-size, non-overlapping windows; each record belongs to exactly one window. Hopping windows are fixed-size windows that advance by a configurable hop interval, so consecutive windows can overlap. Sliding windows are defined by a maximum time difference between record timestamps, creating a new window whenever records fall within that difference of each other. Session windows group records into bursts of activity separated by gaps of inactivity.

import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;

// Tumbling window: 5-minute non-overlapping windows
KTable<Windowed<String>, Long> windowedCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

// Session window: Group user activity with 30-minute inactivity gap
KTable<Windowed<String>, Long> sessionCounts = userEvents
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count();

// Sliding window: windows span a maximum 10-minute difference between record timestamps
KTable<Windowed<String>, Long> slidingCounts = events
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
    .count();
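
Hopping windows, the fourth strategy described above, are expressed as TimeWindows with an advance interval; a minimal sketch over the same events stream:

// Hopping window: 10-minute windows that advance (hop) every 1 minute, so they overlap
KTable<Windowed<String>, Long> hoppingCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
        .advanceBy(Duration.ofMinutes(1)))
    .count();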

Joins: Combining Streams and Tables

Joins combine data from multiple streams or tables. Kafka Streams supports stream-stream joins, stream-table joins, and table-table joins.

// Stream-Table join: Enrich order events with user information
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");

KStream<String, EnrichedOrder> enrichedOrders = orders
    .selectKey((key, order) -> order.getUserId())
    .join(users,
        (order, user) -> new EnrichedOrder(order, user)
    );

// Stream-Stream join: Match click and impression events within 1-hour window
KStream<String, Click> clicks = builder.stream("clicks");
KStream<String, Impression> impressions = builder.stream("impressions");

KStream<String, ClickWithImpression> joined = clicks
    .join(impressions,
        (click, impression) -> new ClickWithImpression(click, impression),
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
        StreamJoined.with(Serdes.String(), clickSerde, impressionSerde)
    );

// Table-Table join: Combine user profiles with preferences
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserPreferences> preferences = builder.table("user-preferences");

KTable<String, CompleteUserData> completeData = profiles
    .join(preferences,
        (profile, prefs) -> new CompleteUserData(profile, prefs)
    );

ksqlDB: SQL for Stream Processing

ksqlDB is a database purpose-built for stream processing applications. It provides a SQL interface for defining stream processing logic, making stream processing accessible to SQL developers and enabling rapid development.

ksqlDB runs as a cluster of servers that coordinate to execute SQL statements against Kafka topics. Queries can be persistent (continuously maintaining derived streams and materialized views) or transient (push queries that stream results to a client, and pull queries that return point-in-time lookups). ksqlDB integrates with Kafka Connect for data integration and Schema Registry for schema management, and supports user-defined functions for custom logic.

Creating Streams and Tables

-- Create a stream from a Kafka topic
CREATE STREAM user_events (
    user_id VARCHAR KEY,
    event_type VARCHAR,
    page VARCHAR,
    timestamp BIGINT
) WITH (
    KAFKA_TOPIC = 'user-events',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'timestamp'
);

-- Create a table from a Kafka topic
CREATE TABLE users (
    user_id VARCHAR PRIMARY KEY,
    name VARCHAR,
    email VARCHAR,
    created_at BIGINT
) WITH (
    KAFKA_TOPIC = 'users',
    VALUE_FORMAT = 'JSON'
);

-- Create a materialized view (table from aggregation)
CREATE TABLE user_event_counts AS
    SELECT user_id, COUNT(*) AS event_count
    FROM user_events
    GROUP BY user_id
    EMIT CHANGES;

Filtering and Transformation

-- Filter and transform stream
CREATE STREAM premium_user_events AS
    SELECT 
        user_id,
        event_type,
        UCASE(page) AS page_upper,
        TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss') AS formatted_time
    FROM user_events
    WHERE event_type IN ('purchase', 'subscription')
    EMIT CHANGES;

-- Flatten nested structures
CREATE STREAM flattened_orders AS
    SELECT 
        order_id,
        customer->id AS customer_id,
        customer->name AS customer_name,
        items[0]->product_id AS first_product_id
    FROM orders
    EMIT CHANGES;

Aggregations and Windowing

-- Simple aggregation
CREATE TABLE page_view_counts AS
    SELECT page, COUNT(*) AS views
    FROM user_events
    WHERE event_type = 'page_view'
    GROUP BY page
    EMIT CHANGES;

-- Tumbling window aggregation (5-minute windows)
CREATE TABLE user_activity_5min AS
    SELECT 
        user_id,
        WINDOWSTART AS window_start,
        WINDOWEND AS window_end,
        COUNT(*) AS event_count
    FROM user_events
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY user_id
    EMIT CHANGES;

-- Session window (30-minute inactivity gap)
CREATE TABLE user_sessions AS
    SELECT 
        user_id,
        WINDOWSTART AS session_start,
        WINDOWEND AS session_end,
        COUNT(*) AS events_in_session,
        COLLECT_LIST(page) AS pages_visited
    FROM user_events
    WINDOW SESSION (30 MINUTES)
    GROUP BY user_id
    EMIT CHANGES;

-- Complex aggregation with HAVING clause
CREATE TABLE high_value_customers AS
    SELECT 
        customer_id,
        SUM(amount) AS total_spent,
        COUNT(*) AS order_count,
        AVG(amount) AS avg_order_value
    FROM orders
    WINDOW TUMBLING (SIZE 1 DAY)
    GROUP BY customer_id
    HAVING SUM(amount) > 1000
    EMIT CHANGES;

Joins in ksqlDB

-- Stream-Table join: Enrich events with user data
CREATE STREAM enriched_events AS
    SELECT 
        e.user_id,
        e.event_type,
        e.page,
        u.name AS user_name,
        u.email AS user_email
    FROM user_events e
    LEFT JOIN users u ON e.user_id = u.user_id
    EMIT CHANGES;

-- Stream-Stream join within 1-hour window
CREATE STREAM order_shipment_matched AS
    SELECT 
        o.order_id,
        o.customer_id,
        o.total_amount,
        s.tracking_number,
        s.shipped_at
    FROM orders o
    INNER JOIN shipments s 
        WITHIN 1 HOURS
        ON o.order_id = s.order_id
    EMIT CHANGES;

-- Table-Table join
CREATE TABLE customer_complete AS
    SELECT 
        p.customer_id,
        p.name,
        p.email,
        pr.preferences,
        pr.notification_settings
    FROM customer_profiles p
    LEFT JOIN customer_preferences pr 
        ON p.customer_id = pr.customer_id
    EMIT CHANGES;

Push and Pull Queries

ksqlDB supports two query types. Push queries continuously stream results as data changes, providing real-time updates. Pull queries return point-in-time snapshots of materialized views, similar to traditional database queries.

-- Push query: Stream continuous results
SELECT * FROM user_events
WHERE event_type = 'purchase'
EMIT CHANGES;

-- Pull query: Get current state from materialized view
SELECT * FROM user_event_counts
WHERE user_id = 'user-123';

-- Pull query with filtering
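-- (non-key filters may require enabling table scans: ksql.query.pull.table.scan.enabled=true)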
SELECT user_id, event_count 
FROM user_event_counts
WHERE event_count > 100;

Production Deployment Patterns

Kafka Streams Deployment

Kafka Streams applications are standard Java applications. Deploy multiple instances of the same application (same application.id) to form a stream processing cluster. Kafka Streams automatically distributes processing across instances based on input topic partitions.

Each application instance maintains local state stores for stateful operations. These state stores are backed up to changelog topics in Kafka for fault tolerance. When an instance fails, another instance automatically takes over its partitions and rebuilds state from the changelog.

// Production configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");

// State store configuration
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams/state");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
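// Standby replicas keep warm copies of state on other instances,
// reducing restore time after failover (assumed value; tune per workload)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);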

// Performance tuning
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

// Exactly-once semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

// Monitoring
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

ksqlDB Deployment

ksqlDB runs as a cluster of servers. Deploy at least three servers in production to ensure high availability. Servers in the same cluster coordinate through an internal Kafka command topic, and the work of persistent queries is distributed across the available servers.

ksqlDB supports two deployment modes. Interactive mode allows creating and managing queries via REST API or CLI. Headless mode runs predefined queries from SQL files, enabling GitOps workflows and immutable deployments.

# ksqldb-server.properties

# Bootstrap servers
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

# Service ID (unique per ksqlDB cluster)
ksql.service.id=production_ksqldb_

# Internal topics replication
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3

# State directory
ksql.streams.state.dir=/var/ksqldb/state

# Query configuration
# Headless mode only: run predefined queries from a SQL file
ksql.queries.file=/etc/ksqldb/queries.sql
ksql.streams.num.stream.threads=4

# Security
ksql.security.extension.class=io.confluent.ksql.security.KsqlDefaultSecurityExtension

# Monitoring
ksql.streams.producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
ksql.streams.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

Choosing Between Kafka Streams and ksqlDB

Choose Kafka Streams when you need complex custom logic not easily expressed in SQL, fine-grained control over state management and performance optimization, deep integration with existing Java or Scala applications, or embedded streaming within microservices.

Choose ksqlDB when you want rapid development with familiar SQL syntax, the ability to empower data analysts to build streaming pipelines, simpler operational model with centralized query management, or quick prototyping and iteration on streaming logic.

Many organizations use both. ksqlDB handles initial data preparation and simpler transformations, while Kafka Streams applications consume processed streams for complex stateful operations, machine learning inference, or custom business logic.

Monitoring and Troubleshooting

Both Kafka Streams and ksqlDB expose metrics for monitoring. Key metrics include processing rate (records per second), lag (how far behind the input the application is), state store size, commit latency, and error rates.

For Kafka Streams, monitor stream thread state, rebalancing frequency, and state restoration progress. For ksqlDB, monitor query execution status, server health, and resource utilization.

Common troubleshooting scenarios include high lag indicating insufficient processing capacity, frequent rebalancing suggesting configuration issues or an unstable cluster, and unbounded state store growth calling for changelog compaction settings or window retention tuning.
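
Kafka Streams exposes these metrics through JMX and programmatically via KafkaStreams#metrics(). A minimal sketch that logs per-thread processing rate and commit latency, assuming a running streams instance:

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import java.util.Map;

// Log per-thread throughput and commit latency from the stream-thread-metrics group
static void logStreamsMetrics(KafkaStreams streams) {
    for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
        MetricName m = e.getKey();
        if ("stream-thread-metrics".equals(m.group())
                && ("process-rate".equals(m.name()) || "commit-latency-avg".equals(m.name()))) {
            System.out.printf("%s %s = %s%n", m.name(), m.tags(), e.getValue().metricValue());
        }
    }
}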

What Comes Next

Understanding stream processing with Kafka Streams and ksqlDB enables building real-time data pipelines that transform raw events into actionable insights. Part 6 of this series explores security and monitoring for Kafka deployments, covering authentication, authorization, encryption, observability patterns, and operational best practices for production environments.
