Getting started with IBM provided Kafka Connect for MQ Advanced

By Kok Sing Khong posted Sun September 21, 2025 08:46 PM


From IBM MQ 9.4.3, you get access to an IBM-provided and supported Kafka Connect runtime, which can be used to run the IBM MQ source and sink Kafka connectors that are already provided and supported by IBM. This means you no longer need to obtain separate support for Kafka Connect, simplifying the deployment of the connectors.

You can install the IBM-provided and supported Kafka Connect runtime on the same machine as the MQ queue manager, so that messages can be moved between queues and Kafka topics. This scenario is especially useful when you do not want to provision and operate another set of servers for the Kafka Connect runtime. You can run the Kafka Connect runtime in standalone mode. This deployment would also allow the Kafka Connect runtime to connect to the queue manager in bindings mode; however, because we are running two connectors (source and sink) in the same Kafka Connect process, we will use client mode.

This blog provides a guide on how to set up the Kafka Connect runtime with the MQ source and sink connectors, connecting a local queue manager with IBM Event Streams. The diagram below shows this deployment. The helper files and scripts are provided in this GitHub repository.

Set up IBM Event Streams

  1. This guide assumes IBM Event Streams is installed with the cluster name "es-min-prod" in the namespace "es".
  2. Create a KafkaUser called "kafka-connect" (a sample manifest is sketched after this list).
    oc apply -n es -f kafka-connect-user.yaml
  3. Create source and target Kafka topics called FROM.MQ.TOPIC and TO.MQ.TOPIC (a sample manifest is sketched after this list).
    oc apply -n es -f kafka-topic-tomq.yaml
    oc apply -n es -f kafka-topic-frommq.yaml
  4. Get the connection information of IBM Event Streams using this helper script. The script generates configuration information, credentials, and a trust store file called es-cert.p12.
    export NAMESPACE=es
    export CLUSTER_NAME=es-min-prod
    ./get-eventstreams-info.sh
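
As a reference, here is a minimal sketch of the two kinds of manifests used above. The exact samples are in the repository; the API version and labels here assume a recent IBM Event Streams release, and the KafkaTopic resource name "from-mq-topic" is illustrative.

    # kafka-connect-user.yaml (sketch): a SCRAM user for the Connect runtime
    apiVersion: eventstreams.ibm.com/v1beta2
    kind: KafkaUser
    metadata:
      name: kafka-connect
      labels:
        eventstreams.ibm.com/cluster: es-min-prod
    spec:
      authentication:
        type: scram-sha-512

    # kafka-topic-frommq.yaml (sketch): the topic the source connector writes to
    apiVersion: eventstreams.ibm.com/v1beta2
    kind: KafkaTopic
    metadata:
      name: from-mq-topic
      labels:
        eventstreams.ibm.com/cluster: es-min-prod
    spec:
      topicName: FROM.MQ.TOPIC
      partitions: 1
      replicas: 1

The actual samples may also define ACLs on the KafkaUser; the Connect runtime needs access to the data topics plus its consumer group and internal topics.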

Set up IBM MQ

  1. Create source and target queues called TO.KAFKA.Q and FROM.KAFKA.Q, with appropriate authorization.
  2. Create state queues for source and sink connectors MQSOURCE.STATE.Q and MQSINK.STATE.Q, with appropriate authorization.
  3. A sample MQSC script is provided here; a minimal sketch also follows this list.
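
A minimal sketch of those MQSC definitions, run via runmqsc against the queue manager. The principal name "kconnect" is hypothetical; grant whatever authorities your connector user actually needs.

    DEFINE QLOCAL('TO.KAFKA.Q')
    DEFINE QLOCAL('FROM.KAFKA.Q')
    DEFINE QLOCAL('MQSOURCE.STATE.Q')
    DEFINE QLOCAL('MQSINK.STATE.Q')
    DEFINE CHANNEL('CHANNEL1') CHLTYPE(SVRCONN) TRPTYPE(TCP)
    * The source connector reads; the sink connector writes
    SET AUTHREC PROFILE('TO.KAFKA.Q') OBJTYPE(QUEUE) PRINCIPAL('kconnect') AUTHADD(GET, BROWSE, INQ)
    SET AUTHREC PROFILE('FROM.KAFKA.Q') OBJTYPE(QUEUE) PRINCIPAL('kconnect') AUTHADD(PUT, INQ)
    * The state queues need PUT, GET, and BROWSE for exactly-once tracking
    SET AUTHREC PROFILE('MQSOURCE.STATE.Q') OBJTYPE(QUEUE) PRINCIPAL('kconnect') AUTHADD(PUT, GET, BROWSE, INQ)
    SET AUTHREC PROFILE('MQSINK.STATE.Q') OBJTYPE(QUEUE) PRINCIPAL('kconnect') AUTHADD(PUT, GET, BROWSE, INQ)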

Install Kafka Connect runtime

  1. Download the IBM-provided MQ Kafka connectors from IBM Passport Advantage (V.M.R.F-IBM-MQ-Kafka-Connectors.tar.gz).
  2. Extract the file; you will see the following folders:
    tar xvf 9.4.3.1-IBM-MQ-Kafka-Connectors.tar.gz
    cd kafka-connect
    tree -L 1
    .
    ├── kafka-connect
    ├── licenses
    ├── sink
    ├── source
    └── xml-converter
  3. The "kafka-connect" folder contains the IBM-provided Kafka Connect runtime. Copy the folder into the /opt folder.
  4. Copy the trust store file "es-cert.p12" into folder "/opt/kafka-connect/connectors".
  5. The "sink" folder contains the MQ Kafka sink connector, and the "source" folder contains the MQ Kafka source connector.
  6. Create folders for the connectors, and copy the JAR and configuration files into the folders.
    mkdir -p /opt/kafka-connect/connectors/mq-sink /opt/kafka-connect/connectors/mq-source
    cp -p sink/kafka-connect-mq-sink-2.2.1.jar /opt/kafka-connect/connectors/mq-sink
    cp -p sink/mq-sink.properties /opt/kafka-connect/connectors/mq-sink
    cp -p source/kafka-connect-mq-source-2.3.0.jar /opt/kafka-connect/connectors/mq-source
    cp -p source/mq-source.properties /opt/kafka-connect/connectors/mq-source

Configure Kafka Connect runtime (standalone)

Configure the file /opt/kafka-connect/config/connect-standalone.properties (sample). A quick connectivity check is sketched after this list.

  1. Kafka Bootstrap endpoint
    bootstrap.servers=<<BOOTSTRAP_ENDPOINT>>
  2. Kafka TLS
    ssl.protocol=TLSv1.3
    ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
    ssl.truststore.type=PKCS12
    ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
    producer.ssl.protocol=TLSv1.3
    producer.ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
    producer.ssl.truststore.type=PKCS12
    producer.ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
    consumer.ssl.protocol=TLSv1.3
    consumer.ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
    consumer.ssl.truststore.type=PKCS12
    consumer.ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
  3. Kafka SCRAM authentication
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=SCRAM-SHA-512
    consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
  4. Storage
    group.id=myconnect
    offset.storage.topic=myconnect-offsets-standalone
    config.storage.topic=myconnect-configs-standalone
    status.storage.topic=myconnect-status-standalone
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1
    topic.creation.enable=true
    topic.creation.default.partitions=2
    topic.creation.default.replication.factor=1
  5. Offset
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
  6. Exactly once source support
    exactly.once.source.support=enabled
  7. Converters and schema disabled
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
  8. Plugins
    plugin.path=/opt/kafka-connect/connectors/mq-source,/opt/kafka-connect/connectors/mq-sink
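
Before starting the worker, it can be worth confirming that the bootstrap endpoint is reachable over TLS. A quick check, assuming openssl is installed (replace <<BOOTSTRAP_ENDPOINT>> with the host:port from step 1):

    # Should print the negotiated protocol and the broker certificate details
    openssl s_client -connect <<BOOTSTRAP_ENDPOINT>> -brief </dev/null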

Configure MQ Kafka Source Connector

Configure the file /opt/kafka-connect/connectors/mq-source/mq-source.properties (sample)

  1. Name and classname
    name=mq-source
    connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
    tasks.max=1
  2. Target topic
    topic=FROM.MQ.TOPIC
  3. MQ connection, source queue, and state queue
    mq.queue.manager=QUEUE.MANAGER.1
    mq.connection.mode=client
    mq.connection.name.list=<<hostname(port)>>
    mq.client.reconnect.options=QMGR
    mq.channel.name=CHANNEL1
    mq.queue=TO.KAFKA.Q
    mq.exactly.once.state.queue=MQSOURCE.STATE.Q
    mq.user.name=<<username>>
    mq.password=<<password>>
  4. (Optional) MQ TLS (not used in this setup)
    mq.ssl.cipher.suite=
    mq.ssl.peer.name=
    mq.ssl.keystore.location=
    mq.ssl.keystore.password=
    mq.ssl.truststore.location=
    mq.ssl.truststore.password=
  5. MQ record builder and JMS header
    mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder
    mq.message.body.jms=true
  6. Copy JMS properties to Kafka headers (set to true if needed)
    mq.jms.properties.copy.to.kafka.headers=false
  7. Converters and schema disabled
    key.converter.schemas.enable=false
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter

Configure MQ Kafka Sink Connector

Configure the file /opt/kafka-connect/connectors/mq-sink/mq-sink.properties (sample)

  1. Name and classname
    name=mq-sink
    connector.class=com.ibm.eventstreams.connect.mqsink.MQSinkConnector
    tasks.max=1
  2. Source topic
    topic=TO.MQ.TOPIC
  3. MQ connection, target queue, and state queue
    mq.queue.manager=QUEUE.MANAGER.1
    mq.connection.mode=client
    mq.connection.name.list=<<hostname(port)>>
    mq.client.reconnect.options=QMGR
    mq.channel.name=CHANNEL1
    mq.queue=FROM.KAFKA.Q
    mq.exactly.once.state.queue=MQSINK.STATE.Q
    mq.user.name=<<username>>
    mq.password=<<password>>
  4. (Optional) MQ TLS (not used in this setup)
    mq.ssl.cipher.suite=
    mq.ssl.peer.name=
    mq.ssl.keystore.location=
    mq.ssl.keystore.password=
    mq.ssl.truststore.location=
    mq.ssl.truststore.password=
  5. Message behavior: persistence (default is true) and time to live (default is 0, meaning unlimited)
    mq.persistent=true
    mq.time.to.live=0
  6. MQ record builder and JMS header
    mq.record.builder=com.ibm.eventstreams.connect.mqsink.builders.JsonRecordBuilder
    mq.message.body.jms=false
  7. Converters and schema disabled
    key.converter.schemas.enable=false
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter

Run Kafka Connect

  1. Run Kafka Connect with both the MQ Kafka source and sink connectors configured; once the worker is up, you can verify them as shown after this list.
    cd /opt/kafka-connect
    bin/connect-standalone.sh config/connect-standalone.properties connectors/mq-sink/mq-sink.properties connectors/mq-source/mq-source.properties
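
Once the worker is running, the Kafka Connect REST API (port 8083 by default in standalone mode) can confirm that both connectors started:

    curl http://localhost:8083/connectors
    curl http://localhost:8083/connectors/mq-source/status
    curl http://localhost:8083/connectors/mq-sink/status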

Test MQ Kafka source connector

  1. Start the Kafka console consumer. Instructions on how to set up the Apache Kafka console tools are found here. A sketch of consumer.properties follows this list.
    bin/kafka-console-consumer.sh --consumer.config consumer.properties --topic FROM.MQ.TOPIC --bootstrap-server es-min-prod-kafka-bootstrap-es.apps.{domain}:443 --from-beginning
    
  2. Use the IBM MQ sample scripts to put messages to the queue.
     . ./set-mq-inst1
    ./put-order.sh 5 
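
The consumer.properties file passed to the console consumer must carry the client security settings. A minimal sketch, assuming the same KafkaUser credentials and truststore used by the Connect runtime:

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
    ssl.protocol=TLSv1.3
    ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
    ssl.truststore.type=PKCS12
    ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>

The producer.properties file used in the sink test below can contain the same entries.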

Test MQ Kafka sink connector

  1. Use the IBM MQ sample scripts to get messages from the queue (an equivalent using the standard MQ samples is sketched after this list).
    . ./set-mq-inst1
    ./get-order.sh
  2. Start the Kafka console producer. Instructions on how to set up the Apache Kafka console tools are found here.
    bin/kafka-console-producer.sh --producer.config producer.properties --topic TO.MQ.TOPIC --bootstrap-server es-min-prod-kafka-bootstrap-es.apps.{domain}:443
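
The set-mq-inst1, put-order.sh, and get-order.sh helpers come from the GitHub repository. If you do not have them, a hypothetical equivalent using the standard IBM MQ samples (paths assume a default /opt/mqm installation, and the order payload is made up):

    # Set up the MQ command environment, as set-mq-inst1 presumably does
    . /opt/mqm/bin/setmqenv -s
    # Put a JSON message on the source queue (source connector test)
    echo '{"orderId": 1, "quantity": 5}' | /opt/mqm/samp/bin/amqsput TO.KAFKA.Q QUEUE.MANAGER.1
    # Get messages from the target queue (sink connector test)
    /opt/mqm/samp/bin/amqsget FROM.KAFKA.Q QUEUE.MANAGER.1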