This post helps you learn how to connect App Connect Enterprise and IBM Integration Bus to IBM Event Streams. Apache Kafka is an open source project that provides a messaging service capability based upon a distributed commit log, letting you publish and subscribe to streams of data records (messages). When LinkedIn first open sourced Kafka in 2011, it was framed as a message queueing platform, but it has since evolved into a streaming platform for low-latency, high-throughput, fault-tolerant streams of events. IBM Cloud provides a public Kafka-made-simple service which until recently was referred to as IBM Message Hub. In mid-September IBM announced that the IBM Message Hub service on IBM Cloud was being renamed to IBM Event Streams, to align closely with a new on-premises private cloud deployment option. The new private cloud IBM Event Streams product, which has just been released, makes it easy to deploy Kafka clusters and get started building event-driven applications, and simplifies cross-site replication for disaster recovery.
Regardless of whether you are using the open source Apache Kafka solution, the IBM Event Streams service on IBM Cloud, or the new private cloud IBM Event Streams, you can integrate with your existing application estate by utilizing the built-in message flow nodes in IBM Integration Bus and App Connect Enterprise. The KafkaProducer and KafkaConsumer message flow nodes were first provided in IIBv10.0.0.7 at the end of 2016, but they are also available as part of the latest evolution of this software, IBM App Connect Enterprise v11. This article provides a simple example which demonstrates how to use both IBM Integration Bus v10 and App Connect Enterprise v11 to produce Kafka messages.
IBM Event Streams comes in two editions:
- IBM Event Streams Community Edition is a free version intended for trials and demonstration purposes. It can be installed and used without charge.
- IBM Event Streams is a paid-for version intended for enterprise use, and includes additional features such as geo-replication.
For more information, check out the documentation. The rest of this post assumes you have installed both IBM Event Streams and either IIBv10 or ACEv11.
Sending Kafka data to IBM Event Streams using IIB:
The Kafka message flow nodes have been available since IIBv10.0.0.7. These instructions have been tested against IIBv10.0.0.13, which was the latest available IIB release at the time of testing. Subsequently IIBv10.0.0.14 has also been released! When selecting which version of the product to use, note that IIBv10.0.0.12 and later releases include functional APAR IT23442 which, although not mandatory for this scenario, significantly improves the IIB product's capabilities for configuring a more extensive range of properties when interacting with Kafka. Once you have this scenario working you might like to explore this further.
- Open an IIB Command Console, create an integration node, start it up and create an associated integration server:
mqsicreatebroker IIB_EVENTSTREAMS
mqsistart IIB_EVENTSTREAMS
mqsicreateexecutiongroup IIB_EVENTSTREAMS -e default
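Before continuing, you can confirm that the node is running and that the default server was created by listing the servers on the node:
mqsilist IIB_EVENTSTREAMS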
- IBM Event Streams provides the option to Generate a starter application and its associated configuration files, and it may be helpful to do this in order to extract configuration which can be reused with IIB in the steps which follow. When securing the connection from IIB to Event Streams you will need:
- A copy of the server-side public certificate added to your client-side trusted certificates.
- An API key generated from the IBM Cloud Private UI.
- Configure the IIB truststore properties using the following commands:
mqsichangeproperties IIB_EVENTSTREAMS -e default -o ComIbmJVMManager -n truststoreType -v JKS
mqsichangeproperties IIB_EVENTSTREAMS -e default -o ComIbmJVMManager -n truststoreFile -v C:\IIB_EVENTSTREAMS\truststore\escert.jks
mqsichangeproperties IIB_EVENTSTREAMS -e default -o ComIbmJVMManager -n truststorePass -v default::truststorePass
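To confirm the values were applied, you can report the JVM manager properties back; the three truststore entries should appear in the output:
mqsireportproperties IIB_EVENTSTREAMS -e default -o ComIbmJVMManager -r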
The JKS certificate file placed into the IIB trust store is used by IIB to verify the certificate presented by IBM Event Streams during the handshake when an SSL connection is made. The file escert.jks is a copy of the server-side public certificate which you can download from IBM Event Streams. When you download the JKS file, by default it is protected by a simple password whose value is password, but you will want to change this, or export the certificate and import it into your own trust store using a tool such as ikeyman.
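For example, the keytool utility that ships with the JRE can inspect the downloaded file and change its store password (the new password value here is illustrative):
keytool -list -keystore escert.jks -storepass password
keytool -storepasswd -keystore escert.jks -storepass password -new MyNewTrustPass
If you do change the password, remember to use the new value in the mqsisetdbparms command below.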
- Download the JKS file from IBM Event Streams and copy it into the directory you will use for the IIB trust store. In our example this is C:\IIB_EVENTSTREAMS\truststore\escert.jks.
- You will also need an API key which can be generated from the IBM Cloud Private user interface that comes with IBM Event Streams. We will provide these passwords to the IIB infrastructure in the commands which follow:
mqsisetdbparms IIB_EVENTSTREAMS -n default::truststorePass -u thiscanbeanyvalue -p password
mqsisetdbparms IIB_EVENTSTREAMS -n kafka::KAFKA::default -u token -p "3QA9xbOXUwimNbH0r0t0g5H94PZWZTh_M-q-eVxxxxxx"
mqsistop IIB_EVENTSTREAMS
mqsistart IIB_EVENTSTREAMS
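If your IIB version includes the mqsireportdbparms command, you can verify that the stored credentials match what you expect, for example:
mqsireportdbparms IIB_EVENTSTREAMS -n kafka::KAFKA::default -u token -p "3QA9xbOXUwimNbH0r0t0g5H94PZWZTh_M-q-eVxxxxxx"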
- Import the provided project interchange file (attached below) into your IIB Toolkit workspace. You will find an application called EventStreams which contains a message flow named EventStreamsFlow. The purpose of this flow is to receive a simple JSON message over HTTP, containing the name of a Kafka topic and some payload data. The KafkaProducer message flow node sends the payload to the provided topic. Look at the KafkaProducer node's properties: you will need to change the Bootstrap servers property to refer to your IBM Event Streams system. The Topic name on the message flow node has been set to ThisIsADefaultTopicName; this will be overridden based on the HTTP data sent into the message flow, as sketched after this step. The security properties have already been pre-configured for you to specify SASL_SSL and TLSv1.2. Deploy the message flow to server default, owned by node IIB_EVENTSTREAMS.
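As an illustration of how the override works, here is a minimal ESQL sketch of the kind of Compute node logic that could sit in front of the KafkaProducer node. The module name and JSON field paths are illustrative and the attached flow may differ, but LocalEnvironment.Destination.Kafka.Topic is the local environment field the KafkaProducer node reads for a topic override (the Compute node's Compute mode must include LocalEnvironment for the override to be propagated):
CREATE COMPUTE MODULE EventStreamsFlow_Compute
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Route to the topic named in the incoming JSON message
    SET OutputLocalEnvironment.Destination.Kafka.Topic = InputRoot.JSON.Data.TopicName;
    -- Carry the message properties across, then send just the payload text as a BLOB
    SET OutputRoot.Properties = InputRoot.Properties;
    SET OutputRoot.BLOB.BLOB = CAST(InputRoot.JSON.Data.Payload AS BLOB CCSID 1208);
    RETURN TRUE;
  END;
END MODULE;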
- You can check the HTTP Port / URL which the flow is exposed on using this command:
mqsireportproperties IIB_EVENTSTREAMS -e default -o HTTPConnector -r
- Test the message flow by sending it data using curl or an equivalent HTTP testing client. The example below assumes that your IBM Event Streams topic is called BenThompsonTopic:
curl -X POST http://localhost:7801/EventStreamsFlow -d "{\"TopicName\":\"BenThompsonTopic\",\"Payload\":\"Hello World\"}"
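If you prefer, you can keep the test message in a file and have curl send it with a JSON content type (the file name message.json is illustrative):
curl -X POST http://localhost:7801/EventStreamsFlow -H "Content-Type: application/json" -d @message.json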
- Look in the IBM Event Streams web user interface, navigate to your topic, and click the Find messages link on the right side of the screen:
Select the partition for the topic and the message offset number (if this is the first time you have sent data, it will be offset 0) and you should see the message has been successfully delivered as shown below:
Sending Kafka data to IBM Event Streams using ACE:
ACEv11.0.0.1 lets you deploy message flows either to an integration server which is owned by an integration node, or to a server which is entirely stand-alone. For variety compared to the IIBv10 description above, the instructions below assume a server which is independent of an integration node. These instructions also assume a Windows platform:
- Open an App Connect Enterprise command console and create a working directory by running the mqsicreateworkdir command:
mqsicreateworkdir C:\MyServer
This will set up the directory structure for your integration server.
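Among other things, the command creates a skeleton server.conf.yaml in the work directory, which you will edit in a later step; on Windows you can confirm it is there with:
dir C:\MyServer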
- IBM Event Streams provides the option to download a sample Java application and its associated configuration files, and it may be helpful to do this in order to extract configuration which can be reused with ACE in the steps which follow. When securing the connection from ACE to Event Streams you will need:
- A copy of the server-side public certificate added to your client-side trusted certificates.
- An API key generated from the IBM Cloud Private UI.
- Open the file C:\MyServer\server.conf.yaml in your favoured text editor. Locate the JVM subsection under ResourceManagers, then uncomment the truststore properties and assign the following values:
ResourceManagers:
  JVM:
    truststoreType: 'JKS'
    truststoreFile: 'C:\IIB_EVENTSTREAMS\truststore\escert.jks'
    truststorePass: 'MyServer::truststorePass'
The JKS certificate file placed into the ACE trust store is used by ACE to verify the certificate presented by IBM Event Streams during the handshake when an SSL connection is made. The file escert.jks is a copy of the server-side public certificate which you can download from IBM Event Streams. When you download the JKS file, by default it is protected by a simple password whose value is password, but you will want to change this, or export the certificate and import it into your own trust store using a tool such as ikeyman or the keytool commands shown in the IIB section above.
- Download the JKS file from IBM Event Streams and copy it into the directory you will use for the ACE trust store. In our example this is C:\IIB_EVENTSTREAMS\truststore\escert.jks.
- You will also need an API key which can be generated from the IBM Cloud Private user interface that comes with IBM Event Streams. We will provide these passwords to the ACE infrastructure in the commands which follow:
mqsisetdbparms -w C:\MyServer -n MyServer::truststorePass -u thiscanbeanyvalue -p password
mqsisetdbparms -w C:\MyServer -n kafka::KAFKA -u token -p "3QA9xbOXUwimNbH0r0t0g5H94PZWZTh_M-q-eVxxxxxx"
UPDATE: Note that when using a stand-alone integration server with Kafka, the syntax of the mqsisetdbparms command to define the credentials requires the -n parameter to specify kafka::KAFKA and not kafka::KAFKA::MyServer, where MyServer is the name of your integration server. An earlier version of this article showed this configuration incorrectly; the correct syntax is shown in the second command above. Also note that if you are using an integration-node-owned server with Kafka, the syntax of the mqsisetdbparms command requires the -n parameter to specify kafka::KAFKA::MyServer, where MyServer is the name of your integration server.
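If your ACE version includes the mqsireportdbparms command, you can verify the credentials stored for the stand-alone server in the same way, for example:
mqsireportdbparms -w C:\MyServer -n kafka::KAFKA -u token -p "3QA9xbOXUwimNbH0r0t0g5H94PZWZTh_M-q-eVxxxxxx"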
- Start the integration server:
IntegrationServer --name MyServer --work-dir C:\MyServer --admin-rest-api 7600 --http-port-number 7900 --console-log
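Once the console log shows that the server has started, you should be able to reach its web user interface on the port given by --admin-rest-api, for example by browsing to http://localhost:7600 (the port numbers here match the command above).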
- Import the provided project interchange file (attached below) into your ACE Toolkit workspace. You will find an application called EventStreams which contains a message flow named EventStreamsFlow. The purpose of this flow is to receive a simple JSON message over HTTP, containing the name of a Kafka topic and some payload data. The KafkaProducer message flow node sends the payload to the provided topic. Look at the KafkaProducer node's properties: you will need to change the Bootstrap servers property to refer to your IBM Event Streams system. The Topic name on the message flow node has been set to ThisIsADefaultTopicName; this will be overridden based on the HTTP data sent into the message flow, in the same way as sketched in the IIB section above. The security properties have already been pre-configured for you to specify SASL_SSL and TLSv1.2. Deploy the message flow to the stand-alone integration server MyServer.
- Test the message flow by sending it data using curl or an equivalent HTTP testing client. The example below assumes that your IBM Event Streams topic is called BenThompsonTopic:
curl -X POST http://localhost:7900/EventStreamsFlow -d "{\"TopicName\":\"BenThompsonTopic\",\"Payload\":\"From ACE with love\"}"
- Look in the IBM Event Streams web user interface, navigate to your topic, and click the Find messages link on the right side of the screen:
Select the partition for the topic and the message offset number (if this is the first time you have sent data, it will be offset 0) and you should see the message has been successfully delivered as shown below: