IBM Cloud Pak for Integration Event Streams Connectors Proof of Concept 

Sat October 30, 2021 12:48 PM

Overview
This article helps jumpstart teams who want to set up and run the Event Streams connectors quickly on IBM Cloud Pak for Integration. It covers the MQSink, MQSource, HTTP Sink, and Debezium SQL Server connectors. It does not cover installation of Event Streams on IBM Cloud Pak for Integration.


IBM Event Streams Overview
IBM Event Streams is an event-streaming platform built on open-source Apache Kafka that helps you build smart apps that can react to events as they happen. Event Streams is based on years of IBM operational expertise gained from running Apache Kafka event streams for enterprises, which makes it ideal for mission-critical workloads. Please visit the product page https://www.ibm.com/cloud/event-streams.

 
IBM Event Streams Connectors Overview
You can integrate external systems with IBM Event Streams by using the Kafka Connect framework and connectors. Please refer to the documentation for more in-depth details https://ibm.github.io/event-streams/connecting/connectors.

Setting up the Kafka Connect Framework
Open the IBM Event Streams console from IBM Cloud Pak for Integration Platform Navigator.

Select "Set up a Kafka Connect environment" from Toolbox icon on left panel. 


Click "Set up", and download KafkaConnect.zip, and unzip.

$ cd <your-kafkaconnect-download-path>/kafkaconnect

Update kafka-connect.yaml to configure the bootstrapServers and image properties.
Extract your Event Streams bootstrap server address:

$ oc get eventstreams <your-event-streams-instance> -o yaml -n <event-streams-namespace> | grep svc:9092
    bootstrapServers: es-dev-kafka-bootstrap.cp4i-es2.svc:9092

$ vi kafka-connect.yaml (modify the bootstrapServers and image properties)

  bootstrapServers: es-dev-kafka-bootstrap.cp4i-es2.svc:9092 (the output of "oc get" above)
  image: image-registry.openshift-image-registry.svc:5000/<your-event-streams-namespace>/my-connect-cluster-image:latest
Note: This article uses the OpenShift internal image registry to store the Kafka Connect Docker image.

Connectors Download & Configuration
$ cd <your-kafkaconnect-download-path>/kafkaconnect/my-plugins
$ mkdir mq    (to place the MQSink and MQSource connector jar files)
$ mkdir http  (to place the HTTP Sink connector jar file)

Download the MQSink Connector jar file from the Event Streams connectors catalog page https://ibm.github.io/event-streams/connectors.

Similarly, download the MQSource Connector jar file.
Move or copy both the MQSink and MQSource jar files to the kafkaconnect/my-plugins/mq folder.

Download the HTTP Sink Connector source and build the connector jar.
The HTTP Sink Connector jar is not available for download, so you need to build it.
$ cd <your-kafkaconnect-path>/kafkaconnect/my-plugins/http

$ git clone https://github.com/thomaskwscott/kafka-connect-http.git
$ cd kafka-connect-http
$ mvn package
Note: Install Maven if it is not installed already (command on mac - $ brew install maven). The Maven build may complain about Java version compatibility, so make sure to install OpenJDK or a compatible Java version. The build should create "kafka-connect-http-5.4.1.jar" under the target directory.
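
Assuming the build succeeds, copy the built jar from Maven's target directory back into the http plugin folder (the exact version in the jar file name may differ):

$ cp target/kafka-connect-http-5.4.1.jar ../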

Build Kafka Connect Docker image
$ cd <your-kafkaconnect-path>/kafkaconnect
Edit the Dockerfile and add the COPY statement as shown below.

  FROM cp.icr.io/cp/ibm-eventstreams-kafka:10.4.0-kafka-2.8.0
  COPY ./my-plugins/ /opt/kafka/plugins/
  USER 1001

$ docker build -t my-connect-cluster-image:latest .
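
As an optional sanity check before pushing, you can list the plugins baked into the image (this assumes the base image provides the ls command):

$ docker run --rm --entrypoint ls my-connect-cluster-image:latest /opt/kafka/plugins/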

Push the Kafka Connect image to OpenShift Container Registry
Login to the OpenShift Cluster
$ oc login <your openshift api url>

Get your default OpenShift image registry Route

$ oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}'
Note: If you do not have a default route created already, you can create one with: $ oc patch configs.imageregistry.operator.openshift.io/cluster --patch '{"spec":{"defaultRoute":true}}' --type=merge

$ docker tag my-connect-cluster-image:latest <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest
$ docker login https://<openshift-registry-default-route> -u $(oc whoami) -p $(oc whoami -t)
$ docker push <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest

Verify that the Kafka Connect image was uploaded successfully.
$ oc get is -n <cp4i-eventstreams-namespace>


KafkaConnect Instance
Create the KafkaConnect framework instance.
$ cd <your-kafkaconnect-download-path>/kafkaconnect
$ oc apply -f kafka-connect.yaml

Check if the KafkaConnect instance is created and in Ready state.
$ oc get KafkaConnect -n <your-event-streams-namespace>
NAME                 DESIRED REPLICAS   READY
my-connect-cluster   1                  True
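
If the instance does not reach the Ready state, inspecting the Connect pods and the KafkaConnect resource usually shows the reason (for example, an image pull failure):

$ oc get pods -n <your-event-streams-namespace> | grep my-connect-cluster
$ oc describe KafkaConnect my-connect-cluster -n <your-event-streams-namespace>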



1. MQSink Connector 
Use case : To synchronize IBM Event Streams Topic data to an MQ Queue.


Create a YAML file like the one below, and configure it with your MQ Queue Manager connection details, as in the following example.

$ cat mq-sink-<qmgr>-<queue>.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: mq-sink-mqdv03-mqsink
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsink.MQSinkConnector
  tasksMax: 1
  config:
    topics: MQSINK
    mq.queue.manager: mqdv03
    mq.connection.name.list: mqdv03-ibm-mq.cp4i-mq.svc(1414)
    mq.channel.name: SB.SVRCONN
    mq.queue: MQSINK
    mq.user.name: dummy
    mq.password: dummy
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.message.builder: com.ibm.eventstreams.connect.mqsink.builders.DefaultMessageBuilder
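
If the target queue or the server-connection channel does not exist yet on the queue manager, a minimal MQSC definition could look like the following (names must match the connector config above; channel authentication and TLS settings are not shown and depend on your queue manager setup):

$ runmqsc mqdv03
DEFINE QLOCAL(MQSINK)
DEFINE CHANNEL(SB.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP)
END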


$ oc apply -f mq-sink-<qmgr>-<queue>.yaml -n <your-event-streams-namespace>

Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>

Testing MQSink Connector with IBM Event Streams Starter Application
Log in to the IBM Event Streams console, navigate to Toolbox > Starter application, and click "Get Started".
Follow the instructions:
Step 1: Download the jar into a folder (the jar file name is demo-all.jar).
Step 2: Generate properties.


Unzip the downloaded properties archive.

Step 3: Run the starter application to put messages into the MQSINK Topic.
$ /usr/bin/java -Dproperties_path=<path-to-properties> -jar demo-all.jar

Note: If you run into a PKCS12 certificate error like the one below,
ERROR: Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore truststore.p12 of type PKCS12
then make sure you are using a compatible Java version, or convert the p12 truststore to JKS as documented here: https://ibm.github.io/event-streams/troubleshooting/pkcs12-keystore-java-client/
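
If you choose to convert the truststore, a typical keytool conversion looks like this (the keystore file names here are illustrative; use the names from your generated properties archive):

$ keytool -importkeystore -srckeystore truststore.p12 -srcstoretype PKCS12 -destkeystore truststore.jks -deststoretype JKS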

Open a browser and go to http://localhost:8080.
Click the "Start Producing" button to write sample messages to the MQSINK Topic.


Verify Event Streams MQSINK Topic data.


Verify MQ Queue is synchronized with the Event Streams MQSINK Topic data.
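
If you prefer to verify from the queue manager side, and the MQ sample programs are available in the queue manager container, you can browse the queue from the pod terminal (queue and queue manager names as configured above):

$ amqsbcg MQSINK mqdv03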




2. MQSource Connector
Use case : To synchronize messages from an MQ Queue to IBM Event Streams Topic.

Create a YAML file like the one below, and configure it with your MQ Queue Manager connection details, as in the following example.

$ cat mq-source-<qmgr>-<queue>.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: mq-source-mqdv03-mqsource
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: com.ibm.eventstreams.connect.mqsource.MQSourceConnector
  tasksMax: 1
  config:
    topic: MQSOURCE
    mq.queue.manager: mqdv03
    mq.connection.name.list: mqdv03-ibm-mq.cp4i-mq.svc(1414)
    mq.channel.name: SB.SVRCONN
    mq.queue: MQSOURCE
    mq.user.name: dummy
    mq.password: dummy
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
    mq.message.body.jms: true

Note: Create a Topic named MQSOURCE in Event Streams, and a Queue named MQSOURCE in the MQ Queue Manager.

$ oc apply -f mq-source-<qmgr>-<queue>.yaml -n <your-event-streams-namespace>

Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>

Testing MQSource Connector
There are a few ways to put messages onto the MQSOURCE queue. The quickest way is to log in to the OpenShift console, navigate to the MQ Queue Manager pod > Terminal, and run the MQ sample put program:
$ amqsput MQSOURCE  (write a few test messages)
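
For reference, amqsput also accepts the queue manager name as a second argument; type one test message per line and finish with an empty line (queue manager name as configured in the connector above):

$ amqsput MQSOURCE mqdv03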

Verify that the messages were picked up by the MQSource Connector from the Event Streams console.



3. HTTPSink Connector
Use case : To publish messages from an Event Streams Topic to an API endpoint.

Create a YAML file like the one below, and configure the API endpoint to be invoked (example below).

Note: The attached sample App Connect Enterprise (ACE) Project Interchange file contains a REST API that receives a payload and writes it to an MQ Queue. You can use a different API for testing if you prefer.

$ cat http-sink.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  name: http-sink-invoice-posting
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
spec:
  class: uk.co.threefi.connect.http.HttpSinkConnector
  tasksMax: 1
  config:
    topics: HTTPSINK.INVOICES
    tasks.max: 1
    request.method: "POST"
    http.api.url: "http://sb-rest2mq-invoice-api-http-cp4i-ace.ocp-dev1-xxxxx-0000.us-east.containers.appdomain.cloud:80/rest2mq/invoice/v1/addInvoice"
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter


Note: Create Topic HTTPSINK.INVOICES in Event Streams. Make sure the API that you want to invoke is available.
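
Before creating the connector, you can confirm the endpoint is reachable with a quick test call (the JSON payload below is purely illustrative; use whatever body your API expects):

$ curl -X POST -H "Content-Type: application/json" -d '{"invoiceId":"INV-001","amount":100}' "http://<your-ace-api-host>/rest2mq/invoice/v1/addInvoice"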

Create HTTPSink Connector
$ oc apply -f http-sink.yaml -n <your-event-streams-namespace>

Check if the Connector is created and in Ready state.
$ oc get KafkaConnectors -n <your-event-streams-namespace>

Testing HTTPSink Connector
Please see the MQSink testing steps above. Download the Event Streams starter application configured for the Topic HTTPSINK.INVOICES, run it, push a few messages to the Topic, and verify that the API is invoked successfully.

Configuring Truststore & Security

To invoke secured endpoints using the HTTP Sink Connector, you need to create a truststore containing the endpoint certificates, create a generic secret in OpenShift for the truststore, and configure kafka-connect.yaml to use it.

> Extract the endpoint's self-signed certificate.
    $ openssl s_client -showcerts -connect <your-url>:7083
    Copy the PEM certificate block from the output (including the BEGIN and END lines):
    -----BEGIN CERTIFICATE-----
    xxxxxxxxxxxxx
    xxxxxxxxxxxxx
    -----END CERTIFICATE-----
    Save the certificate into a file, for example my-api.crt.

> Create a <truststore>.p12 (PKCS12 format) truststore containing the endpoint certificate.
$ /usr/local/Cellar/openjdk@11/11.0.9/bin/keytool -keystore ./my-httpsink-truststore.p12 -storepass my-p12-password -noprompt -alias my-api -import -file ./my-api.crt -storetype PKCS12
 
> Create a generic secret in OpenShift.
$ oc create secret generic my-httpsink-truststore --from-file=my-httpsink-truststore.p12=./my-httpsink-truststore.p12 --type=Opaque -n cp4i-eventstreams
 
> Update the KafkaConnect CR to mount the secret via externalConfiguration and use the p12 in the jvmOptions.
spec:
  externalConfiguration:
    volumes:
      - name: connector-cert
        secret:
          secretName: my-httpsink-truststore
  jvmOptions:
    javaSystemProperties:
      - name: javax.net.ssl.trustStore
        value: /opt/kafka/external-configuration/connector-cert/my-httpsink-truststore.p12
      - name: javax.net.ssl.trustStorePassword
        value: my-p12-password
      - name: javax.net.ssl.trustStoreType
        value: PKCS12
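
After updating the KafkaConnect custom resource, re-apply it so the Connect pods restart with the truststore mounted:

$ oc apply -f kafka-connect.yaml -n <your-event-streams-namespace>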

4. Debezium SQL Server (MSSQL) Connector

Download the Debezium SQL Server connector from https://debezium.io/documentation/reference/stable/connectors/sqlserver.html
Once downloaded, unzip it and copy the jar files into a folder under my-plugins, then rebuild and push the Kafka Connect image (see the Connectors Download & Configuration and image build sections above). A sketch of these steps follows below.
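
A minimal sketch, assuming you place the jars in a new debezium folder under my-plugins (the folder name and download path are illustrative):

$ cd <your-kafkaconnect-path>/kafkaconnect/my-plugins
$ mkdir debezium
$ cp <your-debezium-download-path>/debezium-connector-sqlserver/*.jar ./debezium/
$ cd ..
$ docker build -t my-connect-cluster-image:latest .
$ docker tag my-connect-cluster-image:latest <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest
$ docker push <openshift-registry-default-route>/<cp4i-eventstreams-namespace>/my-connect-cluster-image:latest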

This proof of concept uses a Microsoft SQL Server Docker container. The steps below cover installing SQL Server, starting the SQL Agent, and enabling change data capture (CDC) on the customers table.

MSSQL Server Docker Installation
$ docker pull mcr.microsoft.com/mssql/server
$ docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=xxxx@xxx" -e "MSSQL_PID=Enterprise" -p 1433:1433 -d mcr.microsoft.com/mssql/server
 

-- Docker shell
$ docker exec -it --user root <container-id> bash

-- Start the SQL Agent
$ /opt/mssql/bin/mssql-conf set sqlagent.enabled true

-- Exit Docker shell

-- Stop docker process
$ docker ps 
$ docker stop <id>

-- Start docker process
$ docker start <id>
$ docker exec -it --user root <container-id> bash

-- MSSQL command line
$ /opt/mssql-tools/bin/sqlcmd -U sa -P <your-sa-password>
1> SELECT s.name AS Schema_Name, tb.name AS Table_Name, tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc FROM sys.tables tb INNER JOIN sys.schemas s on s.schema_id = tb.schema_id WHERE tb.is_tracked_by_cdc = 1
2> go

1> SELECT * FROM sys.change_tracking_databases  WHERE database_id=DB_ID('HRDB')
2> go

-- Check SQL Agent status
1> SELECT dss.[status], dss.[status_desc]
2> FROM   sys.dm_server_services dss
3> WHERE  dss.[servicename] LIKE N'SQL Server Agent (%';
4> go

-- CREATE HRDB database
1> create database HRDB
2> Go
1> use HRDB
2> go

 -- Make sure the database is changed to HRDB
1> select DB_NAME()
2> GO

 -- Enable Stored Procedures
1> EXEC sys.sp_cdc_enable_db
2> go

-- Create event capture job
1> EXEC sys.sp_cdc_add_job @job_type = N'capture'; 
2> go
Job 'cdc.HRDB_capture' started successfully.

-- Create cleanup job
1> EXEC sys.sp_cdc_add_job  @job_type = N'cleanup' ,@start_job = 0 ,@retention = 5760;
2> Go

 -- Create table customers
1> CREATE TABLE customers (id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE, phone_number VARCHAR(32));
2> go

-- Create change data capture (CDC) on the customers table
1> EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name= N'customers', @supports_net_changes = 1, @role_name=NULL
2> GO

-- Insert sample records into the customers table
1> INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','jdoe@example.com', '+1-444-444-4444')
2> INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('Frank','Smith','fsmith@example.com', '+1-555-555-5555')
3> go

-- Exit from the Docker shell

-- Create debezium SQL Connector in Event Streams
$ cat mssql-dbo-customers.yaml
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaConnector
metadata:
  labels:
    eventstreams.ibm.com/cluster: my-connect-cluster
  name: mssql-dbo-customers
  namespace: cp4i-es2
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  config:
    database.dbname: "HRDB"
    database.server.name: "HRDB"
    table.whitelist: "dbo.customers"
    database.hostname: "<MSSQL-HOSTNAME/IP>"
    database.user: "sa"
    database.password: "xxxxxxxxxx"
    database.port: '1433'
    connector.class: "io.debezium.connector.sqlserver.SqlServerConnector"
    database.history.kafka.topic: "MSSQL.DBO.CUSTOMERS"
    database.history.kafka.bootstrap.servers: "es-dev-kafka-bootstrap.cp4i-es2.svc:9092"
  pause: false
  tasksMax: 1

$ oc apply -f mssql-dbo-customers.yaml
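
As with the other connectors, check that the Connector is created and in Ready state.

$ oc get KafkaConnectors -n <your-event-streams-namespace>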

Make sure the Connector is created in Event Streams and started successfully. In the Event Streams console, check that the CDC topic for the customers table (for example, HRDB.dbo.customers, based on the database.server.name configured above) is created and receives events as you insert records into the customers table.


References
IBM Cloud Pak for Integration documentation: https://www.ibm.com/docs/en/cloud-paks/cp-integration/2021.3
IBM Event Streams documentation: https://ibm.github.io/event-streams/about/overview

Kafka Connect and Connectors: https://ibm.github.io/event-streams/connecting/connectors/
Connecting to IBM MQ: https://ibm.github.io/event-streams/connecting/mq/
Running the MQ sink connector: https://ibm.github.io/event-streams/connecting/mq/sink
Running the MQ Source Connector: https://ibm.github.io/event-streams/connecting/mq/source
MQ Message Manager Tool: https://community.ibm.com/community/user/integration/viewdocument/mq-message-manager-utility?CommunityKey=77544459-9fda-40da-ae0b-fc8c76f0ce18&tab=librarydocuments

Attachment(s)
ACE-INVOICE-API-Project-Interchange.zip