Overview
In this tutorial, we will build a simple event-driven system using IBM Event Streams (a managed Kafka service). We will create a producer that sends order messages to a Kafka topic and a consumer that receives and processes those messages in real time.
Prerequisites
- IBM Cloud Account: Sign up for a free account if you don't have one.
- IBM Event Streams Instance: Create an instance of Event Streams in your IBM Cloud dashboard.
- Python 3.x: Ensure Python is installed on your system.
- Kafka Python Client: We will use the
confluent-kafka
library, so install it with the following command:
pip install confluent-kafka
Step 1: Create an IBM Event Streams Instance
- Log in to your IBM Cloud Dashboard.
- Navigate to Catalog and search for Event Streams.
- Click on Event Streams, then Create.
- Choose your region, service plan (Free tier is available), and create the service.
Step 2: Create a Kafka Topic
- After creating the Event Streams instance, open it from your resource list.
- Click on Topics in the left-hand menu.
- Click Create Topic and name it
orders
.
- Choose the number of partitions (1 is sufficient for our demo) and set the replication factor (1 is fine for testing).
- Click Create to finalize your topic.
Step 3: Retrieve Your Kafka Credentials
- Go to your Event Streams instance.
- Click on Service credentials in the left-hand menu.
- Click New credential to generate a new set of credentials.
- Copy the API Key, Bootstrap endpoints, and Password (you will need these in your scripts).
Step 4: Create the Producer Script
Create a Python file named order_producer.py
and add the following code:
from confluent_kafka import Producer
import json
# Kafka producer configuration
conf = {
'bootstrap.servers': 'your-bootstrap-endpoints', # Replace with your bootstrap endpoints
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'your-api-key', # Replace with your API key
'sasl.password': 'your-password', # Replace with your password
}
# Create Kafka Producer instance
producer = Producer(**conf)
# Delivery report callback
def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
# Function to send order messages
def send_order(order):
producer.produce('orders', key=str(order['order_id']), value=json.dumps(order), callback=delivery_report)
producer.flush()
if __name__ == '__main__':
orders = [
{'order_id': 1, 'item': 'Laptop', 'quantity': 2},
{'order_id': 2, 'item': 'Headphones', 'quantity': 1},
{'order_id': 3, 'item': 'Phone', 'quantity': 3}
]
for order in orders:
send_order(order)
Explanation:
- This script connects to your Kafka service using the credentials provided.
- It sends predefined order messages to the
orders
topic.
- The
delivery_report
function notifies you if the message was successfully sent or if there was an error.
Update Configuration:
Make sure to replace:
your-bootstrap-endpoints
with the bootstrap endpoints from your service credentials.
your-api-key
with your API key.
your-password
with your password.
Step 5: Create the Consumer Script
Create another Python file named order_consumer.py
and add the following code:
from confluent_kafka import Consumer, KafkaException
import json
# Configuration for Kafka Consumer
conf = {
'bootstrap.servers': 'your-bootstrap-endpoints', # Replace with your bootstrap endpoints
'group.id': 'order-processor-group',
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'your-api-key', # Replace with your API key
'sasl.password': 'your-password', # Replace with your password
}
# Create Kafka Consumer instance
consumer = Consumer(**conf)
# Subscribe to 'orders' topic
consumer.subscribe(['orders'])
# Function to consume and print messages in real-time
def consume_messages():
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
# Decode and print the order message
order = json.loads(msg.value().decode('utf-8'))
print(f"Consumed order: {order}")
except Exception as e:
print(f"Error while consuming messages: {str(e)}")
finally:
consumer.close()
if __name__ == '__main__':
consume_messages()
Explanation:
- This script connects to the same Kafka service and subscribes to the
orders
topic.
- It continuously listens for new messages and prints the order details as they are consumed.
Update Configuration:
Make sure to replace:
your-bootstrap-endpoints
with the bootstrap endpoints from your service credentials.
your-api-key
with your API key.
your-password
with your password.
Step 6: Run the Producer and Consumer
1. Open Two Terminal Windows:
- Terminal 1: For the Consumer
- Terminal 2: For the Producer
2. Start the Consumer:
In Terminal 1, run the following command:
python order_consumer.py
3. Start the Producer:
In Terminal 2, run the following command:
python order_producer.py
4. Observe the Output
Conclusion
In this tutorial, you learned how to:
- Set up an IBM Event Streams instance.
- Create producer and consumer scripts using Python and the
confluent-kafka
library.
- Observe live message flow from producer to consumer.
This is a foundational step toward building more complex event-driven applications
#IBMEventStreams #kafka #python