MQ


Monitor queue depth in MQ queue manager and send alerts to Splunk

By Kok Sing Khong posted Mon July 10, 2023 11:26 AM

  

This article shows an approach to monitoring the current queue depth in relation to the maximum queue depth. If the current queue depth reaches one of a set of pre-defined thresholds, the monitor sends a log message to Splunk Cloud. The diagram below shows a queue manager deployed on Red Hat OpenShift. An MQ monitoring pod periodically inquires about the current and maximum queue depth of the queues in the MQ queue manager. If the current depth exceeds a threshold (red or amber), the MQ monitoring pod sends a log message to Splunk Cloud via the HTTP Event Collector (HEC).

The implementation of the MQ monitor is in this repository. The details on how to use it are documented in the README.md.

Before we start, we need to register for Splunk Cloud. Once registered, you need to add an HTTP Event Collector (HEC) by navigating to Settings > Data inputs. You will use the token value when sending a message to the HEC.

I will explain a few key aspects of the code.

  1. There is a run loop that periodically inquires about the attributes of the queues (current queue depth and maximum queue depth). Before the loop starts, it connects to the queue manager. It then uses MQ Programmable Command Formats (PCF) to inquire about the queue attributes. You can configure the monitoring period using a ConfigMap.
    	public void run(String... args) throws Exception {
    		log.info(mqProperties.toString());
    		log.info(splunkProperties.toString());
    		log.info(monitorProperties.toString());
    		monitor.connect();
    		for(;;) {
    			log.info("Start monitoring");
    			monitor.doPCF();
    			// Sleep for the configured monitoring period (in seconds)
    			try { Thread.sleep(monitorProperties.getPeriod().intValue()*1000); } catch(Exception e) {}
    		}
    	}
  2. The main logic is located in the MQQueueMonitor class, which has the connect() and doPCF() methods. Let's look at the connect() method. It takes the parameters needed to connect to an MQ queue manager (channel name, hostname, port, queue manager name; I will explain later how they are passed in) and puts them into a hashtable. It first creates an MQQueueManager object and, from it, a PCFMessageAgent object. We will use the PCFMessageAgent to send PCF commands to the queue manager.
        public void connect() {
    		log.info("connect: " + mqProperties.toString());
            log.info("connect: " + splunkProperties.toString());
            log.info("connect: " + monitorProperties.toString());
            try {
                Hashtable<String,Object> mqht = new Hashtable<String, Object>();
                mqht.put(CMQC.CHANNEL_PROPERTY, mqProperties.getChannelName().toString());
                mqht.put(CMQC.HOST_NAME_PROPERTY, mqProperties.getHostname());
                mqht.put(CMQC.PORT_PROPERTY, mqProperties.getPort().intValue());
                mqht.put(CMQC.USER_ID_PROPERTY, mqProperties.getUser());
                mqht.put(CMQC.PASSWORD_PROPERTY, mqProperties.getPassword());
                queueManager = new MQQueueManager(mqProperties.getQueueManagerName()+"", mqht);
                log.info("MQQueueManager created: " + queueManager.getName());
    
                agent = new PCFMessageAgent(queueManager);
                log.info("PCFMessageAgent created");
                reconnect = false;
            } 
            catch (MQException mqe) {
                mqe.printStackTrace();
                log.error("MQException: CC=" + mqe.completionCode + " : RC=" + mqe.reasonCode + " Msg=" + mqe.getLocalizedMessage());
            } 
            catch (MQDataException mqde) {
                log.error("MQDataException: CC=" + mqde.completionCode + " : RC=" + mqde.reasonCode);
            }
        }
  3. In the doPCF() method, we create a PCFMessage to inquire about the queues. We add parameters to specify the generic queue name (you can use an asterisk to filter the list of queues you want to inquire about, e.g. "*TEST") and the queue type (e.g. local queues only). We also specify the attributes of the queue we want returned (queue name, current queue depth and maximum queue depth). Finally, we add a filter for the queues we are interested in (queues that have at least one message). The PCF request results in a list of PCF response messages, one per queue (with its attributes) that has at least one message. For each of these queues, we check the current queue depth against the thresholds we specified (red and amber) and send an Info, Warning or Error message to Splunk accordingly. There is simple recovery logic here: when a connection error occurs, the method attempts to reconnect on the next invocation.
        public void doPCF() {
            if(reconnect) {
                connect();
            }
            try {
                // https://www.ibm.com/docs/en/ibm-mq/9.3?topic=formats-mqcmd-inquire-q-inquire-queue
                PCFMessage request = new PCFMessage(CMQCFC.MQCMD_INQUIRE_Q);
                // Add generic queue name. E.g. *TEST
                request.addParameter(CMQC.MQCA_Q_NAME, mqProperties.getGenericQueueName());
                // Add parameter to request only local queues
                request.addParameter(CMQC.MQIA_Q_TYPE, CMQC.MQQT_LOCAL);
                // Add parameter to request only queue name, current depth and max depth
                request.addParameter(CMQCFC.MQIACF_Q_ATTRS, new int [] {
                    CMQC.MQCA_Q_NAME,
                    CMQC.MQIA_CURRENT_Q_DEPTH,
                    CMQC.MQIA_MAX_Q_DEPTH
                });
                // Add filter to only return responses with a queue depth greater than 0 (zero) i.e. non-zero queue depth
                request.addFilterParameter(CMQC.MQIA_CURRENT_Q_DEPTH, CMQCFC.MQCFOP_GREATER, 0);
                PCFMessage[] responses = this.agent.send(request);
             
                for (int i = 0; i < responses.length; i++) {
                    if ( ((responses[i]).getCompCode() == CMQC.MQCC_OK) &&
                         ((responses[i]).getParameterValue(CMQC.MQCA_Q_NAME) != null) ) {
                        String name = responses[i].getStringParameterValue(CMQC.MQCA_Q_NAME);
                        if (name != null) {
                            name = name.trim();
                        }
                        int curDepth = responses[i].getIntParameterValue(CMQC.MQIA_CURRENT_Q_DEPTH);
                        int maxDepth = responses[i].getIntParameterValue(CMQC.MQIA_MAX_Q_DEPTH);
                    log.info("Name="+name + " : curDepth=" + curDepth + ", maxDepth=" + maxDepth + ", thresholdRed: " + monitorProperties.getThresholdRed().floatValue() + ", thresholdAmber: " + monitorProperties.getThresholdAmber().floatValue());
                        if (curDepth == maxDepth) {
                            String message = "Error: Name="+name + " : current depth equals max depth ["+maxDepth+"]";
                            log.error(message);
                            sendToSplunk(message);
                        } else if (curDepth >= (maxDepth * monitorProperties.getThresholdRed().floatValue())) {
                        String message = "Warning: Name="+name + " : current depth ["+curDepth+"] has reached " + monitorProperties.getThresholdRed().floatValue()*100+ "% of max depth ["+maxDepth+"]";
                            log.warn(message);
                            sendToSplunk(message);
                        } else if (curDepth >= (maxDepth * monitorProperties.getThresholdAmber().floatValue())) {
                        String message = "Info: Name="+name + " : current depth ["+curDepth+"] has reached " + monitorProperties.getThresholdAmber().floatValue()*100 + "% of max depth ["+maxDepth+"]";
                            log.info(message);
                            sendToSplunk(message);
                        }
                    }
                }
    
            } catch(IOException ioe) {
                log.error("IOException:" +ioe.getLocalizedMessage());
                reconnect = true;
            } catch (PCFException pcfe) {
                log.error("PCFException: CC=" + pcfe.completionCode + " : RC=" + pcfe.reasonCode);
                reconnect = true;
            } catch (MQDataException mqde) {
                log.error("MQDataException: CC=" + mqde.completionCode + " : RC=" + mqde.reasonCode);
                reconnect = true;
            }
        }
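The threshold checks above can be factored into a small pure function. This is a hypothetical sketch (the class and method names are mine, not from the repository) that mirrors the Error/Warning/Info logic:

```java
public class DepthClassifier {
    /**
     * Mirrors the checks in doPCF(): Error when the queue is full,
     * Warning at or above the red threshold, Info at or above amber,
     * otherwise OK (no message is sent to Splunk).
     */
    public static String classify(int curDepth, int maxDepth, float red, float amber) {
        if (curDepth >= maxDepth) {
            return "Error";
        } else if (curDepth >= maxDepth * red) {
            return "Warning";
        } else if (curDepth >= maxDepth * amber) {
            return "Info";
        }
        return "OK";
    }

    public static void main(String[] args) {
        // With the sample ConfigMap values: red = 0.9, amber = 0.5
        System.out.println(classify(5000, 5000, 0.9f, 0.5f)); // Error: queue full
        System.out.println(classify(4600, 5000, 0.9f, 0.5f)); // Warning: at or above 90%
        System.out.println(classify(2600, 5000, 0.9f, 0.5f)); // Info: at or above 50%
        System.out.println(classify(100, 5000, 0.9f, 0.5f));  // OK: below both thresholds
    }
}
```

Keeping the classification separate from the PCF plumbing also makes the thresholds easy to unit test.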

  4. There is a private method aptly named sendToSplunk() that sends log messages to Splunk via the HTTP Event Collector (HEC). To simplify the sample code, we disable hostname verification and TLS verification (by passing in an empty SSLContext). You just need to obtain the Splunk token and HEC endpoint from Splunk Cloud. We send the Splunk token in the Authorization header with the value ("Splunk " + token).
        private void sendToSplunk(String message) {
            try {
                final Properties props = System.getProperties(); 
                props.setProperty("jdk.internal.httpclient.disableHostnameVerification", Boolean.TRUE.toString());
    
                String jsonBody = "{\"event\": \"" + message + "\"}";
                log.info(jsonBody);
                HttpRequest request = HttpRequest.newBuilder()
                            .uri(new URI(splunkEndpoint))
                            .headers("Authorization", splunkAuthorization)
                            .headers("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                            .build();
                HttpClient client = HttpClient.newBuilder()
                    .sslContext(this.sslContext)
                    .build();
                HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
                int statusCode = response.statusCode();
                if (statusCode == 200 || statusCode == 201) {
                    log.info("Send to Splunk successful");
                } else {
                    log.error("Send to Splunk error: " + statusCode);
                }
    
            } catch (URISyntaxException urie) {
                log.error("URISyntaxException: Error: "  + urie.getLocalizedMessage());
                urie.printStackTrace();
            } catch (IOException ioe) {
                log.error("IOException: Error: "  + ioe.getLocalizedMessage());
                ioe.printStackTrace();
            } catch (InterruptedException ie) {
                log.error("InterruptedException: Error: " + ie.getLocalizedMessage());
                ie.printStackTrace();
            }
        }
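One caveat: jsonBody is built by string concatenation, so a message containing a double quote or backslash would produce invalid JSON. A minimal escaping helper could look like this (hypothetical class name, not part of the repository; a real implementation would use a JSON library such as Jackson):

```java
public class HecPayload {
    // Escapes backslashes and double quotes so the message is safe to embed
    // in the HEC "event" field of the JSON body.
    public static String toHecEvent(String message) {
        String escaped = message.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"event\": \"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toHecEvent("Error: Name=DEV.QUEUE.1 : current depth equals max depth [5000]"));
        System.out.println(toHecEvent("message with \"quotes\""));
    }
}
```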
  5. There is a pom.xml for building the jar file, and a Dockerfile for building the image.
  6. There is also a deploy.yaml file, which we will explain in more detail. There is a ConfigMap where you specify the parameters in the fields. These fields are mapped to environment variables. In Spring Boot, these environment variables are mapped to the properties defined in the application.properties file. We use the dependency injection capabilities of Spring Boot (@Autowired) to populate the class variables of the objects. The deploy.yaml also contains a Deployment that specifies the location of the image to be pulled. We need to push our image to our preferred image registry (I used docker.io).
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: mqmonitor-configmap
      namespace: mq
    data:
      mqQueueManagerName: SMALLQM
      mqHostname: smallqm-ibm-mq.mq.svc.cluster.local
      mqPort: "1414"
      mqChannelName: SMALLQMCHL_NOTLS
      mqGenericQueueName: "*"
      monitorPeriod: "60"
      monitorThresholdRed: "0.9"
      monitorThresholdAmber: "0.5"
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: mqmonitor
      namespace: mq
      labels:
        app: mqmonitor
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: mqmonitor
      template:
        metadata:
          labels:
            app: mqmonitor
        spec:
          containers:
            - name: mqmonitor
              image: docker.io/koksing/mqmonitor:1.0.0
              imagePullPolicy: Always
              resources:
                requests:
                  memory: "512Mi"
                  cpu: "250m"
                limits:
                  memory: "1024Mi"
                  cpu: "500m"
              env:
                - name: SPLUNK_TOKEN
                  valueFrom:
                    secretKeyRef:
                      name: mqmonitor-secret
                      key: splunkToken
                - name: SPLUNK_ENDPOINT
                  valueFrom: 
                    secretKeyRef:
                      name: mqmonitor-secret
                      key: splunkEndpoint
                - name: MQ_QUEUEMANAGERNAME
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: mqQueueManagerName
                - name: MQ_HOSTNAME
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: mqHostname
                - name: MQ_PORT
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: mqPort
                - name: MQ_CHANNELNAME
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: mqChannelName
                - name: MQ_GENERICQUEUENAME
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: mqGenericQueueName
                - name: MONITOR_PERIOD
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: monitorPeriod
                - name: MONITOR_THRESHOLDRED
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: monitorThresholdRed
                - name: MONITOR_THRESHOLDAMBER
                  valueFrom: 
                    configMapKeyRef:
                      name: mqmonitor-configmap
                      key: monitorThresholdAmber
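    For reference, Spring Boot's relaxed binding maps an environment variable such as MQ_HOSTNAME to a property like mq.hostname. Assuming property names that follow the ConfigMap keys above (the actual names are defined in the repository's application.properties), the mapping might look like:

    ```properties
    # Hypothetical application.properties sketch; see the repository for the real property names.
    mq.queue-manager-name=${MQ_QUEUEMANAGERNAME}
    mq.hostname=${MQ_HOSTNAME}
    mq.port=${MQ_PORT}
    mq.channel-name=${MQ_CHANNELNAME}
    mq.generic-queue-name=${MQ_GENERICQUEUENAME}
    monitor.period=${MONITOR_PERIOD}
    monitor.threshold-red=${MONITOR_THRESHOLDRED}
    monitor.threshold-amber=${MONITOR_THRESHOLDAMBER}
    splunk.token=${SPLUNK_TOKEN}
    splunk.endpoint=${SPLUNK_ENDPOINT}
    ```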
  7. Finally, there is also a secret.sample.yaml in which you need to specify the Splunk token and Splunk endpoint. These are mapped to environment variables in the same way as the ConfigMap values. The values need to be base64 encoded. There is a "funny" problem: a trailing newline character can end up in the value and needs to be stripped.
    ---
    apiVersion: v1
    kind: Secret
    metadata:
      name: mqmonitor-secret
      namespace: mq
    type: Opaque
    data:
      splunkToken: xxxxxxxx-xxxx-xxxx-xxxxxxxxxxxxxxxxx (base64)
      splunkEndpoint: https://xxx-x-xxxxx.splunkcloud.com:8088/services/collector/event (base64)
      #mqUser: "mqadmin"
      #mqPassword: "xxxxxxxxx"
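The trailing-newline problem typically comes from encoding the value with a shell `echo` (which appends a newline) instead of `echo -n`. One way to produce a clean value is to encode it programmatically; a small sketch (hypothetical class name, for illustration only):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SecretEncoder {
    // Encodes a secret value without any trailing newline,
    // suitable for pasting into the Secret's data fields.
    public static String encode(String value) {
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Hypothetical token value for illustration only
        System.out.println(encode("my-splunk-token"));
    }
}
```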

 

In conclusion, this is how the message looks in Splunk Cloud.

References:

  1. Simple Monitoring of the Current Queue Depth - I adapted the code here into a Spring Boot microservice.
  2. Monitoring IBM MQ queue depth in Cloud Pak for Integration
  3. Different ways to get the Current Depth (CURDEPTH) for a queue in an MQ queue manager


Comments

Thu July 20, 2023 03:19 AM

@Morag,

No particular reason. This approach does not require any changes in the queue manager or queue configuration (no need to configure QDEPTHHI, QDEPTHLO, QDPHIEV, QDPLOEV, QDPMAXEV). I guess I can provide another implementation that uses MQ events instead. I would still need to periodically poll the queue "SYSTEM.ADMIN.PERFM.EVENT".

Sat July 15, 2023 11:54 AM

If I am understanding the content of this post correctly, you are monitoring your queue depths by polling the command server repeatedly, rather than using queue depth event messages which would allow the queue manager to push the notification to you instead of being constantly asked the queue depth. Is there a reason why you are not using event messages for this task?