From IBM MQ 9.4.3, you get access to an IBM-provided and supported Kafka Connect runtime, which can be used to run the IBM MQ source and sink Kafka connectors that IBM already provides and supports. This means you no longer need to obtain separate support for Kafka Connect, simplifying the deployment of the connectors.
You can install the IBM-provided and supported Kafka Connect runtime on the same machine as the MQ queue manager so that messages can be moved between queues and Kafka topics. This scenario is especially useful when you do not want to provision and operate another set of servers for the Kafka Connect runtime. You can run the Kafka Connect runtime in standalone mode. In addition, this deployment allows the Kafka Connect runtime to connect to the queue manager in bindings mode. However, because we are running two connectors (source and sink) in the same Kafka Connect process, we will use client mode.
This blog provides a guide on how to set up the Kafka Connect runtime with the MQ source and sink connectors, connecting a local queue manager with IBM Event Streams. The diagram shows this deployment. The helper and script files are provided in this GitHub repository.

Setup IBM Event Streams
- Assume IBM Event Streams is installed with a cluster name "es-min-prod" in the namespace "es".
- Create a KafkaUser called "kafka-connect".
oc apply -n es -f kafka-connect-user.yaml
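If you are building kafka-connect-user.yaml yourself, a minimal sketch might look like the following. The apiVersion and the ACL syntax depend on your Event Streams version, and the wide-open topic ACL is only for illustration; tighten it to the topics the connectors actually use.

```yaml
# Sketch of kafka-connect-user.yaml (adjust apiVersion/ACLs to your
# Event Streams version). SCRAM credentials are generated into a Secret
# of the same name.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaUser
metadata:
  name: kafka-connect
  labels:
    eventstreams.ibm.com/cluster: es-min-prod
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: '*'
          patternType: literal
        operations:
          - All
      - resource:
          type: group
          name: '*'
          patternType: literal
        operations:
          - Read
```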
- Create source and target Kafka topics called FROM.MQ.TOPIC and TO.MQ.TOPIC.
oc apply -n es -f kafka-topic-tomq.yaml
oc apply -n es -f kafka-topic-frommq.yaml
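As an illustration, kafka-topic-frommq.yaml could look like the sketch below (the partition count and replication factor are assumptions; match them to your cluster sizing). kafka-topic-tomq.yaml is the same with topicName TO.MQ.TOPIC.

```yaml
# Sketch of kafka-topic-frommq.yaml. metadata.name must be a valid
# Kubernetes name; spec.topicName carries the actual Kafka topic name.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaTopic
metadata:
  name: from-mq-topic
  labels:
    eventstreams.ibm.com/cluster: es-min-prod
spec:
  topicName: FROM.MQ.TOPIC
  partitions: 2
  replicas: 1
```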
- Get the connection information of IBM Event Streams using this helper script. The script generates configuration information, credentials, and a truststore file called es-cert.p12.
export NAMESPACE=es
export CLUSTER_NAME=es-min-prod
./get-eventstreams-info.sh
Setup IBM MQ
- Create source and target queues called TO.KAFKA.Q and FROM.KAFKA.Q, with appropriate authorization.
- Create state queues for the source and sink connectors, called MQSOURCE.STATE.Q and MQSINK.STATE.Q, with appropriate authorization.
- A sample MQSC is provided here.
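If you prefer to write the MQSC yourself, a minimal sketch follows. The channel definition matches CHANNEL1 used later; the principal 'kafkauser' and the exact authority records are assumptions, so adapt them to your security policy.

```mqsc
DEFINE QLOCAL(TO.KAFKA.Q)
DEFINE QLOCAL(FROM.KAFKA.Q)
DEFINE QLOCAL(MQSOURCE.STATE.Q)
DEFINE QLOCAL(MQSINK.STATE.Q)
DEFINE CHANNEL(CHANNEL1) CHLTYPE(SVRCONN) TRPTYPE(TCP)
SET AUTHREC OBJTYPE(QMGR) PRINCIPAL('kafkauser') AUTHADD(CONNECT,INQ)
SET AUTHREC PROFILE(TO.KAFKA.Q) OBJTYPE(QUEUE) PRINCIPAL('kafkauser') AUTHADD(GET,INQ,BROWSE)
SET AUTHREC PROFILE(FROM.KAFKA.Q) OBJTYPE(QUEUE) PRINCIPAL('kafkauser') AUTHADD(PUT,INQ)
SET AUTHREC PROFILE(MQSOURCE.STATE.Q) OBJTYPE(QUEUE) PRINCIPAL('kafkauser') AUTHADD(GET,PUT,INQ,BROWSE)
SET AUTHREC PROFILE(MQSINK.STATE.Q) OBJTYPE(QUEUE) PRINCIPAL('kafkauser') AUTHADD(GET,PUT,INQ,BROWSE)
```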
Install Kafka Connect runtime
- Download the IBM-provided MQ Kafka Connectors package from IBM Passport Advantage (V.M.R.F-IBM-MQ-Kafka-Connectors.tar.gz).
- Extract the file; you will see the following folders:
tar xvf 9.4.3.1-IBM-MQ-Kafka-Connectors.tar.gz
cd kafka-connect
tree -L 1
.
├── kafka-connect
├── licenses
├── sink
├── source
└── xml-converter
- The "kafka-connect" folder contains the IBM-provided Kafka Connect runtime. Copy the folder into /opt.
- Copy the truststore file "es-cert.p12" into the folder "/opt/kafka-connect/connectors".
- The "sink" folder contains the MQ Kafka sink connector, and the "source" folder contains the MQ Kafka source connector.
- Create folders for the connectors, and copy the JAR and configuration files into the folders.
mkdir -p /opt/kafka-connect/connectors/mq-sink /opt/kafka-connect/connectors/mq-source
cp -p sink/kafka-connect-mq-sink-2.2.1.jar /opt/kafka-connect/connectors/mq-sink
cp -p sink/mq-sink.properties /opt/kafka-connect/connectors/mq-sink
cp -p source/kafka-connect-mq-source-2.3.0.jar /opt/kafka-connect/connectors/mq-source
cp -p source/mq-source.properties /opt/kafka-connect/connectors/mq-source
Configure Kafka Connect runtime (standalone)
Configure the file /opt/kafka-connect/config/connect-standalone.properties (sample)
- Kafka Bootstrap endpoint
bootstrap.servers=<<BOOTSTRAP_ENDPOINT>>
- Kafka TLS
ssl.protocol=TLSv1.3
ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
producer.ssl.protocol=TLSv1.3
producer.ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
producer.ssl.truststore.type=PKCS12
producer.ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
consumer.ssl.protocol=TLSv1.3
consumer.ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
consumer.ssl.truststore.type=PKCS12
consumer.ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
- Kafka SCRAM authentication
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
- Storage
group.id=myconnect
offset.storage.topic=myconnect-offsets-standalone
config.storage.topic=myconnect-configs-standalone
status.storage.topic=myconnect-status-standalone
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
topic.creation.enable=true
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
- Offset
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
- Exactly once source support
exactly.once.source.support=enabled
- Converters and schema disabled
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
- Plugins
plugin.path=/opt/kafka-connect/connectors/mq-source,/opt/kafka-connect/connectors/mq-sink
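The <<...>> placeholders above must be replaced with real values before starting the runtime. As a small illustration (the file path, endpoint, and password below are made up), a sed substitution can do this in place:

```shell
# Illustration only: substitute the <<...>> placeholders in a copy of
# the sample properties file. Endpoint and password are made-up values.
cp_file=/tmp/connect-standalone.properties
printf '%s\n' \
  'bootstrap.servers=<<BOOTSTRAP_ENDPOINT>>' \
  'ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>' > "$cp_file"

BOOTSTRAP_ENDPOINT='es-min-prod-kafka-bootstrap-es.apps.example.com:443'
TRUSTSTORE_PASSWORD='changeit'

# Replace each placeholder with the corresponding real value.
sed -i \
  -e "s|<<BOOTSTRAP_ENDPOINT>>|${BOOTSTRAP_ENDPOINT}|" \
  -e "s|<<TRUSTSTORE_PASSWORD>>|${TRUSTSTORE_PASSWORD}|" \
  "$cp_file"

grep '^bootstrap.servers=' "$cp_file"
```

The same pattern applies to the connector properties files below.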
Configure MQ Kafka Source Connector
Configure the file /opt/kafka-connect/connectors/mq-source/mq-source.properties (sample)
- Name and classname
name=mq-source
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasks.max=1
- Target topic
topic=FROM.MQ.TOPIC
- MQ connection and source queue, state queue
mq.queue.manager=QUEUE.MANAGER.1
mq.connection.mode=client
mq.connection.name.list=<<hostname(port)>>
mq.client.reconnect.options=QMGR
mq.channel.name=CHANNEL1
mq.queue=TO.KAFKA.Q
mq.exactly.once.state.queue=MQSOURCE.STATE.Q
mq.user.name=<<username>>
mq.password=<<password>>
- (Optional) MQ TLS (not used in this setup)
mq.ssl.cipher.suite=
mq.ssl.peer.name=
mq.ssl.keystore.location=
mq.ssl.keystore.password=
mq.ssl.truststore.location=
mq.ssl.truststore.password=
- MQ record builder and JMS header
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder
mq.message.body.jms=true
- Copy JMS properties to Kafka headers (set to true if needed)
mq.jms.properties.copy.to.kafka.headers=false
- Converters and schema disabled
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
Configure MQ Kafka Sink Connector
Configure the file /opt/kafka-connect/connectors/mq-sink/mq-sink.properties (sample)
- Name and classname
name=mq-sink
connector.class=com.ibm.eventstreams.connect.mqsink.MQSinkConnector
tasks.max=1
- Source topic
topic=TO.MQ.TOPIC
- MQ connection and target queue, state queue
mq.queue.manager=QUEUE.MANAGER.1
mq.connection.mode=client
mq.connection.name.list=<<hostname(port)>>
mq.client.reconnect.options=QMGR
mq.channel.name=CHANNEL1
mq.queue=FROM.KAFKA.Q
mq.exactly.once.state.queue=MQSINK.STATE.Q
mq.user.name=<<username>>
mq.password=<<password>>
- (Optional) MQ TLS (not used in this setup)
mq.ssl.cipher.suite=
mq.ssl.peer.name=
mq.ssl.keystore.location=
mq.ssl.keystore.password=
mq.ssl.truststore.location=
mq.ssl.truststore.password=
- Message behavior - persistence (default is true), and time to live (default is 0 - unlimited)
mq.persistent=true
mq.time.to.live=0
- MQ record builder and JMS header
mq.record.builder=com.ibm.eventstreams.connect.mqsink.builders.JsonRecordBuilder
mq.message.body.jms=false
- Converters and schema disabled
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
Run Kafka Connect
- Run Kafka Connect with both the MQ source and sink connectors configured.
cd /opt/kafka-connect
bin/connect-standalone.sh config/connect-standalone.properties connectors/mq-sink/mq-sink.properties connectors/mq-source/mq-source.properties
Test MQ Kafka source connector
- Start the Kafka console consumer. Instructions on how to set up the Apache Kafka console tools can be found here.
bin/kafka-console-consumer.sh --consumer.config consumer.properties --topic FROM.MQ.TOPIC --bootstrap-server es-min-prod-kafka-bootstrap-es.apps.{domain}:443 --from-beginning
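The consumer.properties file referenced above needs the same SASL and TLS settings as the Connect worker. A sketch is shown below (the truststore path assumes you run the console tools from the same machine; the placeholders are the values generated earlier). producer.properties for the producer test can contain the same settings.

```properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-connect" password="<<KAFKAUSER_PASSWORD>>";
ssl.protocol=TLSv1.3
ssl.truststore.location=/opt/kafka-connect/connectors/es-cert.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=<<TRUSTSTORE_PASSWORD>>
```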
- Use the IBM MQ sample to put messages to the queue.
. ./set-mq-inst1
./put-order.sh 5
Test MQ Kafka sink connector
- Use the IBM MQ sample to get messages from the queue.
. ./set-mq-inst1
./get-order.sh
- Start the Kafka console producer. Instructions on how to set up the Apache Kafka console tools can be found here.
bin/kafka-console-producer.sh --producer.config producer.properties --topic TO.MQ.TOPIC --bootstrap-server es-min-prod-kafka-bootstrap-es.apps.{domain}:443