Configuring a Kafka Cluster (with 3 Zookeepers and 3 Brokers) and Integrating It with Ceph Bucket Notifications
Introduction
This blog post provides the steps to configure a three-node Kafka cluster, where each node runs a Zookeeper server and a Kafka broker. It also explains how to integrate the Kafka cluster with the Ceph Object Gateway for bucket notifications.
Prerequisites
- RHEL 9: Three nodes running RHEL 9 for the Kafka setup
- Java Development Kit (JDK) 23: Ensure JDK 23 is installed on your nodes
- Kafka Tarball: The Kafka binary distribution (e.g., kafka_2.13-2.8.0.tgz)
- Required Ports: Ensure ports 2181, 2888, 3888, and 9092 are available and open in the firewall
- IBM Storage Ceph Cluster: A running cluster with Ceph Object Gateway
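If firewalld is the active firewall on the nodes (the RHEL 9 default), the required ports can be opened as follows; adjust the commands to your firewall tooling if it differs:
firewall-cmd --permanent --add-port=2181/tcp --add-port=2888/tcp --add-port=3888/tcp --add-port=9092/tcp
firewall-cmd --reload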
Step 1: Install Java and Download Kafka on All Kafka Nodes
First, log into each node and perform the following installations:
Install Java
Use the following command to install JDK 23:
yum install -y https://download.oracle.com/java/23/latest/jdk-23_linux-x64_bin.rpm
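You can confirm the installation succeeded:
java -version
The output should report a version 23 runtime.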
Download and Extract Kafka
Download the Kafka tarball and extract it:
curl -o /tmp/kafka.tgz http://magna002.ceph.redhat.com/cephci-jenkins/kafka_2.13-2.8.0.tgz
tar -zxvf /tmp/kafka.tgz -C /usr/local/
mv /usr/local/kafka_2.13-2.8.0 /usr/local/kafka
Step 2: Configure Zookeeper on Each Node
For each of your three Zookeeper nodes, follow these configuration steps:
Create Zookeeper Data Directory
cd /usr/local/kafka/
mkdir -p data/zookeeper
This creates the necessary directory for Zookeeper's data.
Create the myid File
Navigate into the Zookeeper data directory and create a myid file. This file contains a unique integer ID for each Zookeeper instance.
cd /usr/local/kafka/data/zookeeper
On the first Zookeeper node:
echo "1" > myid
On the second node:
echo "2" > myid
On the third node:
echo "3" > myid
Configure zookeeper.properties
Navigate to the Kafka configuration directory:
cd /usr/local/kafka/config/
vi zookeeper.properties
Edit the zookeeper.properties file to include the following lines (replace the IP addresses with your actual Zookeeper node IPs):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=10.0.66.220:2888:3888
server.2=10.0.64.191:2888:3888
server.3=10.0.66.18:2888:3888
- dataDir: Directory where Zookeeper stores its data.
- clientPort: Port on which clients connect to Zookeeper.
- server.X: One entry per Zookeeper server in the ensemble. X is the myid of the server, followed by its IP address, the follower-to-leader communication port (2888), and the leader election port (3888). With three servers, the ensemble keeps a quorum (two of three) and can tolerate the loss of one node.
Step 3: Start Zookeeper Servers
Once Zookeeper is configured on all three nodes, start the Zookeeper service on each node:
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
The -daemon flag runs the Zookeeper server in the background.
To verify Zookeeper is listening on the client port (2181) and communication ports (2888, 3888):
netstat -nltp
Expected output:
tcp6 0 0 :::2181 :::* LISTEN <PID>/java
tcp6 0 0 <Zookeeper IP>:3888 :::* LISTEN <PID>/java
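Beyond checking ports, you can confirm the ensemble answers requests with the zookeeper-shell.sh utility shipped in the Kafka distribution, for example:
/usr/local/kafka/bin/zookeeper-shell.sh 10.0.66.220:2181 ls /
A healthy server responds with the root znodes (e.g., [zookeeper]).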
Step 4: Configure Kafka Brokers on Each Node
For each of your three Kafka broker nodes, follow these configuration steps:
Configure server.properties
Navigate to the Kafka configuration directory:
cd /usr/local/kafka/config/
vi server.properties
Edit the server.properties file. The important parameters to configure are:
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=10.0.66.220:2181,10.0.64.191:2181,10.0.66.18:2181
listeners=PLAINTEXT://10.0.66.220:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092
- broker.id: Assign a unique integer ID to each broker. For the first Kafka node, use 0; for the second, use 1; for the third, use 2.
- log.dirs: Directory for Kafka log files.
- zookeeper.connect: Connection string listing all Zookeeper nodes and their client ports.
- listeners: (Optional but recommended) Defines the network interface Kafka listens on. Replace the IP with the respective node's IP address.
- advertised.listeners: (Optional but recommended) Advertises a different hostname or IP address to clients than the one Kafka binds to.
The server.properties file also contains other important settings for network threads, socket buffers, log retention, and more. Review and adjust these based on your requirements.
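For example, on the second broker node the file would differ from the listing above only in the per-node values (using the ensemble IPs from Step 2):
broker.id=1
log.dirs=/tmp/kafka-logs
zookeeper.connect=10.0.66.220:2181,10.0.64.191:2181,10.0.66.18:2181
listeners=PLAINTEXT://10.0.64.191:9092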
Step 5: Start Kafka Brokers
After configuring server.properties on each Kafka broker node, start the Kafka broker service:
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
The -daemon flag runs the Kafka server in the background.
To verify Kafka is listening on the default broker port (9092):
netstat -nltp
Expected output:
tcp6 0 0 :::9092 :::* LISTEN <PID>/java
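To confirm that all three brokers have joined the cluster, you can query any one of them; the following command, also shipped with Kafka, prints one header line per live broker:
/usr/local/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.66.220:9092 | grep "id:"
Expect three lines, one for each of broker IDs 0, 1, and 2.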
Step 6: Create a Partitioned Topic on Kafka
Create a topic with partitions and replication-factor:
/usr/local/kafka/bin/kafka-topics.sh --create --topic topic1 --bootstrap-server 10.0.66.220:9092 --partitions 2 --replication-factor 2
Expected output:
Created topic topic1.
- --partitions: Sets the number of partitions for the topic. Each partition acts as an ordered, immutable sequence of records. More partitions allow greater parallelism in producing and consuming messages.
- --replication-factor: Specifies the number of replicas for each partition. A replication factor of 1 means one copy per partition. For production environments, a higher replication factor (e.g., 2 or 3) is recommended for fault tolerance.
Use the following command to describe the Kafka topic and get detailed metadata about it:
/usr/local/kafka/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 10.0.66.220:9092
Expected output:
Topic: topic1 TopicId: tNJaPMf2TzGJoK2Xk5Ru6w PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
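As an optional smoke test before wiring up Ceph, you can produce a message to the topic and read it back with the console tools:
echo "test message" | /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 10.0.66.220:9092 --topic topic1
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.66.220:9092 --topic topic1 --from-beginning --timeout-ms 10000
The consumer should print "test message" and then exit once the timeout elapses.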
Integration with Ceph Object Gateway
Step 1: Create a Topic from Object Gateway Side
aws --endpoint-url http://10.0.67.122:80 sns create-topic --name topic1 --attributes 'push-endpoint=kafka://10.0.66.220:9092,persistent=false,verify-ssl=false,kafka-brokers="[10.0.64.191:9092, 10.0.66.18:9092]"'
Note: kafka-brokers is a comma-separated list of host:port values for Kafka brokers (it may repeat the broker already named in the Kafka URI of push-endpoint). These brokers are added to the endpoint list so that notifications can still reach the Kafka cluster if a broker goes down: RGW connects to the first available broker in the list and fails over to the next one if a broker is unreachable. Multiple brokers therefore provide high availability.
Expected output:
{
"TopicArn": "arn:aws:sns:default::topic1"
}
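To confirm that the Object Gateway stored the topic, you can list RGW's topics from a node with Ceph admin credentials:
radosgw-admin topic list
The output should include topic1 along with its push-endpoint attributes.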
Step 2: Create a Bucket, Configure Notifications, and Trigger a Notification Event
Create a bucket:
aws --endpoint-url http://10.0.67.122:80 s3 mb s3://bkt1
Create a notification configuration file notif.json with the following contents:
{
"TopicConfigurations": [
{
"Id": "notif1",
"TopicArn": "arn:aws:sns:default::topic1",
"Events": [
"s3:ObjectCreated:*",
"s3:ObjectRemoved:*"
]
}
]
}
- s3:ObjectCreated:*: Notifications are sent for all object creation events, including PutObject, PostObject, CopyObject, and CompleteMultipartUpload.
- s3:ObjectRemoved:*: Notifications are sent for all object deletion events, including DeleteObject and DeleteMarkerCreated.
Apply the notification configuration:
aws --endpoint-url http://10.0.67.122:80 s3api put-bucket-notification-configuration --bucket bkt1 --notification-configuration file://notif.json
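You can read the configuration back to verify it was applied:
aws --endpoint-url http://10.0.67.122:80 s3api get-bucket-notification-configuration --bucket bkt1
The response should echo the TopicConfigurations block from notif.json.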
Upload an object to trigger the event:
fallocate -l 10MB obj10MB
aws --endpoint-url http://10.0.67.122:80 s3 cp obj10MB s3://bkt1/obj1
Expected output:
upload: ./obj10MB to s3://bkt1/obj1
Step 3: Verify Event Record Received at Kafka Side
Consume messages from the Kafka topic:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.66.220:9092 --from-beginning --topic topic1 --timeout-ms 30000 | jq .
Expected output:
{
"Records": [
{
"eventVersion": "2.2",
"eventSource": "ceph:s3",
"awsRegion": "default",
"eventTime": "2025-03-24T04:38:19.784111Z",
"eventName": "ObjectCreated:CompleteMultipartUpload",
"userIdentity": {
"principalId": "hsm"
},
"requestParameters": {
"sourceIPAddress": ""
},
"responseElements": {
"x-amz-request-id": "2be0346c-4268-4e39-8d19-38c1b1ff6883.74180.18386630626840617633",
"x-amz-id-2": "74180-default-default"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "notif1",
"bucket": {
"name": "bkt1",
"ownerIdentity": {
"principalId": "hsm"
},
"arn": "arn:aws:s3:default::bkt1",
"id": "2be0346c-4268-4e39-8d19-38c1b1ff6883.74180.7"
},
"object": {
"key": "obj1",
"size": 10000000,
"eTag": "0b223142030f0d12db211f3e18069fbd-2",
"versionId": "",
"sequencer": "BBE1E067EF23A32F",
"metadata": [],
"tags": []
}
},
"eventId": "1742791099.799220.0b223142030f0d12db211f3e18069fbd-2",
"opaqueData": ""
}
]
}
The console consumer may additionally print a timeout message like the following once the --timeout-ms interval elapses; this is expected and simply means no further messages arrived:
[2025-03-24 04:39:19,314] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 1 messages
Note: The output is formatted using jq for better readability. Validate the details in the event record, such as the object, bucket, and eventName fields.
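If you only want to spot-check those key fields rather than the full record, one possible jq filter is:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.66.220:9092 --from-beginning --topic topic1 --timeout-ms 30000 | jq '.Records[] | {eventName, bucket: .s3.bucket.name, key: .s3.object.key}'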
This event record confirms that the Ceph Object Gateway is successfully sending event notifications to the configured Kafka topic for the creation of obj1 under bkt1, demonstrating real-time event streaming between Ceph and Kafka.
Conclusion
This blog post provides a comprehensive guide for configuring a Kafka cluster with three Zookeeper nodes and three Kafka brokers, along with integrating it with Ceph Object Gateway for bucket notifications.
By following the outlined steps, users can successfully set up a robust Kafka environment, ensuring fault tolerance and efficient message handling. This guide also demonstrates how to create partitioned Kafka topics and integrate them with Ceph Object Gateway bucket notifications, allowing for real-time event monitoring and verification of notifications received at the Kafka side.