Click "Set up", download KafkaConnect.zip, and unzip it.
$ cd <your-kafkaconnect-download-path>/kafkaconnect
Update kafka-connect.yaml to configure the bootstrapServers and image properties. First, extract your Event Streams bootstrap server address.
$ oc get eventstreams <your-event-streams-instance> -o yaml -n <event-streams-namespace> | grep svc:9092
bootstrapServers: es-dev-kafka-bootstrap.cp4i-es2.svc:9092
$ vi kafka-connect.yaml ( modify bootstrapServers, image properties)
bootstrapServers: es-dev-kafka-bootstrap.cp4i-es2.svc:9092 (the output of "oc get" above)
image: image-registry.openshift-image-registry.svc:5000/<your-event-streams-namespace>/my-connect-cluster-image:latest
Note: This article uses the OpenShift image registry to store the Kafka Connect Docker image.
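The lookup and edit above can also be scripted. The following is a minimal sketch rather than part of the KafkaConnect download: the function names extract_bootstrap and set_bootstrap are hypothetical, and it assumes kafka-connect.yaml contains a single bootstrapServers line.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the first "<host>.svc:9092" address found in
# the text on stdin (for example, the output of "oc get eventstreams ... -o yaml").
extract_bootstrap() {
  grep -o '[A-Za-z0-9.-]*\.svc:9092' | head -n 1
}

# Hypothetical helper: rewrite the bootstrapServers line of the YAML on stdin
# with the address given as $1, keeping the original indentation.
set_bootstrap() {
  sed -E "s|^([[:space:]]*bootstrapServers:).*|\1 $1|"
}

# Usage (assumes you are logged in with oc; placeholders as in the steps above):
#   BOOTSTRAP=$(oc get eventstreams <your-event-streams-instance> -o yaml \
#       -n <event-streams-namespace> | extract_bootstrap)
#   set_bootstrap "$BOOTSTRAP" < kafka-connect.yaml > kafka-connect.yaml.new
```

Writing to a new file first lets you compare it against the original before replacing it.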
Connectors Download & Configuration
$ cd <your-kafkaconnect-download-path>/kafkaconnect/my-plugins
$ mkdir mq (To place mqsink, mqsource connector jar files)
$ mkdir http (To place httpsink connector jar file)
Find the MQSink, MQSource, and HTTPSink Connectors on the Event Streams connectors catalog page https://ibm.github.io/event-streams/connectors.
Download the MQSink and MQSource Connector jar files, then move or copy both to the kafkaconnect/my-plugins/mq folder.
The HTTPSink Connector jar is not available for download, so download the connector source and build the jar yourself.
$ cd <your-kafkaconnect-path>/kafkaconnect/my-plugins/http
$ git clone https://github.com/thomaskwscott/kafka-connect-http.git
$ cd kafka-connect-http
$ mvn package
Note: Install Maven if it is not installed already (on macOS: $ brew install maven). The Maven build may complain about Java version compatibility, so make sure OpenJDK or another compatible Java version is installed. The build should create "kafka-connect-http-5.4.1.jar" in the http folder.
Build Kafka Connect Docker image
$ cd <your-kafkaconnect-path>/kafkaconnect
Edit the Dockerfile and add the COPY statement as below.
FROM cp.icr.io/cp/ibm-eventstreams-kafka:10.4.0-kafka-2.8.0
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
$ docker build -t my-connect-cluster-image:latest .
Push the Kafka Connect image to OpenShift Container Registry
Log in to the OpenShift cluster
$ oc login <your openshift api url>
Get your default OpenShift image registry Route
$ oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}'
Note: If you have not created a default-route yet, you can create one with the following command.
$ oc patch configs.imageregistry.operator.openshift.io/cluster --patch '{"spec":{"defaultRoute":true}}' --type=merge
$ docker tag my-connect-cluster-image:latest <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest
$ docker login https://<openshift-registry-default-route> -u $(oc whoami) -p $(oc whoami -t)
$ docker push <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest
Verify that the Kafka Connect image was uploaded successfully.
$ oc get is -n <cp4i-eventstreams-namespace>
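The tag, login, and push steps above all reuse the same registry route and namespace, so they can be driven from one variable. This is a minimal sketch under that assumption: image_ref is a hypothetical helper, not an oc or docker command, and the default image name and tag are the ones used in this article.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build "<registry-route>/<namespace>/<image>:<tag>".
# $1 = registry route host, $2 = namespace, $3 = image name, $4 = tag.
image_ref() {
  local registry="$1" namespace="$2"
  local image="${3:-my-connect-cluster-image}" tag="${4:-latest}"
  printf '%s/%s/%s:%s\n' "$registry" "$namespace" "$image" "$tag"
}

# Usage (assumes you are logged in with oc and docker is running):
#   REGISTRY=$(oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}')
#   TARGET=$(image_ref "$REGISTRY" "<cp4i-eventstreams-namespace>")
#   docker tag my-connect-cluster-image:latest "$TARGET"
#   docker login "https://$REGISTRY" -u "$(oc whoami)" -p "$(oc whoami -t)"
#   docker push "$TARGET"
```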
KafkaConnect Instance
Create the KafkaConnect framework instance from the folder that contains kafka-connect.yaml.
$ cd <your-kafkaconnect-download-path>/kafkaconnect
$ oc apply -f kafka-connect.yaml
Check if KafkaConnect is created and in Ready state.
$ oc get KafkaConnect -n <your-event-streams-namespace>
NAME                 DESIRED REPLICAS   READY
my-connect-cluster   1                  True
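If you want to script the readiness check, the table output can be parsed. The sketch below is an illustration only: is_ready is a hypothetical helper, and it assumes READY is the last column of the output, as in the sample above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "oc get KafkaConnect" table output on stdin and
# succeed only if the resource named in $1 shows True in the last column.
is_ready() {
  awk -v name="$1" '$1 == name && $NF == "True" { found = 1 } END { exit !found }'
}

# Usage (assumes you are logged in with oc):
#   oc get KafkaConnect -n <your-event-streams-namespace> | is_ready my-connect-cluster \
#       && echo "KafkaConnect is ready"
```

The same helper should work for the "oc get KafkaConnectors" checks later in this article, provided that output also ends with a READY column.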
1. MQSink Connector
Use case: To synchronize IBM Event Streams Topic data to an MQ Queue.
Create a yaml file like the one below, and set the MQ Queue Manager connection properties to match your environment.
$ cat mq-sink-<qmgr>-<queue>.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: mq-sink-mqdv03-mqsink
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsink.MQSinkConnector
  tasksMax: 1
  config:
    topics: MQSINK
    mq.queue.manager: mqdv03
    mq.connection.name.list: mqdv03-ibm-mq.cp4i-mq.svc(1414)
    mq.channel.name: SB.SVRCONN
    mq.queue: MQSINK
    mq.user.name: dummy
    mq.password: dummy
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.message.builder: com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder
$ oc apply -f mq-sink-<qmgr>-<queue>.yaml -n <your-event-streams-namespace>
Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>
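If you need the same connector definition for several queues, you can generate the yaml file instead of editing it by hand. The following is a minimal sketch: mq_sink_yaml is a hypothetical helper, the property values are the ones from the example above, and the resource name is lowercased because Kubernetes resource names must be lowercase.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print an MQSink KafkaConnector definition.
# $1 = queue manager, $2 = queue, $3 = topic, $4 = connection name list, $5 = channel.
mq_sink_yaml() {
  local qmgr="$1" queue="$2" topic="$3" conn="$4" channel="$5"
  local name
  # Resource names must be lowercase, e.g. mq-sink-mqdv03-mqsink.
  name=$(printf 'mq-sink-%s-%s' "$qmgr" "$queue" | tr '[:upper:]' '[:lower:]')
  cat <<EOF
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: ${name}
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsink.MQSinkConnector
  tasksMax: 1
  config:
    topics: ${topic}
    mq.queue.manager: ${qmgr}
    mq.connection.name.list: ${conn}
    mq.channel.name: ${channel}
    mq.queue: ${queue}
    mq.user.name: dummy
    mq.password: dummy
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.message.builder: com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder
EOF
}

# Usage:
#   mq_sink_yaml mqdv03 MQSINK MQSINK 'mqdv03-ibm-mq.cp4i-mq.svc(1414)' SB.SVRCONN \
#       > mq-sink-mqdv03-mqsink.yaml
#   oc apply -f mq-sink-mqdv03-mqsink.yaml -n <your-event-streams-namespace>
```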
Testing MQSink Connector with IBM Event Streams Starter Application
Log in to the IBM Event Streams Console, navigate to Toolbox > Starter application, and click "Get Started".
Follow the instructions:
Step 1: Download the jar into a folder (the jar file name is demo-all.jar).
Step 2: Generate the properties, then unzip the downloaded archive.
Step 3: Run the starter application to put messages into the MQSINK Topic.
$ /usr/bin/java -Dproperties_path=<path-to-properties> -jar demo-all.jar
Note: If you run into a PKCS12 certificate error like the one below,
ERROR: Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore truststore.p12 of type PKCS12
then make sure you are using a compatible Java version, or convert the p12 file to JKS as documented here: https://ibm.github.io/event-streams/troubleshooting/pkcs12-keystore-java-client/
Open a browser and go to http://localhost:8080.
Click the "Start Producing" button to write sample messages to the MQSINK Topic.
Verify Event Streams MQSINK Topic data.
Verify MQ Queue is synchronized with the Event Streams MQSINK Topic data.
2. MQSource Connector
Use case: To synchronize messages from an MQ Queue to an IBM Event Streams Topic.

Create a yaml file like the one below, and set the MQ Queue Manager connection properties to match your environment.
$ cat mq-source-<qmgr>-<queue>.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: mq-source-mqdv03-mqsource
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    topic: MQSOURCE
    mq.queue.manager: mqdv03
    mq.connection.name.list: mqdv03-ibm-mq.cp4i-mq.svc(1414)
    mq.channel.name: SB.SVRCONN
    mq.queue: MQSOURCE
    mq.user.name: dummy
    mq.password: dummy
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    mq.message.body.jms: true
Note: Create a Topic named MQSOURCE in Event Streams, and a Queue named MQSOURCE in the MQ Queue Manager.
$ oc apply -f mq-source-<qmgr>-<queue>.yaml -n <your-event-streams-namespace>
Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>
Testing MQSource Connector
There are a few ways to put messages into the MQSOURCE queue. The quickest is to log in to the OpenShift Console and navigate to the MQ Queue Manager pod > Terminal.
$ amqsput MQSOURCE (write a few test messages)
From the Event Streams Console, verify that the MQSource Connector picked up the messages.
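As an alternative to typing messages in the pod terminal, you can pipe generated lines into amqsput, which reads one message per line from standard input. This is a minimal sketch: gen_messages is a hypothetical helper, and the pod name, MQ namespace, and amqsput path in the usage comment are assumptions about your environment.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print $1 numbered test messages, one per line.
gen_messages() {
  local count="$1" i=1
  while [ "$i" -le "$count" ]; do
    printf 'test message %d\n' "$i"
    i=$((i + 1))
  done
}

# Usage (assumes you are logged in with oc and that amqsput is installed at
# the standard sample path inside the queue manager pod):
#   gen_messages 5 | oc exec -i <queue-manager-pod> -n <mq-namespace> -- \
#       /opt/mqm/samp/bin/amqsput MQSOURCE mqdv03
```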
3. HTTPSink Connector
Use case: To publish messages from an Event Streams Topic to an API endpoint.
Create a yaml file like the one below, and configure the API endpoint to be invoked (example below).
Note: The attached sample App Connect Enterprise (ACE) Project Interchange file contains a REST API that receives a payload and writes it to an MQ Queue. You can also use a different API for testing.
$ cat http-sink.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: http-sink-invoice-posting
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: uk.co.threefi.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: HTTPSINK.INVOICES
    tasks.max: 1
    request.method: "POST"
    http.api.url: "http://sb-rest2mq-invoice-api-http-cp4i-ace.ocp-dev1-xxxxx-0000.us-east.containers.appdomain.cloud:80/rest2mq/invoice/v1/addInvoice"
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
Note: Create Topic HTTPSINK.INVOICES in Event Streams. Make sure the API that you want to invoke is available.
Create HTTPSink Connector
$ oc apply -f http-sink.yaml -n <your-event-streams-namespace>
Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>
Testing HTTPSink Connector
Follow the MQSink testing steps above: download the Event Streams Starter application for Topic HTTPSINK.INVOICES, run it, push a few messages to the Topic, and verify that the API is invoked successfully.