
IBM Event Streams Tutorial: Building a Simple Order Processing System with Python

By Gunasekaran Venkatesan posted Sun October 20, 2024 01:39 PM

Overview

In this tutorial, we will build a simple event-driven system using IBM Event Streams (a managed Kafka service). We will create a producer that sends order messages to a Kafka topic and a consumer that receives and processes those messages in real time.


Prerequisites

  • IBM Cloud Account: Sign up for a free account if you don't have one.
  • IBM Event Streams Instance: Create an instance of Event Streams in your IBM Cloud dashboard.
  • Python 3.x: Ensure Python is installed on your system.
  • Kafka Python Client: We will use the confluent-kafka library, so install it with the following command:
pip install confluent-kafka
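
You can verify the installation with a quick version check (version() and libversion() are helper functions provided by the library):

# Quick sanity check for the installation
import confluent_kafka

print(confluent_kafka.version())     # Version of the confluent-kafka Python client
print(confluent_kafka.libversion())  # Version of the underlying librdkafka C library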

Step 1: Create an IBM Event Streams Instance

  1. Log in to your IBM Cloud Dashboard.
  2. Navigate to Catalog and search for Event Streams.
  3. Click on Event Streams, then Create.
  4. Choose your region, service plan (Free tier is available), and create the service.
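
If you prefer the command line, you can create the same instance with the IBM Cloud CLI. This is a sketch: messagehub is the catalog name Event Streams uses, but the plan and region names here are assumptions, so check what is available to you (for example with ibmcloud catalog service messagehub).

ibmcloud login
ibmcloud target -g Default
ibmcloud resource service-instance-create my-event-streams messagehub lite us-south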

Step 2: Create a Kafka Topic

  1. After creating the Event Streams instance, open it from your resource list.
  2. Click on Topics in the left-hand menu.
  3. Click Create Topic and name it orders.
  4. Choose the number of partitions (1 is sufficient for our demo). Event Streams manages the replication factor on most plans (typically 3), so accept the default if the field is fixed.
  5. Click Create to finalize your topic.
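
If you prefer to create the topic from code, the confluent-kafka library ships an AdminClient that can do the same thing. A minimal sketch, using the same placeholder credentials as the scripts below (see Step 3 for where these values come from):

from confluent_kafka.admin import AdminClient, NewTopic

conf = {
    'bootstrap.servers': 'your-bootstrap-endpoints',  # Comma-separated bootstrap endpoints
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'token',
    'sasl.password': 'your-api-key',
}

admin = AdminClient(conf)

# create_topics() is asynchronous: it returns a dict mapping topic name -> future
futures = admin.create_topics([NewTopic('orders', num_partitions=1, replication_factor=3)])

for topic, future in futures.items():
    try:
        future.result()  # Block until the topic is created (raises on failure)
        print(f"Topic '{topic}' created")
    except Exception as e:
        print(f"Failed to create topic '{topic}': {e}")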

Step 3: Retrieve Your Kafka Credentials

  1. Go to your Event Streams instance.
  2. Click on Service credentials in the left-hand menu.
  3. Click New credential to generate a new set of credentials.
  4. Copy the bootstrap endpoints (the kafka_brokers_sasl list), the user value (typically the literal string token), and the password (this is your API key); you will need these in your scripts.
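
The generated credential is a JSON document. The fields you need look roughly like this (values abbreviated; the exact field set can vary by plan):

{
  "api_key": "xxxxxxxxxxxx",
  "kafka_brokers_sasl": [
    "broker-0-xxxx.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093",
    "broker-1-xxxx.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093"
  ],
  "password": "xxxxxxxxxxxx",
  "user": "token"
}

For the scripts below, join the kafka_brokers_sasl list into a single comma-separated string for bootstrap.servers, use user as the SASL username, and use password (your API key) as the SASL password.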

Step 4: Create the Producer Script

Create a Python file named order_producer.py and add the following code:

from confluent_kafka import Producer
import json

# Kafka producer configuration
conf = {
    'bootstrap.servers': 'your-bootstrap-endpoints',  # Replace with your comma-separated bootstrap endpoints
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'token',  # For Event Streams the SASL username is the literal string "token"
    'sasl.password': 'your-api-key',  # Replace with your API key (the "password" field of your credentials)
}

# Create Kafka Producer instance
producer = Producer(conf)

# Delivery report callback
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

# Function to send order messages
def send_order(order):
    # Asynchronously enqueue the message; delivery_report is invoked on completion
    producer.produce('orders', key=str(order['order_id']), value=json.dumps(order), callback=delivery_report)
    producer.flush()  # Block until the message is delivered (simple for a demo; see the batching note below)

if __name__ == '__main__':
    orders = [
        {'order_id': 1, 'item': 'Laptop', 'quantity': 2},
        {'order_id': 2, 'item': 'Headphones', 'quantity': 1},
        {'order_id': 3, 'item': 'Phone', 'quantity': 3}
    ]

    for order in orders:
        send_order(order)


Explanation:

  • This script connects to your Kafka service using the credentials provided.
  • It sends predefined order messages to the orders topic.
  • The delivery_report function notifies you if the message was successfully sent or if there was an error.
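
Calling flush() after every message keeps the demo simple but serializes delivery. For larger batches, the usual pattern is to enqueue all messages, service delivery callbacks with poll(0) as you go, and flush once at the end. A sketch of that variant, reusing the orders list and delivery_report from above:

# Batched variant: enqueue everything, then wait once for delivery
for order in orders:
    producer.produce('orders', key=str(order['order_id']),
                     value=json.dumps(order), callback=delivery_report)
    producer.poll(0)  # Serve pending delivery callbacks without blocking

producer.flush()  # Block until all outstanding messages are delivered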

Update Configuration:

Make sure to replace:

  • your-bootstrap-endpoints with the bootstrap endpoints from your service credentials (join the kafka_brokers_sasl list into one comma-separated string).
  • your-api-key with the password value from your credentials (this is your API key); the SASL username remains the literal string token.

Step 5: Create the Consumer Script

Create another Python file named order_consumer.py and add the following code:

from confluent_kafka import Consumer, KafkaException
import json

# Configuration for Kafka Consumer
conf = {
    'bootstrap.servers': 'your-bootstrap-endpoints',  # Replace with your comma-separated bootstrap endpoints
    'group.id': 'order-processor-group',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'token',  # For Event Streams the SASL username is the literal string "token"
    'sasl.password': 'your-api-key',  # Replace with your API key (the "password" field of your credentials)
}

# Create Kafka Consumer instance
consumer = Consumer(conf)

# Subscribe to 'orders' topic
consumer.subscribe(['orders'])

# Function to consume and print messages in real-time
def consume_messages():
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())

            # Decode and print the order message
            order = json.loads(msg.value().decode('utf-8'))
            print(f"Consumed order: {order}")

    except KeyboardInterrupt:
        print("Stopping consumer...")
    except Exception as e:
        print(f"Error while consuming messages: {e}")
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()

Explanation:

  • This script connects to the same Kafka service and subscribes to the orders topic.
  • It continuously listens for new messages and prints the order details as they are consumed.
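
Note that the client auto-commits offsets in the background by default. If you want explicit at-least-once semantics, you can disable auto-commit and commit only after a message has been processed. A minimal sketch of the changed lines:

# Add to the consumer configuration:
#     'enable.auto.commit': False,

# Then, after successfully processing a message inside the loop:
consumer.commit(message=msg, asynchronous=False)  # Synchronously commit this message's offset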

Update Configuration:

Make sure to replace:

  • your-bootstrap-endpoints with the bootstrap endpoints from your service credentials (join the kafka_brokers_sasl list into one comma-separated string).
  • your-api-key with the password value from your credentials (this is your API key); the SASL username remains the literal string token.

Step 6: Run the Producer and Consumer

1. Open Two Terminal Windows:

  • Terminal 1: For the Consumer
  • Terminal 2: For the Producer

2. Start the Consumer:

In Terminal 1, run the following command:

python order_consumer.py

3. Start the Producer:

In Terminal 2, run the following command:

python order_producer.py

4. Observe the Output:

  • In Terminal 1, the consumer prints each order as it arrives:

Consumed order: {'order_id': 1, 'item': 'Laptop', 'quantity': 2}
Consumed order: {'order_id': 2, 'item': 'Headphones', 'quantity': 1}
Consumed order: {'order_id': 3, 'item': 'Phone', 'quantity': 3}

  • In Terminal 2, the producer's delivery callback confirms each message:

Message delivered to orders [0]
Message delivered to orders [0]
Message delivered to orders [0]

Conclusion

In this tutorial, you learned how to:

  • Set up an IBM Event Streams instance.
  • Create producer and consumer scripts using Python and the confluent-kafka library.
  • Observe live message flow from producer to consumer.

This is a foundational step toward building more complex event-driven applications.

#IBMEventStreams #kafka #python
