IBM Storage Ceph Object Gateway - Bucket notifications with Kafka cluster

By Hemanth Maheswarla

Configuring a Kafka Cluster (with 3 Zookeepers and 3 Brokers) and Enabling Bucket Notifications


Introduction

This blog post provides the steps to configure a Kafka cluster with 3 nodes, each running a Zookeeper server and a Kafka broker. It also explains how to integrate the Kafka cluster with the Ceph Object Gateway for bucket notifications.

Prerequisites

  • RHEL 9: Kafka nodes running the RHEL 9 OS
  • Java Development Kit (JDK) 23: Ensure JDK 23 is installed on your nodes
  • Kafka Tarball: The Kafka binary distribution (e.g., kafka_2.13-2.8.0.tgz)
  • Required Ports: Ensure ports 2181, 2888, 3888, and 9092 are available and open in the firewall (see the firewall-cmd sketch after this list)
  • IBM Storage Ceph Cluster: A running cluster with Ceph Object Gateway
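
If firewalld is running on the RHEL 9 nodes, the required ports can be opened with firewall-cmd. This is a minimal sketch assuming the default zone; adjust to your environment:

firewall-cmd --permanent --add-port=2181/tcp --add-port=2888/tcp --add-port=3888/tcp --add-port=9092/tcp
firewall-cmd --reload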

Step 1: Install Java and Download Kafka on All Kafka Nodes

First, log into each node and perform the following installations:

Install Java

Use the following command to install JDK 23:

yum install -y https://download.oracle.com/java/23/latest/jdk-23_linux-x64_bin.rpm
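
To confirm the JDK installed correctly, check the reported version (the exact build string will vary):

java -version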

Download and Extract Kafka

Download the Kafka tarball and extract it:

curl -o /tmp/kafka.tgz http://magna002.ceph.redhat.com/cephci-jenkins/kafka_2.13-2.8.0.tgz
tar -zxvf /tmp/kafka.tgz -C /usr/local/
mv /usr/local/kafka_2.13-2.8.0 /usr/local/kafka
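
The URL above points to an internal Red Hat mirror. If it is not reachable from your environment, the same release is published on the Apache archive (assuming the standard archive layout):

curl -o /tmp/kafka.tgz https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz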

Step 2: Configure Zookeeper on Each Node

For each of your three Zookeeper nodes, follow these configuration steps:


Create Zookeeper Data Directory

cd /usr/local/kafka/
mkdir -p data/zookeeper

This creates the necessary directory for Zookeeper's data.


Create myid File

Navigate into the Zookeeper directory and create a myid file. This file contains a unique integer ID for each Zookeeper instance.

cd /usr/local/kafka/data/zookeeper

On the first Zookeeper node:

echo "1" > myid

On the second node:

echo "2" > myid

On the third node:

echo "3" > myid

Configure zookeeper.properties

Navigate to the Kafka configuration directory:

cd /usr/local/kafka/config/
vi zookeeper.properties

Edit the zookeeper.properties file to include the following lines (replace IP addresses with your actual Zookeeper node IPs):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
server.1=10.0.66.220:2888:3888
server.2=10.0.64.191:2888:3888
server.3=10.0.66.18:2888:3888

  • dataDir: Directory where Zookeeper stores its data.
  • clientPort: Port on which clients connect to Zookeeper.
  • server.X: One entry per Zookeeper server in the ensemble. X is the myid of the server, followed by its IP address, the follower-to-leader communication port (2888), and the leader-election port (3888).

Apart from the myid file, this configuration is identical on all three nodes.

Step 3: Start Zookeeper Servers

Once Zookeeper is configured on all three nodes, start the Zookeeper service on each node:

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

The -daemon flag runs the Zookeeper server in the background.

To verify Zookeeper is listening on the client port (2181) and communication ports (2888, 3888):

netstat -nltp

Expected output:

tcp6       0      0 :::2181                 :::*                    LISTEN      <PID>/java
tcp6       0      0 <Zookeeper IP>:3888     :::*                    LISTEN      <PID>/java

Port 2888 appears only on the node that is currently the ensemble leader, since followers connect to the leader on that port.
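
To see each node's role in the ensemble, query ZooKeeper's four-letter-word interface (assuming nc is installed; srvr is whitelisted by default in the ZooKeeper 3.5.x release bundled with Kafka 2.8):

echo srvr | nc localhost 2181

The Mode line in the output reports whether the node is the leader or a follower.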

Step 4: Configure Kafka Brokers on Each Node

For each of your three Kafka broker nodes, follow these configuration steps:

Configure server.properties

Navigate to the Kafka configuration directory:

cd /usr/local/kafka/config/
vi server.properties

Edit the server.properties file. Important parameters to configure are:

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=10.0.66.220:2181,10.0.64.191:2181,10.0.66.18:2181
listeners=PLAINTEXT://10.0.66.220:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092

  • broker.id: Assign a unique integer ID to each broker. For the first Kafka node, use 0; for the second, use 1; for the third, use 2.
  • log.dirs: Directory for Kafka log files.
  • zookeeper.connect: Connection string listing all Zookeeper nodes and their client ports.
  • listeners: (Optional but recommended) Defines the network interface and port Kafka binds to. Replace the IP with the respective node's IP address.
  • advertised.listeners: (Optional but recommended) Advertises a different hostname or IP address to clients than the one Kafka binds to.

The server.properties file also contains other important settings for network threads, socket buffers, log retention, and more. Review and adjust these based on your requirements.
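
For example, on the second node the distinguishing settings would look like this (IPs taken from the ensemble above; adjust to your nodes, and note that zookeeper.connect stays identical on all three brokers):

broker.id=1
listeners=PLAINTEXT://10.0.64.191:9092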


Step 5: Start Kafka Brokers

After configuring server.properties on each Kafka broker node, start the Kafka broker service:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

The -daemon flag runs the Kafka server in the background.

To verify Kafka is listening on the default broker port (9092):

netstat -nltp

Expected output:

tcp6       0      0 :::9092                 :::*                    LISTEN      <PID>/java
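
Once all three brokers are up, you can confirm that they registered with Zookeeper using the bundled shell (any Zookeeper node in the ensemble works):

/usr/local/kafka/bin/zookeeper-shell.sh 10.0.66.220:2181 ls /brokers/ids

The last line of output should list all broker IDs, e.g. [0, 1, 2].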

Step 6: Create a Partitioned Topic on Kafka

Create a topic with partitions and replication-factor:

/usr/local/kafka/bin/kafka-topics.sh --create --topic topic1 --bootstrap-server 10.0.66.220:9092 --partitions 2 --replication-factor 2

Expected output:

Created topic topic1.

  • --partitions: Sets the number of partitions for the topic. Each partition is an ordered, immutable sequence of records; more partitions allow greater parallelism when producing and consuming messages.
  • --replication-factor: Specifies the number of replicas for each partition. A replication factor of 1 means a single copy per partition. For production environments, a higher replication factor (e.g., 2 or 3) is recommended for fault tolerance. The replication factor cannot exceed the number of brokers (3 in this cluster).

The following command can be used to describe the Kafka topic and get detailed metadata about it:

/usr/local/kafka/bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server 10.0.66.220:9092

Expected output:

Topic: topic1  TopicId: tNJaPMf2TzGJoK2Xk5Ru6w  PartitionCount: 2  ReplicationFactor: 2  Configs: segment.bytes=1073741824
Topic: topic1  Partition: 0  Leader: 0  Replicas: 0,1  Isr: 0,1
Topic: topic1  Partition: 1  Leader: 1  Replicas: 1,0  Isr: 1,0

For each partition, Leader is the broker currently serving client requests, Replicas lists the brokers assigned to hold copies, and Isr lists the replicas currently in sync with the leader.
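
Optionally, smoke-test the topic with the console producer before integrating with the Object Gateway; here a single test message is piped in:

echo "hello" | /usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server 10.0.66.220:9092 --topic topic1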


Integration with Ceph Object Gateway


Step 1: Create a Topic from Object Gateway Side

aws --endpoint-url http://10.0.67.122:80 sns create-topic --name topic1 --attributes 'push-endpoint=kafka://10.0.66.220:9092,persistent=false,verify-ssl=false,kafka-brokers="[10.0.64.191:9092, 10.0.66.18:9092]"'

Note:
kafka-brokers is a comma-separated list of host:port values for Kafka brokers (it may repeat the broker already named in the push-endpoint URI). These brokers are added alongside the Kafka URI when sending notifications to the Kafka cluster, and multiple brokers provide high availability: RGW attempts to connect to the first available broker in the list and fails over to the next one if a broker is unreachable or down.

Expected output:

{
    "TopicArn": "arn:aws:sns:default::topic1"
}
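
Optionally, verify that the topic was stored on the RGW side by running radosgw-admin from a Ceph admin node:

radosgw-admin topic list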

Step 2: Create a Bucket, Configure Notifications, and Trigger a Configured Event

Create a bucket:

aws --endpoint-url http://10.0.67.122:80 s3 mb s3://bkt1

Create a notification configuration file notif.json with the following contents:

{
    "TopicConfigurations": [
        {
            "Id": "notif1",
            "TopicArn": "arn:aws:sns:default::topic1",
            "Events": [
                "s3:ObjectCreated:*",
                "s3:ObjectRemoved:*"
            ]
        }
    ]
}

  • s3:ObjectCreated:*: Notifications are sent for all object creation events, including PutObject, PostObject, CopyObject, and CompleteMultipartUpload.
  • s3:ObjectRemoved:*: Notifications are sent for all object deletion events, including DeleteObject and DeleteMarkerCreated.

Apply the notification configuration:

aws --endpoint-url http://10.0.67.122:80 s3api put-bucket-notification-configuration --bucket bkt1 --notification-configuration file://notif.json
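
To confirm the configuration was applied, read it back:

aws --endpoint-url http://10.0.67.122:80 s3api get-bucket-notification-configuration --bucket bkt1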

Upload an object to trigger the event:

fallocate -l 10MB obj10MB
aws --endpoint-url http://10.0.67.122:80 s3 cp obj10MB s3://bkt1/obj1

Expected output:

upload: ./obj10MB to s3://bkt1/obj1

Step 3: Verify Event Record Received at Kafka Side

Consume messages from Kafka topic:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.66.220:9092 --from-beginning --topic topic1 --timeout-ms 30000 | jq .

Expected output:

{
  "Records": [
    {
      "eventVersion": "2.2",
      "eventSource": "ceph:s3",
      "awsRegion": "default",
      "eventTime": "2025-03-24T04:38:19.784111Z",
      "eventName": "ObjectCreated:CompleteMultipartUpload",
      "userIdentity": {
        "principalId": "hsm"
      },
      "requestParameters": {
        "sourceIPAddress": ""
      },
      "responseElements": {
        "x-amz-request-id": "2be0346c-4268-4e39-8d19-38c1b1ff6883.74180.18386630626840617633",
        "x-amz-id-2": "74180-default-default"
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "notif1",
        "bucket": {
          "name": "bkt1",
          "ownerIdentity": {
            "principalId": "hsm"
          },
          "arn": "arn:aws:s3:default::bkt1",
          "id": "2be0346c-4268-4e39-8d19-38c1b1ff6883.74180.7"
        },
        "object": {
          "key": "obj1",
          "size": 10000000,
          "eTag": "0b223142030f0d12db211f3e18069fbd-2",
          "versionId": "",
          "sequencer": "BBE1E067EF23A32F",
          "metadata": [],
          "tags": []
        }
      },
      "eventId": "1742791099.799220.0b223142030f0d12db211f3e18069fbd-2",
      "opaqueData": ""
    }
  ]
}

Because the consumer was started with --timeout-ms 30000, it exits with a TimeoutException once no further messages arrive within 30 seconds; the log output below is expected and confirms that one message was processed:

[2025-03-24 04:39:19,314] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 1 messages

Note:
The output is formatted using jq for better readability. Validate the details in the event record, such as the object, bucket, and eventName. Here eventName is ObjectCreated:CompleteMultipartUpload because the AWS CLI automatically switches to multipart upload for objects above its default 8 MB threshold.

This event record confirms that the Ceph Object Gateway successfully sent an event notification to the configured Kafka topic for the creation of obj1 in bkt1, demonstrating real-time event streaming between Ceph and Kafka.


Conclusion

This blog post provides a comprehensive guide to configuring a Kafka cluster with three Zookeeper nodes and three Kafka brokers, and to integrating it with the Ceph Object Gateway for bucket notifications.

By following the outlined steps, users can successfully set up a robust Kafka environment, ensuring fault tolerance and efficient message handling. This guide also demonstrates how to create partitioned Kafka topics and integrate them with Ceph Object Gateway bucket notifications, allowing for real-time event monitoring and verification of notifications received at the Kafka side.

