Scenario
You have an IBM MQ queue manager. An application is putting messages to a command queue. Another application gets these messages from the queue and takes actions in response.

Objective
You want multiple separate audit applications to be able to review the commands that go through the command queue.
They should be able to replay a history of these command messages as many times as they want.
This must not impact the application that is currently getting the messages from the queue.

Solution
You can use Streaming Queues to put a duplicate of every message sent to the command queue onto a separate copy queue.
This copy queue can feed a connector that produces every message to a Kafka topic, which the audit applications can then consume.

Details
The final solution works like this:

- A JMS application called Putter puts messages onto an IBM MQ queue called COMMANDS
- For the purposes of this demo, a development instance of LDAP is used to authenticate access to IBM MQ
- A JMS application called Getter gets messages from the COMMANDS queue
- Copies of every message put to the COMMANDS queue will be made to the COMMANDS.COPY queue
- A Connector will get every message from the COMMANDS.COPY queue
- The Connector transforms each JMS message into a string, and produces it to the MQ.COMMANDS Kafka topic
- A Java application called Audit can replay the history of all messages on the Kafka topic
Instructions
Pre-requisites
If you want to create the entire end-to-end demo solution as described above from scratch, you can simply follow the instructions below.
You will need:
- a Red Hat OpenShift cluster
- on your local machine:
The instructions below are broken into sections, so if you instead want to use this tutorial as the basis for adding to an existing scenario (e.g. you already have an IBM MQ queue and want to add Kafka) you can just choose the relevant sections to follow.
Setup IBM MQ
Setup LDAP
Install a development LDAP server that will manage user identities for the IBM MQ queue manager.
./01-install-ldap/setup.sh

Two user identities are defined in config.yaml:
mquser that will be used by our JMS MQ applications
dn: uid=mquser,ou=people,dc=ibm,dc=com
objectClass: inetOrgPerson
objectClass: organizationalPerson
objectClass: person
objectClass: top
cn: mquserCN
sn: mquserSN
uid: mquser
userPassword: mquserpassword
kafkauser that will be used later by the Kafka Connector
dn: uid=kafkauser,ou=people,dc=ibm,dc=com
objectClass: inetOrgPerson
objectClass: organizationalPerson
objectClass: person
objectClass: top
cn: kafkauserCN
sn: kafkauserSN
uid: kafkauser
userPassword: kafkapassword
Generate certificates
Generate a new self-signed CA, and use it to create certificates for the MQ queue manager, the JMS MQ client apps, and the Kafka Connector.
./02-generate-certs/generate.sh
The JMS client and Kafka Connector certificates are added to the queue manager's keystore so that it will accept connections from them.
The certificate generation is all done in a Docker container to reduce the number of dependencies you need locally.
Running this will create:
- streamingdemo-ca files - with the CA used to sign all certs in the tutorial demo
- streamingdemo-jms-client files - used by the JMS applications
- streamingdemo-kafka-client files - used by the Kafka Connect connector for connecting to IBM MQ
- streamingdemo-mq-server files - used by the IBM MQ queue manager
Install the MQ Operator
Install the MQ Operator that will manage your IBM MQ queue manager.
./03-install-mq-operator/setup.sh
The tutorial script uses a specific channel for the MQ Operator that is supported on OpenShift 4.8 and earlier. You may want to do this step manually if you want to choose a different version.
Create the queue manager
Create and configure the queue manager to prepare it for use with the JMS applications.
export IBM_ENTITLEMENT_KEY=yourentitlementkey
./04-create-queue-manager/setup.sh

BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com/products-services/containerlibrary
The queue manager config is contained in the qmgr-setup ConfigMap which:
Creates the COMMANDS queue that the JMS applications will use
DEFINE QLOCAL(COMMANDS) REPLACE
Creates the APP.SVRCONN channel that the JMS applications will use
DEFINE CHANNEL(APP.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE
Sets up security so that the JMS applications have to use the mquser username / password defined in LDAP previously
SET CHLAUTH(APP.SVRCONN) TYPE(BLOCKUSER) USERLIST(*MQUSER) WARN(YES) ACTION(REPLACE)
REFRESH SECURITY
SET AUTHREC OBJTYPE(QMGR) GROUP('mqusers') AUTHADD(ALL)
SET AUTHREC OBJTYPE(QUEUE) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)
SET AUTHREC OBJTYPE(CHANNEL) PROFILE('**') GROUP('mqusers') AUTHADD(ALL)
JMS applications
Setup the JMS apps
Compile the JMS Putter and Getter applications
./05-jms-apps/build-apps.sh

The config for the apps is contained in Config.java.
The HOST constant is modified by the build-apps.sh script to match the hostname for your queue manager.
public static final String HOST = "PLACEHOLDERHOSTNAME";
public static final String QMGRNAME = "MYQMGR";
public static final String CHANNEL = "APP.SVRCONN";
public static final String QUEUE = "COMMANDS";
public static final String CIPHER = "ECDHE_RSA_AES_128_CBC_SHA256";
The truststore and keystore values set in Java system properties are the files created previously.
System.setProperty("javax.net.ssl.trustStore", "../02-generate-certs/certs/streamingdemo-ca.jks" );
System.setProperty("javax.net.ssl.keyStore", "../02-generate-certs/certs/streamingdemo-jms-client.jks" );
System.setProperty("javax.net.ssl.keyStorePassword", "passw0rd" );
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
Verify the initial scenario
Send messages to the COMMANDS queue
./05-jms-apps/run-putter.sh

The Putter application puts five messages onto the COMMANDS queue and then terminates.
sendMessage("Perform the first action A", session, producer);
sendMessage("Perform a second action B", session, producer);
sendMessage("Perform the third action C", session, producer);
sendMessage("Perform some penultimate fourth action D", session, producer);
sendMessage("Perform a fifth and final action E", session, producer);
Verify the messages from the MQ web console
./05-jms-apps/access-mq-console.sh

Click through to the COMMANDS queue, and you should see the five messages that the Putter app put.
Get messages from the COMMANDS queue
./05-jms-apps/run-getter.sh
This will get messages from the COMMANDS queue, and write them to the console.
----------------------------------------
Getting messages from the COMMANDS queue
----------------------------------------
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d51204d59514d47522020202020207abdc96201310040
JMSTimestamp: 1657388750326
JMSCorrelationID: null
JMSDestination: queue:///COMMANDS
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: mq.samples.Putter
JMSXDeliveryCount: 1
JMSXUserID: mquser
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20220709
JMS_IBM_PutTime: 17455036
Perform the first action A
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d51204d59514d47522020202020207abdc96202310040
JMSTimestamp: 1657388750351
JMSCorrelationID: null
JMSDestination: queue:///COMMANDS
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: mq.samples.Putter
JMSXDeliveryCount: 1
JMSXUserID: mquser
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20220709
JMS_IBM_PutTime: 17455038
Perform a second action B
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d51204d59514d47522020202020207abdc96203310040
JMSTimestamp: 1657388750364
JMSCorrelationID: null
JMSDestination: queue:///COMMANDS
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: mq.samples.Putter
JMSXDeliveryCount: 1
JMSXUserID: mquser
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20220709
JMS_IBM_PutTime: 17455039
Perform the third action C
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d51204d59514d47522020202020207abdc96204310040
JMSTimestamp: 1657388750380
JMSCorrelationID: null
JMSDestination: queue:///COMMANDS
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: mq.samples.Putter
JMSXDeliveryCount: 1
JMSXUserID: mquser
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20220709
JMS_IBM_PutTime: 17455040
Perform some penultimate fourth action D
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d51204d59514d47522020202020207abdc96205310040
JMSTimestamp: 1657388750392
JMSCorrelationID: null
JMSDestination: queue:///COMMANDS
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: mq.samples.Putter
JMSXDeliveryCount: 1
JMSXUserID: mquser
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20220709
JMS_IBM_PutTime: 17455042
Perform a fifth and final action E
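When reading this output, it can help to decode two of the header fields. The sketch below is a minimal, standalone Java helper (the class and method names are hypothetical and not part of the demo apps). It treats the first 16 bytes of the hex JMSMessageID as ASCII text and converts JMSTimestamp from epoch milliseconds to a UTC instant. For the first message above, the decoded prefix contains the queue manager name MYQMGR, and the timestamp lines up with the JMS_IBM_PutDate and JMS_IBM_PutTime values.

```java
import java.time.Instant;

class JmsHeaderDecoder {

    // Decode the first 16 bytes (32 hex characters) of a JMSMessageID as ASCII text.
    static String idPrefix(String jmsMessageId) {
        String hex = jmsMessageId.startsWith("ID:") ? jmsMessageId.substring(3) : jmsMessageId;
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 32 && i + 2 <= hex.length(); i += 2) {
            text.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
        }
        return text.toString();
    }

    // JMSTimestamp is expressed in milliseconds since the Unix epoch.
    static Instant timestamp(long jmsTimestamp) {
        return Instant.ofEpochMilli(jmsTimestamp);
    }

    public static void main(String[] args) {
        // Values copied from the first message in the Getter output
        String id = "ID:414d51204d59514d47522020202020207abdc96201310040";
        System.out.println("[" + idPrefix(id) + "]");
        System.out.println(timestamp(1657388750326L)); // 2022-07-09T17:45:50.326Z
    }
}
```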
Verify again from the MQ web console
./05-jms-apps/access-mq-console.sh

You should see that the COMMANDS queue is empty, as the Getter app has retrieved the messages.
Setup IBM Event Streams
Install the Event Streams Operator
Install the Event Streams Operator that will manage your Kafka cluster and Connector.
./06-install-eventstreams-operator/setup.sh
The tutorial script uses a specific channel for the Event Streams Operator. You may want to do this step manually if you want to choose a different version.
Create the Kafka cluster
Create and configure the Kafka cluster that will host the audit topic
export IBM_ENTITLEMENT_KEY=yourentitlementkey
./07-create-kafka-cluster/setup.sh

BEFORE running this, you will need to set an environment variable with an Entitled Registry key from myibm.ibm.com/products-services/containerlibrary
The Kafka cluster config is contained in the EventStreams spec:
Creates an internal bootstrap address that will be used by the Connector
- authentication:
    type: scram-sha-512
  name: internal
  port: 9093
  tls: true
  type: internal
Creates an external bootstrap address that will be used by the Audit app you run from your local machine.
- authentication:
    type: scram-sha-512
  name: external
  port: 9094
  tls: true
  type: external
Setup the MQ/Kafka Connector
Setup the Kafka Connector
Start a Kafka Connector that will get copies of the messages put to the COMMANDS MQ queue, and produce them to the MQ.COMMANDS Kafka topic.
./08-setup-kafka-connect/setup.sh

This starts by using the modify-qmgr ConfigMap, adding a third 3-modifications.mqsc entry to modify the MQ queue manager.
It creates a new COMMANDS.COPY queue for the Connector to use, and alters the COMMANDS queue used by the JMS apps so that the queue manager streams a copy of every message to COMMANDS.COPY.
DEFINE QLOCAL(COMMANDS.COPY) REPLACE
ALTER QLOCAL(COMMANDS) STREAMQ('COMMANDS.COPY') STRMQOS(MUSTDUP)
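The effect of these two MQSC commands can be pictured with a small in-memory model. This is only a toy sketch in plain Java: it uses no MQ API, the class and method names are hypothetical, and in the real system the duplication is done by the queue manager, not by the putting application. It shows why the Getter is unaffected: draining COMMANDS leaves the duplicates on COMMANDS.COPY untouched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class StreamingQueueModel {
    // Stand-ins for the COMMANDS and COMMANDS.COPY queues
    final Queue<String> commands = new ArrayDeque<>();
    final Queue<String> commandsCopy = new ArrayDeque<>();

    // Models a put to COMMANDS with STREAMQ('COMMANDS.COPY'):
    // the message lands on the original queue and a duplicate lands on the copy queue.
    void put(String message) {
        commands.add(message);
        commandsCopy.add(message);
    }

    // Models a destructive get loop, as performed by the Getter (or the Connector).
    static List<String> drain(Queue<String> queue) {
        List<String> received = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        StreamingQueueModel qmgr = new StreamingQueueModel();
        qmgr.put("Perform the first action A");
        qmgr.put("Perform a second action B");

        System.out.println("Getter received: " + drain(qmgr.commands));
        System.out.println("Still on COMMANDS.COPY: " + qmgr.commandsCopy.size());
    }
}
```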
It sets up a KAFKA.SVRCONN channel for the Connector to use to connect to the queue manager.
DEFINE CHANNEL(KAFKA.SVRCONN) CHLTYPE(SVRCONN) TRPTYPE(TCP) SSLCAUTH(OPTIONAL) SSLCIPH('ANY_TLS12_OR_HIGHER') REPLACE
It updates the queue manager security so that the Connector can access the queue using the kafkauser username / password defined in LDAP previously
SET CHLAUTH(KAFKA.SVRCONN) TYPE(BLOCKUSER) USERLIST(*KAFKAUSER) WARN(YES) ACTION(REPLACE)
REFRESH SECURITY
SET AUTHREC OBJTYPE(QMGR) GROUP('kafkausers') AUTHADD(CONNECT, INQ)
SET AUTHREC OBJTYPE(QUEUE) PROFILE(COMMANDS.COPY) GROUP('kafkausers') AUTHADD(ALLMQI)

It creates a Kafka topic that the Connector can produce to. The topic is configured to retain a copy of all commands that have been sent for the last 365 days.
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaTopic
metadata:
  labels:
    eventstreams.ibm.com/cluster: eventstreams
  name: commands.topic
  namespace: kafka
spec:
  config:
    min.insync.replicas: '1'
    retention.ms: '31536000000'
  partitions: 1
  replicas: 3
  topicName: MQ.COMMANDS
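The retention.ms value is simply 365 days expressed in milliseconds. If you want a different retention period, a quick way to work out the value is sketched below (the class and method names are hypothetical, for illustration only).

```java
import java.time.Duration;

class RetentionMs {

    // Convert a retention period in days to the millisecond value used by retention.ms.
    static long retentionMs(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(retentionMs(365)); // 31536000000
    }
}
```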
It generates credentials that the Connector can use to connect to Kafka. These will be used to produce to the MQ.COMMANDS topic, as well as to create and use topics that Kafka Connect uses to store state.
apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: kafka-connect-credentials
  namespace: kafka
  labels:
    eventstreams.ibm.com/cluster: eventstreams
spec:
  authentication:
    # generate username/password for this user
    type: scram-sha-512
  authorization:
    acls:
      # ---------------------------------------
      # cluster permissions
      # ---------------------------------------
      # check existing cluster config
      - operation: DescribeConfigs
        resource:
          type: cluster
      # ---------------------------------------
      # topic permissions
      # ---------------------------------------
      # check existing topics
      - operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      # create topics (both to produce to, and to use for internal state)
      - operation: Create
        resource:
          name: '*'
          patternType: literal
          type: topic
      # consume from topics (needed to retrieve state from internal topics)
      - operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      # produce to topics (both writing to internal state topics and messages being produced by connectors)
      - operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      # ---------------------------------------
      # consumer group permissions
      # ---------------------------------------
      - operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
      # ---------------------------------------
      # transaction permissions
      # ---------------------------------------
      # create transactions
      - operation: Write
        resource:
          name: '*'
          patternType: literal
          type: transactionalId
    type: simple
The setup script also creates secrets that store credentials and certificates that the Connector uses to connect to MQ.
oc apply -f ./resources/mq-creds.yaml
oc create secret generic jms-client-truststore \
--from-file=ca.jks=../02-generate-certs/certs/streamingdemo-ca.jks \
--from-literal=password='passw0rd' \
-n kafka --dry-run=client -oyaml | oc apply -f -
oc create secret generic jms-client-keystore \
--from-file=client.jks=../02-generate-certs/certs/streamingdemo-kafka-client.jks \
--from-literal=password='passw0rd' \
-n kafka --dry-run=client -oyaml | oc apply -f -
oc create secret generic mq-ca-tls \
--from-file=ca.crt=../02-generate-certs/certs/streamingdemo-ca.crt \
-n kafka --dry-run=client -oyaml | oc apply -f -

The setup script builds a simple custom Docker image based on the Event Streams Kafka container, extended with the jar for the MQ source connector.
FROM cp.icr.io/cp/ibm-eventstreams-kafka:11.0.2
USER root
RUN mkdir -p /opt/kafka/plugins/
RUN curl -Lo /opt/kafka/plugins/kafka-connect-mq-source.jar https://github.com/ibm-messaging/kafka-connect-mq-source/releases/download/v1.3.1/kafka-connect-mq-source-1.3.1-jar-with-dependencies.jar
USER 1001
The Connect cluster definition specifies the address for the Kafka cluster to connect to, the custom Docker image to use, and provides access to the Kafka and MQ credentials and certificates that the Connector will need.
image: image-registry.openshift-image-registry.svc:5000/kafka/kafkaconnectwithmq:latest
bootstrapServers: eventstreams-kafka-bootstrap.kafka.svc:9093
tls:
  trustedCertificates:
    - secretName: eventstreams-cluster-ca-cert
      certificate: ca.crt
    - secretName: mq-ca-tls
      certificate: ca.crt
authentication:
  type: scram-sha-512
  username: kafka-connect-credentials
  passwordSecret:
    secretName: kafka-connect-credentials
    password: password
externalConfiguration:
  volumes:
    - name: mq-credentials
      secret:
        secretName: mq-credentials
    - name: jms-client-truststore
      secret:
        secretName: jms-client-truststore
    - name: jms-client-keystore
      secret:
        secretName: jms-client-keystore
Finally, the setup script starts the MQ/Kafka connector using the mq-connector spec.
# the Kafka topic to produce to
topic: MQ.COMMANDS
# the MQ queue to get messages from
mq.queue: COMMANDS.COPY
# messages sent to the MQ queue are JMS TextMessages
mq.message.body.jms: true
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
mq.record.builder: com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
# connection details for the queue manager
mq.queue.manager: MYQMGR
mq.connection.name.list: queuemanager-ibm-mq.ibmmq(1414)
mq.channel.name: KAFKA.SVRCONN
mq.user.name: ${file:/opt/kafka/external-configuration/mq-credentials:username}
mq.password: ${file:/opt/kafka/external-configuration/mq-credentials:password}
# SSL config for connecting to MQ
mq.ssl.use.ibm.cipher.mappings: false
mq.ssl.cipher.suite: '*TLS12'
mq.ssl.truststore.location: /opt/kafka/external-configuration/jms-client-truststore/ca.jks
mq.ssl.truststore.password: ${file:/opt/kafka/external-configuration/jms-client-truststore:password}
mq.ssl.keystore.location: /opt/kafka/external-configuration/jms-client-keystore/client.jks
mq.ssl.keystore.password: ${file:/opt/kafka/external-configuration/jms-client-keystore:password}
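The ${file:...} values in this spec are placeholders that Kafka Connect resolves at runtime, so that credentials stay in the mounted secrets and not in the connector definition. The sketch below is not Kafka's implementation. It is a minimal stand-in, assuming the placeholder has the form ${provider:path:key} and that a file-based provider reads the path as a Java properties file and returns the value stored under the key. The class name, regular expression, and temporary file are all hypothetical.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FilePlaceholderSketch {
    // Matches ${provider:path:key} (assumes the path itself contains no ':' characters)
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+):([^:}]+):([^}]+)\\}");

    // Split a placeholder into { provider, path, key }.
    static String[] parse(String placeholder) {
        Matcher m = PLACEHOLDER.matcher(placeholder);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a placeholder: " + placeholder);
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    // Look up the key in the properties file named by the placeholder.
    static String resolve(String placeholder) {
        String[] parts = parse(placeholder);
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Path.of(parts[1]))) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(parts[2]);
    }

    public static void main(String[] args) throws IOException {
        // A temporary file stands in for the mounted mq-credentials secret
        Path file = Files.createTempFile("mq-credentials", ".properties");
        Files.writeString(file, "username=kafkauser\n");
        System.out.println(resolve("${file:" + file + ":username}"));
        Files.delete(file);
    }
}
```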
Audit applications
Setup the Kafka Java app
Compile the Java Kafka application
./09-audit-app/build-apps.sh

The config for the apps is contained in Config.java.
The BOOTSTRAP constant is modified by the build-apps.sh script to match the hostname for your Kafka cluster.
public static final String TOPIC = "MQ.COMMANDS";
public static final String BOOTSTRAP = "PLACEHOLDERBOOTSTRAP";
public static final String CLIENT = "audit-app";
public static final String GROUP = "audit";
public static final String TRUSTSTORE = "./ca.p12";
public static final String TRUSTSTORE_PASSWORD = "PLACEHOLDERTRUSTSTOREPASSWORD";
public static final String USERNAME = "audit-app";
public static final String PASSWORD = "PLACEHOLDERAPPPASSWORD";
The credentials and Kafka cluster truststore generated by the Event Streams Operator are downloaded by the setup script.
BOOTSTRAP=$(oc get eventstreams eventstreams -nkafka -ojsonpath='{.status.kafkaListeners[?(@.type=="external")].bootstrapServers}')
gsed -i -e 's/PLACEHOLDERBOOTSTRAP/'$BOOTSTRAP'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java
CA_PASSWORD=$(oc get secret eventstreams-cluster-ca-cert -nkafka -o 'go-template={{index .data "ca.password"}}' | base64 -d)
gsed -i -e 's/PLACEHOLDERTRUSTSTOREPASSWORD/'$CA_PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java
PASSWORD=$(oc get secret audit-app -nkafka -o jsonpath='{..password}' | base64 -d)
gsed -i -e 's/PLACEHOLDERAPPPASSWORD/'$PASSWORD'/' kafka-simple/src/main/java/com/ibm/clientengineering/kafka/samples/Config.java
The Audit app avoids committing any offsets to the Kafka cluster, so that it can replay all events from the Kafka topic every time the application is run.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
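These two settings work together: auto.offset.reset only applies when the consumer group has no committed offset, and because the app never commits one, every run starts from the earliest retained message. The sketch below shows roughly what a full set of consumer properties for the Audit app could look like. It uses only java.util.Properties with the plain string keys behind the ConsumerConfig constants, so it compiles without the Kafka client library. The security settings are assumptions inferred from the external listener config (TLS with scram-sha-512) and the ca.p12 truststore, and the class and method names are hypothetical.

```java
import java.util.Properties;

class AuditConsumerProps {

    static Properties build(String bootstrap, String username, String password,
                            String truststore, String truststorePassword) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", "audit");
        props.put("client.id", "audit-app");

        // Replay behaviour: start from the earliest offset when no offset is
        // committed, and never commit one, so each run reads the whole topic.
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");

        // Assumed security settings for a TLS listener with SCRAM-SHA-512 auth
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        props.put("ssl.truststore.location", truststore);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.truststore.type", "PKCS12");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; the demo's build script fills in the real ones
        Properties props = build("PLACEHOLDERBOOTSTRAP", "audit-app",
                "PLACEHOLDERAPPPASSWORD", "./ca.p12", "PLACEHOLDERTRUSTSTOREPASSWORD");
        System.out.println(props.getProperty("auto.offset.reset"));
        System.out.println(props.getProperty("enable.auto.commit"));
    }
}
```

Audit applications that should not interfere with each other's view of the topic can each follow the same pattern, since none of them stores any position on the cluster.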
Demo the whole solution
Use MQ Streaming Queues and Kafka Connect to make an auditable copy of IBM MQ messages

./05-jms-apps/run-putter.sh
Put messages to the COMMANDS queue.
./05-jms-apps/run-getter.sh
Get the messages from the COMMANDS queue.
You can do the above two steps multiple times.
./09-audit-app/run-audit.sh
Retrieve the audit copy of all messages that have been put to the COMMANDS queue in the last year from the MQ.COMMANDS Kafka topic. You should see the history of all messages put to the queue since you set up Streaming Queues.