Stream processing transforms raw event data into actionable insights in real time. Kafka Streams and ksqlDB provide complementary approaches to building stream processing applications: Kafka Streams offers a powerful Java library for programmatic control, while ksqlDB brings SQL to stream processing for rapid development and accessibility.
This comprehensive guide explores both technologies, from fundamental concepts through production patterns. We will examine stateless and stateful operations, windowing strategies, joins, aggregations, and deployment patterns that enable building real-time data pipelines at scale.
Understanding Stream Processing Fundamentals
Stream processing operates on unbounded data sets that arrive continuously over time. Unlike batch processing that operates on finite datasets, stream processing must handle infinite streams while maintaining low latency and providing timely results.
Two core abstractions define stream processing in Kafka. Streams represent unbounded sequences of immutable records. Each record flows through the system once, arriving in order within a partition. Tables represent mutable state derived from streams. Tables maintain the latest value for each key, updated as new records arrive.
This stream-table duality enables powerful processing patterns. A table can be viewed as a materialized view of a stream, maintaining the latest state. A stream can be viewed as a changelog of a table, capturing every update. Understanding this relationship is fundamental to effective stream processing design.
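In Kafka Streams terms, the duality is a one-line conversion in each direction. The following minimal sketch (topic and store names are illustrative) materializes a stream into a table and turns the table back into its changelog stream:
StreamsBuilder builder = new StreamsBuilder();
// Stream -> table: keep only the latest value per key in a state store
KTable<String, String> latestByKey = builder
    .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
    .toTable(Materialized.as("latest-by-key"));
// Table -> stream: every update to the table becomes a changelog record
KStream<String, String> changelog = latestByKey.toStream();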
flowchart LR
subgraph Input["Input Stream"]
E1[Event: user=A, value=10]
E2[Event: user=B, value=20]
E3[Event: user=A, value=30]
E4[Event: user=B, value=15]
end
subgraph Processing["Stream Processing"]
AGG[Aggregation by User]
end
subgraph Output["Output Table"]
T1[user=A: 40]
T2[user=B: 35]
end
E1 --> AGG
E2 --> AGG
E3 --> AGG
E4 --> AGG
AGG --> T1
AGG --> T2
style Input fill:#E1F5FE
style Processing fill:#C8E6C9
style Output fill:#FFF9C4
Kafka Streams: Java Library for Stream Processing
Kafka Streams is a client library for building stream processing applications in Java or Scala. Applications using Kafka Streams are standard Java applications that can run anywhere Java runs, with no external dependencies beyond Kafka itself.
Key characteristics make Kafka Streams compelling for production use. It is lightweight with no separate cluster to manage, just a library embedded in your application. It is elastic, automatically distributing work across application instances. It is fault-tolerant with exactly-once processing semantics. It provides stateful processing using local state stores backed by Kafka topics. It integrates naturally with Kafka’s security, monitoring, and operational tooling.
Basic Kafka Streams Application
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.common.serialization.Serdes;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Enable exactly-once processing
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> textLines = builder.stream("text-input");
// Process: split into words, group by word, count
textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-counts-store"))
.toStream()
.to("word-counts-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Graceful shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}
}
This application reads text from an input topic, splits each line into words, groups by word, counts occurrences, and writes results to an output topic. The count is maintained in a local state store that is automatically backed up to Kafka for fault tolerance.
Stateless Transformations
Stateless operations process each record independently without maintaining state between records. Common stateless operations include filtering records based on predicates, mapping records to new values or keys, flatMapping to produce multiple output records from a single input, branching to route records to different streams based on predicates, and merging multiple streams into one.
// Filtering
KStream<String, Transaction> largeTransactions = transactions
.filter((key, transaction) -> transaction.getAmount() > 1000);
// Mapping values
KStream<String, String> upperCaseValues = textStream
.mapValues(value -> value.toUpperCase());
// Mapping keys
KStream<String, Transaction> reKeyedStream = transactions
.selectKey((key, transaction) -> transaction.getUserId());
// FlatMap
KStream<String, String> words = sentences
.flatMapValues(sentence -> Arrays.asList(sentence.split(" ")));
// Branch (note: branch() is deprecated in newer Kafka Streams releases in favor of split())
KStream<String, Order>[] branches = orders.branch(
(key, order) -> order.getStatus() == OrderStatus.PENDING,
(key, order) -> order.getStatus() == OrderStatus.COMPLETED,
(key, order) -> true // catch-all
);
Stateful Operations: Aggregations
Stateful operations maintain state across multiple records. Aggregations are the most common stateful operation, combining multiple records into a summary result.
// Count aggregation
KTable<String, Long> wordCounts = words
.groupBy((key, word) -> word)
.count(Materialized.as("counts-store"));
// Sum aggregation
KTable<String, Double> totalByUser = transactions
.groupByKey()
.aggregate(
() -> 0.0, // Initializer
(key, transaction, total) -> total + transaction.getAmount(), // Adder
Materialized.with(Serdes.String(), Serdes.Double())
);
// Custom aggregation with reduce
KTable<String, Transaction> latestByUser = transactions
.groupByKey()
.reduce(
(aggValue, newValue) ->
newValue.getTimestamp() > aggValue.getTimestamp() ? newValue : aggValue
);
Windowed Aggregations
Windowing groups records into finite time buckets for aggregation. Kafka Streams supports several windowing strategies.
Tumbling windows are fixed-size, non-overlapping windows; each record belongs to exactly one window. Hopping windows are fixed-size windows that advance by a configurable hop interval, so consecutive windows overlap. Sliding windows are defined by a maximum time difference between two records, creating a new window whenever records fall within that difference of each other. Session windows group records into bursts of activity separated by gaps of inactivity. All four strategies appear in the example below.
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import java.time.Duration;
// Tumbling window: 5-minute non-overlapping windows
KTable<Windowed<String>, Long> windowedCounts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
// Session window: Group user activity with 30-minute inactivity gap
KTable<Windowed<String>, Long> sessionCounts = userEvents
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
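// Hopping window: 10-minute windows advancing every 1 minute (overlapping);
// a sketch added to illustrate the fourth strategy described above
KTable<Windowed<String>, Long> hoppingCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)).advanceBy(Duration.ofMinutes(1)))
    .count();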
// Sliding window: aggregate records that fall within 10 minutes of each other
KTable<Windowed<String>, Long> slidingCounts = events
.groupByKey()
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)))
.count();
Joins: Combining Streams and Tables
Joins combine data from multiple streams or tables. Kafka Streams supports stream-stream joins, stream-table joins, and table-table joins. Stream-stream and stream-table joins require their inputs to be co-partitioned on the join key, which is why the stream-table example below re-keys orders by user ID before joining.
// Stream-Table join: Enrich order events with user information
KStream<String, Order> orders = builder.stream("orders");
KTable<String, User> users = builder.table("users");
KStream<String, EnrichedOrder> enrichedOrders = orders
.selectKey((key, order) -> order.getUserId())
.join(users,
(order, user) -> new EnrichedOrder(order, user)
);
// Stream-Stream join: Match click and impression events within 1-hour window
KStream<String, Click> clicks = builder.stream("clicks");
KStream<String, Impression> impressions = builder.stream("impressions");
KStream<String, ClickWithImpression> joined = clicks
.join(impressions,
(click, impression) -> new ClickWithImpression(click, impression),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
StreamJoined.with(Serdes.String(), clickSerde, impressionSerde)
);
// Table-Table join: Combine user profiles with preferences
KTable<String, UserProfile> profiles = builder.table("user-profiles");
KTable<String, UserPreferences> preferences = builder.table("user-preferences");
KTable<String, CompleteUserData> completeData = profiles
.join(preferences,
(profile, prefs) -> new CompleteUserData(profile, prefs)
);
ksqlDB: SQL for Stream Processing
ksqlDB is a database purpose-built for stream processing applications. It provides a SQL interface for defining stream processing logic, making stream processing accessible to SQL developers and enabling rapid development.
ksqlDB runs as a cluster of servers that coordinate to execute SQL statements against Kafka topics. Queries can be persistent (continuously updating materialized views) or transient (one-time pull queries). ksqlDB integrates with Kafka Connect for data integration, Schema Registry for schema management, and supports user-defined functions for custom logic.
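For custom logic, user-defined functions are plain Java classes annotated with ksqlDB's UDF annotations and deployed to the server's extensions directory. A minimal sketch (the mask_email function and its behavior are illustrative, not a built-in):
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(name = "mask_email", description = "Masks the local part of an email address")
public class MaskEmailUdf {

    @Udf(description = "Returns the email with its local part replaced by asterisks")
    public String maskEmail(@UdfParameter("email") final String email) {
        // Pass through values that cannot be masked safely
        if (email == null || !email.contains("@")) {
            return email;
        }
        return "***" + email.substring(email.indexOf('@'));
    }
}
Once deployed, the function is callable from SQL like any built-in, for example SELECT MASK_EMAIL(email) FROM users.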
Creating Streams and Tables
-- Create a stream from a Kafka topic
CREATE STREAM user_events (
user_id VARCHAR KEY,
event_type VARCHAR,
page VARCHAR,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'user-events',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
-- Create a table from a Kafka topic
CREATE TABLE users (
user_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR,
created_at BIGINT
) WITH (
KAFKA_TOPIC = 'users',
VALUE_FORMAT = 'JSON'
);
-- Create a materialized view (table from aggregation)
CREATE TABLE user_event_counts AS
SELECT user_id, COUNT(*) AS event_count
FROM user_events
GROUP BY user_id
EMIT CHANGES;
Filtering and Transformation
-- Filter and transform stream
CREATE STREAM premium_user_events AS
SELECT
user_id,
event_type,
UCASE(page) AS page_upper,
TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss') AS formatted_time
FROM user_events
WHERE event_type IN ('purchase', 'subscription')
EMIT CHANGES;
-- Flatten nested structures
CREATE STREAM flattened_orders AS
SELECT
order_id,
customer->id AS customer_id,
customer->name AS customer_name,
items[0]->product_id AS first_product_id
FROM orders
EMIT CHANGES;
Aggregations and Windowing
-- Simple aggregation
CREATE TABLE page_view_counts AS
SELECT page, COUNT(*) AS views
FROM user_events
WHERE event_type = 'page_view'
GROUP BY page
EMIT CHANGES;
-- Tumbling window aggregation (5-minute windows)
CREATE TABLE user_activity_5min AS
SELECT
user_id,
WINDOWSTART AS window_start,
WINDOWEND AS window_end,
COUNT(*) AS event_count
FROM user_events
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY user_id
EMIT CHANGES;
-- Session window (30-minute inactivity gap)
CREATE TABLE user_sessions AS
SELECT
user_id,
WINDOWSTART AS session_start,
WINDOWEND AS session_end,
COUNT(*) AS events_in_session,
COLLECT_LIST(page) AS pages_visited
FROM user_events
WINDOW SESSION (30 MINUTES)
GROUP BY user_id
EMIT CHANGES;
-- Complex aggregation with HAVING clause
CREATE TABLE high_value_customers AS
SELECT
customer_id,
SUM(amount) AS total_spent,
COUNT(*) AS order_count,
AVG(amount) AS avg_order_value
FROM orders
WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY customer_id
HAVING SUM(amount) > 1000
EMIT CHANGES;
Joins in ksqlDB
-- Stream-Table join: Enrich events with user data
CREATE STREAM enriched_events AS
SELECT
e.user_id,
e.event_type,
e.page,
u.name AS user_name,
u.email AS user_email
FROM user_events e
LEFT JOIN users u ON e.user_id = u.user_id
EMIT CHANGES;
-- Stream-Stream join within 1-hour window
CREATE STREAM order_shipment_matched AS
SELECT
o.order_id,
o.customer_id,
o.total_amount,
s.tracking_number,
s.shipped_at
FROM orders o
INNER JOIN shipments s
WITHIN 1 HOURS
ON o.order_id = s.order_id
EMIT CHANGES;
-- Table-Table join
CREATE TABLE customer_complete AS
SELECT
p.customer_id,
p.name,
p.email,
pr.preferences,
pr.notification_settings
FROM customer_profiles p
LEFT JOIN customer_preferences pr
ON p.customer_id = pr.customer_id
EMIT CHANGES;
Push and Pull Queries
ksqlDB supports two query types. Push queries continuously stream results as data changes, providing real-time updates. Pull queries return point-in-time snapshots of materialized views, similar to traditional database queries.
-- Push query: Stream continuous results
SELECT * FROM user_events
WHERE event_type = 'purchase'
EMIT CHANGES;
-- Pull query: Get current state from materialized view
SELECT * FROM user_event_counts
WHERE user_id = 'user-123';
-- Pull query with filtering on a non-key column
-- (depending on ksqlDB version, this may require table scans to be enabled)
SELECT user_id, event_count
FROM user_event_counts
WHERE event_count > 100;
Production Deployment Patterns
Kafka Streams Deployment
Kafka Streams applications are standard Java applications. Deploy multiple instances of the same application (same application.id) to form a stream processing cluster. Kafka Streams automatically distributes processing across instances based on input topic partitions.
Each application instance maintains local state stores for stateful operations. These state stores are backed up to changelog topics in Kafka for fault tolerance. When an instance fails, another instance automatically takes over its partitions and rebuilds state from the changelog.
// Production configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
// State store configuration
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams/state");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
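// Standby replicas keep warm copies of state on other instances,
// shortening failover by avoiding full changelog restoration (optional; a sketch)
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);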
// Performance tuning
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
// Exactly-once semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Monitoring
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
ksqlDB Deployment
ksqlDB runs as a cluster of servers. Deploy at least 3 servers for production to ensure high availability. Servers in the same cluster coordinate through internal Kafka topics, distributing statements via a shared command topic and balancing persistent query workloads across the cluster.
ksqlDB supports two deployment modes. Interactive mode allows creating and managing queries via REST API or CLI. Headless mode runs predefined queries from SQL files, enabling GitOps workflows and immutable deployments.
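In interactive mode, statements are submitted to the server's /ksql REST endpoint. A minimal sketch using Java's built-in HttpClient (the server address is hypothetical):
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KsqlStatementSubmitter {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // POST a SQL statement to the ksqlDB REST API
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://ksqldb-server:8088/ksql"))
            .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
            .POST(HttpRequest.BodyPublishers.ofString(
                "{\"ksql\": \"SHOW STREAMS;\", \"streamsProperties\": {}}"))
            .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}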
# ksqldb-server.properties
# Bootstrap servers
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
# Service ID (unique per ksqlDB cluster)
ksql.service.id=production_ksqldb_
# Internal topics replication
ksql.internal.topic.replicas=3
ksql.streams.replication.factor=3
# State directory
ksql.streams.state.dir=/var/ksqldb/state
# Query configuration
# Headless mode: run predefined queries from this file
ksql.queries.file=/etc/ksqldb/queries.sql
ksql.streams.num.stream.threads=4
# Security
ksql.security.extension.class=io.confluent.ksql.security.KsqlDefaultSecurityExtension
# Monitoring
ksql.streams.producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
ksql.streams.consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
Choosing Between Kafka Streams and ksqlDB
Choose Kafka Streams when you need complex custom logic not easily expressed in SQL, fine-grained control over state management and performance optimization, deep integration with existing Java or Scala applications, or embedded streaming within microservices.
Choose ksqlDB when you want rapid development with familiar SQL syntax, the ability to empower data analysts to build streaming pipelines, simpler operational model with centralized query management, or quick prototyping and iteration on streaming logic.
Many organizations use both. ksqlDB handles initial data preparation and simpler transformations, while Kafka Streams applications consume processed streams for complex stateful operations, machine learning inference, or custom business logic.
Monitoring and Troubleshooting
Both Kafka Streams and ksqlDB expose metrics for monitoring. Key metrics include processing rate (records per second), lag (how far behind the input the application is), state store size, commit latency, and error rates.
For Kafka Streams, monitor stream thread state, rebalancing frequency, and state restoration progress. For ksqlDB, monitor query execution status, server health, and resource utilization.
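As a minimal sketch, both lifecycle state and built-in metrics can be observed programmatically from a Kafka Streams application (assuming a KafkaStreams instance named streams; the listener must be registered before start()):
// Log lifecycle transitions such as REBALANCING -> RUNNING
streams.setStateListener((newState, oldState) ->
    System.out.printf("State transition: %s -> %s%n", oldState, newState));
// Inspect selected built-in metrics, e.g. per-thread processing rate
streams.metrics().forEach((name, metric) -> {
    if ("process-rate".equals(name.name())) {
        System.out.printf("%s %s = %s%n", name.name(), name.tags(), metric.metricValue());
    }
});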
Common troubleshooting scenarios include high lag indicating insufficient processing capacity, frequent rebalancing suggesting configuration issues or an unstable cluster, and unbounded state store growth calling for changelog compaction tuning or tighter window retention.
What Comes Next
Understanding stream processing with Kafka Streams and ksqlDB enables building real-time data pipelines that transform raw events into actionable insights. Part 6 of this series explores security and monitoring for Kafka deployments, covering authentication, authorization, encryption, observability patterns, and operational best practices for production environments.
References
- O’Reilly – Mastering Kafka Streams and ksqlDB by Mitch Seymour
- Cloudurable – The Kafka Ecosystem 2025 Edition
- Confluent Blog – Kafka Streams vs ksqlDB for Stream Processing
- GitHub – ksqlDB: The Database Purpose-Built for Stream Processing
- ksqlDB Documentation – Relationship to Kafka Streams
- Confluent Documentation – ksqlDB and Kafka Streams
- ActiveWizards – Building Real-time Applications with Kafka Streams and ksqlDB
