Building data pipelines between Kafka and external systems traditionally requires writing custom producer and consumer applications. Kafka Connect eliminates this complexity by providing a scalable, fault-tolerant framework for streaming data between Kafka and databases, cloud storage, message queues, and other systems without writing code.
This comprehensive guide explores Kafka Connect architecture, from fundamental concepts through production deployment patterns. We will examine source and sink connectors, Single Message Transforms, distributed mode deployment, and operational best practices that enable reliable data integration at enterprise scale.
Understanding Kafka Connect Architecture
Kafka Connect is a framework for scalable and reliable streaming data integration between Kafka and other systems. It provides a standardized way to move data in and out of Kafka using pre-built connectors, eliminating the need to write custom integration code for common scenarios.
The architecture consists of several key components. Workers are JVM processes that execute connectors and tasks. Workers run in either standalone mode (single process for development) or distributed mode (cluster for production). Connectors are plugins that define how to connect to external systems. Source connectors pull data from external systems into Kafka topics. Sink connectors push data from Kafka topics to external systems. Tasks are the actual units of work that move data. Each connector spawns one or more tasks to parallelize data transfer. Converters transform data between Kafka Connect’s internal format and the serialized form used in Kafka. Transforms apply lightweight modifications to records as they flow through connectors.
flowchart TB
subgraph External["External Systems"]
DB[(Database)]
S3[Cloud Storage]
API[REST API]
end
subgraph Connect["Kafka Connect Cluster"]
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
subgraph Connectors["Connectors & Tasks"]
SC1["Source Connector<br/>Task 1, Task 2"]
SK1["Sink Connector<br/>Task 1, Task 2"]
end
end
subgraph Kafka["Kafka Cluster"]
T1[Topic 1]
T2[Topic 2]
end
DB -->|Source| SC1
API -->|Source| SC1
SC1 -->|Produce| T1
T1 -->|Consume| SK1
T2 -->|Consume| SK1
SK1 -->|Sink| S3
W1 -.Executes.-> SC1
W2 -.Executes.-> SK1
style External fill:#FFE0B2
style Connect fill:#C8E6C9
style Kafka fill:#BBDEFB
Deployment Modes: Standalone vs Distributed
Standalone Mode
Standalone mode runs all connectors, tasks, and configuration in a single process. This simplifies development and testing but lacks fault tolerance and scalability. Configuration is provided via properties files on startup. Offset and connector state are stored on the local filesystem. If the process crashes, all connectors stop until manually restarted.
Standalone mode works well for development environments, proof-of-concept deployments, and simple single-connector scenarios. However, it should not be used for production workloads requiring high availability or scale.
Distributed Mode
Distributed mode is the recommended deployment for production environments. Multiple worker processes form a cluster that coordinates to distribute connector tasks across available workers. Configuration, offsets, and status are stored in Kafka topics, so workers hold no local state and the cluster is resilient to worker failures.
Distributed mode provides several critical capabilities. Scalability allows adding workers to the cluster to increase throughput capacity. Fault tolerance means that if a worker fails, its tasks are automatically redistributed to remaining workers. Load balancing distributes tasks across workers to optimize resource usage. Dynamic reconfiguration enables adding, updating, or removing connectors via REST API without restarting workers.
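The fault-tolerance behavior described above can be illustrated with a toy model. This is a minimal sketch that assumes plain round-robin assignment; Connect's real rebalance protocol is more involved and tries to avoid moving tasks unnecessarily. The worker and task names are hypothetical.

```python
from collections import defaultdict


def assign_tasks(tasks, workers):
    """Spread tasks across workers round-robin (a deliberate simplification)."""
    if not workers:
        raise ValueError("at least one worker is required")
    assignment = defaultdict(list)
    for i, task in enumerate(sorted(tasks)):
        assignment[workers[i % len(workers)]].append(task)
    return dict(assignment)


# Hypothetical task and worker names, for illustration only.
tasks = [
    "jdbc-source-0", "jdbc-source-1", "jdbc-source-2",
    "s3-sink-0", "s3-sink-1", "s3-sink-2",
]
workers = ["worker-1", "worker-2", "worker-3"]

before = assign_tasks(tasks, workers)

# worker-2 fails: every task is still assigned, now to the two survivors.
after = assign_tasks(tasks, [w for w in workers if w != "worker-2"])

for worker, assigned in after.items():
    print(worker, assigned)
```

With three workers each one runs two tasks; after the simulated failure the two remaining workers run three tasks each, which is why spare capacity per worker matters when sizing a cluster.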
Distributed mode uses three internal Kafka topics. The config topic stores connector and task configurations. The offset topic stores source connector offsets tracking progress in external systems. The status topic stores connector and task status information. These topics are configured as compacted logs to retain the latest state indefinitely.
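To see why compaction suits these internal topics, consider what a worker needs when it restarts: the latest value per key, not the full history. The following is a rough sketch of that idea only; the keys are hypothetical and real log compaction runs in the background on the brokers with additional retention rules.

```python
def compact(log):
    """Keep only the most recent value per key; None acts as a tombstone."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # a tombstone removes the key
        else:
            latest[key] = value
    return latest


# Hypothetical status updates in the order they were written.
status_log = [
    ("status-connector-orders", "RUNNING"),
    ("status-connector-users", "RUNNING"),
    ("status-connector-orders", "PAUSED"),
    ("status-connector-users", None),  # connector deleted
    ("status-connector-orders", "RUNNING"),
]

state = compact(status_log)
print(state)
```

Replaying the compacted view gives the same final state as replaying every update, which is what lets a new or restarted worker catch up from the topic alone.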
Distributed Mode Worker Configuration
# connect-distributed.properties
# Kafka cluster connection
bootstrap.servers=localhost:9092
# Unique name for this Connect cluster
group.id=connect-cluster
# Internal topics for storing connector state
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication factors for internal topics (3 for production)
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# Converter settings (how data is serialized to/from Kafka)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Plugin path (where connector JARs are located)
plugin.path=/usr/local/share/kafka/plugins
# REST API settings (rest.port and rest.host.name were removed in Kafka 3.0)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-worker-1
# Worker configuration
offset.flush.interval.ms=10000
offset.flush.timeout.ms=5000
Source Connectors: Ingesting Data into Kafka
Source connectors pull data from external systems and write it to Kafka topics. Common source connector types include database connectors using JDBC or change data capture (Debezium), file system connectors reading from local or network filesystems, message queue connectors from systems like RabbitMQ or ActiveMQ, cloud service connectors from AWS S3, Azure Blob Storage, or Google Cloud Storage, and API connectors for REST endpoints and SaaS platforms.
Source connectors manage several critical concerns. Offset tracking maintains position in the external system to enable resumption after failures. Polling determines when to check for new data in the external system. Partitioning decides how to distribute data across Kafka topic partitions. Error handling manages transient failures, connection issues, and invalid data.
JDBC Source Connector Example
The JDBC source connector streams data from relational databases into Kafka. It supports both bulk loading existing data and incremental loading of new or updated rows.
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "3",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "kafka_user",
"connection.password": "secure_password",
"mode": "incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "updated_at",
"table.whitelist": "orders,customers,products",
"topic.prefix": "postgres-",
"poll.interval.ms": 5000,
"batch.max.rows": 1000,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": true
}
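}
This configuration creates a source connector that reads from three PostgreSQL tables, using the id column to track which rows have been processed. The connector polls every 5 seconds for new rows and runs up to three parallel tasks, one per table in this case. Because the mode is incrementing, only newly inserted rows are detected; timestamp.column.name takes effect only in timestamp or timestamp+incrementing mode, which also picks up updated rows.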
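The offset tracking behind incrementing mode can be sketched with an in-memory SQLite table. This is an illustration of the idea, not the connector's implementation: the `poll_new_rows` helper and the table layout are assumptions made for the example.

```python
import sqlite3


def poll_new_rows(conn, table, last_id, batch_max_rows=1000):
    """Return rows with id > last_id plus the new offset to persist.

    `table` is assumed to be a trusted identifier, not user input.
    """
    rows = conn.execute(
        f"SELECT id, item FROM {table} WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, batch_max_rows),
    ).fetchall()
    new_offset = rows[-1][0] if rows else last_id
    return rows, new_offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
conn.executemany("INSERT INTO orders (item) VALUES (?)", [("book",), ("pen",)])

offset = 0
first_batch, offset = poll_new_rows(conn, "orders", offset)

# One insert and one update happen between polls.
conn.execute("INSERT INTO orders (item) VALUES ('lamp')")
conn.execute("UPDATE orders SET item = 'notebook' WHERE id = 1")
second_batch, offset = poll_new_rows(conn, "orders", offset)

print(first_batch, second_batch, offset)
```

The second poll returns only the newly inserted row. The update to row 1 goes unnoticed because its id is below the stored offset, which is the main limitation of tracking progress with an incrementing column alone.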
Debezium Change Data Capture
Debezium provides change data capture connectors that stream every insert, update, and delete from databases in real-time by reading transaction logs. This enables building event-driven architectures based on database changes without modifying application code.
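As a rough illustration of what a downstream consumer does with these change events, the sketch below applies a stream of simplified events to an in-memory table. Debezium events carry an `op` code with `before` and `after` row images; the events here are hand-written and omit the `source` metadata and schema that real events include.

```python
def apply_change(table, event):
    """Apply one simplified Debezium-style change event to an in-memory table."""
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":  # delete
        table.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unknown op: {op}")
    return table


# Hand-written sample events (hypothetical data).
events = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "NEW"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "NEW"}},
    {"op": "u", "before": {"id": 1, "status": "NEW"},
     "after": {"id": 1, "status": "PAID"}},
    {"op": "d", "before": {"id": 2, "status": "NEW"}, "after": None},
]

orders = {}
for event in events:
    apply_change(orders, event)

print(orders)
```

Replaying the events in order reproduces the current state of the source table, which is what makes log-based change data capture suitable for keeping caches, search indexes, and warehouses in sync.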
{
"name": "mysql-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql-server",
"database.port": "3306",
"database.user": "debezium",
"database.password": "secure_password",
"database.server.id": "184054",
"database.server.name": "mysql-prod",
"database.include.list": "ecommerce",
"table.include.list": "ecommerce.orders,ecommerce.order_items",
"snapshot.mode": "initial",
"include.schema.changes": "true",
"time.precision.mode": "connect",
"decimal.handling.mode": "precise",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
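}
This configuration uses Debezium 1.x property names. The MySQL connector additionally needs schema history settings (database.history.kafka.bootstrap.servers and database.history.kafka.topic). Debezium 2.x renames database.server.name to topic.prefix and the database.history.* properties to schema.history.internal.*.
Sink Connectors: Exporting Data from Kafka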
Sink connectors read data from Kafka topics and write it to external systems. Common sink connector types include database connectors using JDBC or native drivers, cloud storage connectors to S3, Azure Blob, or GCS, search index connectors to Elasticsearch or Solr, data warehouse connectors to Snowflake, BigQuery, or Redshift, and monitoring system connectors to Prometheus, Datadog, or Splunk.
Sink connectors handle critical operational concerns. Batching groups multiple records before writing to optimize throughput. Retries handle transient failures when writing to external systems. Idempotency ensures duplicate records do not create inconsistent state. Schema evolution manages changes to record structure over time.
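Batching and idempotency work together, which a small model can show. This is a minimal sketch, assuming a hypothetical `BatchingUpsertSink` class whose dictionary stands in for the external system; real sink connectors implement this inside their task classes.

```python
class BatchingUpsertSink:
    """Buffer records and write them in batches as upserts keyed by record key."""

    def __init__(self, flush_size):
        self.flush_size = flush_size
        self.buffer = []
        self.store = {}  # stands in for the external system
        self.flushes = 0

    def put(self, key, value):
        self.buffer.append((key, value))
        if len(self.buffer) >= self.flush_size:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        for key, value in self.buffer:
            self.store[key] = value  # upsert: writing the same record twice is harmless
        self.buffer.clear()
        self.flushes += 1


sink = BatchingUpsertSink(flush_size=2)
# ("u1", "a") appears twice, as it would after a redelivery.
for key, value in [("u1", "a"), ("u2", "b"), ("u1", "a"), ("u3", "c")]:
    sink.put(key, value)
sink.flush()  # flush whatever remains at shutdown

print(sink.store, sink.flushes)
```

Because Connect delivers at least once by default, a task restart can replay records that were already written. Keying writes on a stable identifier means the replay overwrites rather than duplicates.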
S3 Sink Connector Example
{
"name": "s3-sink-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "user-events,order-events",
"s3.region": "us-east-1",
"s3.bucket.name": "kafka-data-lake",
"s3.part.size": 5242880,
"flush.size": 1000,
"rotate.interval.ms": 3600000,
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": 3600000,
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en-US",
"timezone": "UTC",
"schema.compatibility": "BACKWARD",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
This connector writes data from Kafka topics to S3 in Parquet format, partitioned by hour. It flushes files every 1000 records or every hour, whichever comes first, creating a time-based data lake structure.
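The hourly layout produced by the path.format above can be reproduced in a few lines. This sketch only computes the partition directory for a timestamp in UTC; the `hourly_partition` helper is hypothetical, and which timestamp the connector actually uses depends on its timestamp extractor setting.

```python
from datetime import datetime, timezone


def hourly_partition(timestamp_ms):
    """Map an epoch-millisecond timestamp to a year/month/day/hour directory (UTC)."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H")


def to_ms(dt):
    """Convert an aware datetime to epoch milliseconds."""
    return int(dt.timestamp() * 1000)


early = to_ms(datetime(2024, 3, 15, 9, 5, tzinfo=timezone.utc))
late = to_ms(datetime(2024, 3, 15, 9, 55, tzinfo=timezone.utc))
next_hour = to_ms(datetime(2024, 3, 15, 10, 0, tzinfo=timezone.utc))

print(hourly_partition(early))
print(hourly_partition(next_hour))
```

Records from the same hour land in the same directory, so query engines that understand this key=value layout can skip irrelevant hours entirely.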
Single Message Transforms: Lightweight Data Transformation
Single Message Transforms (SMTs) apply lightweight modifications to individual records as they flow through connectors. SMTs enable common transformations without requiring external stream processing frameworks.
Common built-in SMTs include adding or removing fields, renaming fields, masking sensitive data, extracting portions of complex fields, filtering records based on predicates, changing message timestamps, and routing records to different topics.
SMTs are configured as part of connector configuration and execute within the Connect worker process. They should be used for simple transformations only. For complex transformations, joins, or aggregations, use Kafka Streams or Apache Flink instead.
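Conceptually, an SMT chain is a list of functions applied in order to each record. The sketch below models that in Python with plain dictionaries; real SMTs are Java classes implementing Connect's Transformation interface, and the helper names here are invented for the example.

```python
from functools import reduce


def insert_field(name, value_fn):
    """Add a field whose value comes from value_fn()."""
    def transform(record):
        return {**record, name: value_fn()}
    return transform


def mask_field(name, replacement):
    """Replace a field's value if the field is present."""
    def transform(record):
        if name not in record:
            return record
        return {**record, name: replacement}
    return transform


def rename_fields(renames):
    """Rename fields according to an old-name to new-name mapping."""
    def transform(record):
        return {renames.get(key, key): value for key, value in record.items()}
    return transform


def apply_chain(record, transforms):
    """Apply each transform in order, like the comma-separated transforms list."""
    return reduce(lambda rec, transform: transform(rec), transforms, record)


chain = [
    insert_field("processed_at", lambda: 1700000000000),  # fixed value for the demo
    mask_field("email", "***MASKED***"),
    rename_fields({"userId": "user_id", "userName": "user_name"}),
]

result = apply_chain({"userId": 42, "email": "a@example.com"}, chain)
print(result)
```

Each transform sees one record at a time and has no access to other records, which is why joins and aggregations belong in a stream processor instead.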
SMT Configuration Example
{
"name": "users-sink-with-transforms",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics": "user-events",
"connection.url": "jdbc:postgresql://localhost:5432/analytics",
"auto.create": "true",
"auto.evolve": "true",
"transforms": "addTimestamp,maskEmail,renameField,routeTopic",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "processed_at",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email",
"transforms.maskEmail.replacement": "***MASKED***",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "userId:user_id,userName:user_name",
"transforms.routeTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeTopic.regex": "(.*)-events",
"transforms.routeTopic.replacement": "$1_events"
}
}
Managing Connectors via REST API
In distributed mode, connectors are managed through a REST API exposed by Connect workers. This enables dynamic configuration without restarting workers.
Common REST API Operations
# List all connectors
curl http://localhost:8083/connectors
# Get connector status
curl http://localhost:8083/connectors/postgres-source-connector/status
# Create a new connector (body is {"name": ..., "config": {...}})
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json
# Update connector configuration (body is only the flat config map, without the name/config wrapper)
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/config \
  -H "Content-Type: application/json" \
  -d @updated-config.json
# Pause a connector
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/pause
# Resume a connector
curl -X PUT http://localhost:8083/connectors/postgres-source-connector/resume
# Restart a failed task
curl -X POST http://localhost:8083/connectors/postgres-source-connector/tasks/0/restart
# Delete a connector
curl -X DELETE http://localhost:8083/connectors/postgres-source-connector
Production Deployment Best Practices
Cluster Sizing and Scaling
Run a minimum of 3 workers for production deployments to ensure fault tolerance. Workers should have adequate CPU, memory, and network capacity for expected throughput. As a general guideline, allocate 4-8 CPU cores and 8-16 GB RAM per worker for typical workloads.
Scale horizontally by adding workers to the cluster rather than vertically increasing worker resources. Tasks automatically rebalance across available workers. Monitor task distribution to ensure even load balancing.
Kubernetes Deployment
Deploy Connect workers as a StatefulSet to provide stable network identities. Use a Kafka Connect Operator like Strimzi to simplify deployment and management. Define resource requests and limits to prevent resource contention. Configure liveness and readiness probes for automatic restart of unhealthy workers.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-connect
spec:
  serviceName: kafka-connect
  replicas: 3
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: kafka-connect
          image: confluentinc/cp-kafka-connect:7.6.0
          ports:
            - containerPort: 8083
              name: rest-api
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          livenessProbe:
            httpGet:
              path: /
              port: 8083
            initialDelaySeconds: 60
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /connectors
              port: 8083
            initialDelaySeconds: 30
            periodSeconds: 10
Security Configuration
Enable TLS encryption for communication between Connect workers and Kafka brokers. Configure SASL authentication so that workers, and the producers and consumers they create for connectors, authenticate to the brokers. Use Kafka ACLs on those principals to restrict which topics connectors can produce to or consume from. Secure the REST API separately with authentication and authorization, since it controls who can create and manage connectors.
For sensitive data, use SMTs to mask personally identifiable information before writing to Kafka. Consider encrypting data at rest in both Kafka and external systems. Implement audit logging to track connector operations and configuration changes.
Error Handling and Dead Letter Queues
Configure error tolerance to determine how connectors handle failures. Use dead letter queues, which are available for sink connectors only, to route failed records to separate topics for later analysis and reprocessing. Configure retry policies with exponential backoff for transient failures.
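The interaction between a retry timeout, a capped backoff delay, and a dead letter queue can be sketched as follows. This is an illustration of the policy only, not how Connect implements it internally: the `write_with_retry` helper, its defaults, and the in-memory `dlq` list are assumptions, and a real implementation would retry only errors known to be transient.

```python
import time


def write_with_retry(record, write, dlq, retry_timeout_ms=300000,
                     max_delay_ms=60000, initial_delay_ms=1000, sleep=time.sleep):
    """Call write(record), doubling the delay after each failure up to max_delay_ms.

    Once the total time spent waiting reaches retry_timeout_ms, the record is
    appended to dlq and False is returned.
    """
    waited_ms = 0
    delay_ms = initial_delay_ms
    while True:
        try:
            write(record)
            return True
        except Exception as exc:  # simplification: treat every error as retriable
            if waited_ms >= retry_timeout_ms:
                dlq.append({"record": record, "error": str(exc)})
                return False
            sleep(delay_ms / 1000)
            waited_ms += delay_ms
            delay_ms = min(delay_ms * 2, max_delay_ms)


attempts = {"count": 0}


def flaky_write(record):
    """Fail twice, then succeed, to simulate a transient outage."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary outage")


def broken_write(record):
    raise ValueError("bad record")


dlq = []
delays = []  # record requested sleeps instead of actually sleeping
recovered = write_with_retry("order-1", flaky_write, dlq, sleep=delays.append)
rejected = write_with_retry("order-2", broken_write, dlq,
                            retry_timeout_ms=3000, sleep=lambda seconds: None)

print(recovered, delays, rejected, dlq)
```

The first record succeeds after two backoff delays; the second exhausts its retry budget and ends up in the dead letter list along with the error message, so the pipeline keeps moving while the failure stays inspectable.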
{
"name": "resilient-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.retry.timeout": "300000",
"errors.retry.delay.max.ms": "60000"
}
}
Monitoring and Observability
Connect exposes JMX metrics for monitoring connector health and performance. Key metrics to monitor include connector state (running, paused, failed), task state and distribution, consumer lag for sink connectors, throughput (records per second), error rates, and failed tasks.
Integrate Connect metrics with monitoring systems like Prometheus, Grafana, or Datadog. Set up alerts for connector failures, high error rates, and tasks stuck in failed state. Monitor resource utilization on worker nodes to identify scaling needs.
Use the REST API to regularly check connector and task status. Implement health checks that verify connectors are running and processing data. Log all connector operations and configuration changes for audit and troubleshooting purposes.
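A health check built on the status endpoint mostly comes down to inspecting the JSON it returns. The sketch below parses a hand-written sample shaped like the response of GET /connectors/{name}/status; the `find_problems` helper is hypothetical, and fetching the document over HTTP is left out so the example stays self-contained.

```python
import json


def find_problems(status):
    """Return a list of human-readable problems found in a connector status document."""
    problems = []
    name = status["name"]
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"{name}: connector is {connector_state}")
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"{name}: task {task['id']} is {task['state']}")
    return problems


# Hand-written sample response (hypothetical worker IDs and trace).
sample = json.loads("""
{
  "name": "s3-sink-connector",
  "connector": {"state": "RUNNING", "worker_id": "connect-worker-1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "connect-worker-1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "connect-worker-2:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException: ..."}
  ],
  "type": "sink"
}
""")

problems = find_problems(sample)
print(problems)
```

Note that the connector itself can report RUNNING while one of its tasks has failed, so a check that looks only at the connector state would miss this case. A deliberately paused connector is also reported here; whether that should alert is a policy choice.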
Common Integration Patterns
Database to Kafka to Data Warehouse
Use Debezium to capture database changes in real-time. Stream changes to Kafka for processing, filtering, and enrichment. Use sink connectors to load processed data into analytical databases or data warehouses. This pattern enables real-time analytics without impacting production databases.
Multi-Cloud Data Replication
Deploy Connect clusters in multiple cloud regions. Use source connectors to ingest data from regional systems. Replicate Kafka topics across regions for disaster recovery. Use sink connectors to write data to regional storage or databases. This pattern enables global data distribution with local processing.
Event-Driven Microservices Integration
Microservices publish events to Kafka topics. Connect sinks stream events to search indexes, caching systems, or external APIs. Source connectors ingest events from external systems into Kafka. This pattern decouples services while maintaining data consistency across the architecture.
What Comes Next
Understanding Kafka Connect enables building production-ready data integration pipelines without custom code. Part 5 of this series explores Kafka Streams and ksqlDB, covering how to build real-time stream processing applications that transform, aggregate, and join data flowing through Kafka topics.
