Combine Stream Queues with Cap Expiry to keep a history of messages

By Matthew Whitehead posted Wed May 11, 2022 11:42 AM

  

Introduction


In MQ 9.2.3 the new Stream Queue feature was released for the first time on distributed platforms. It makes it easy to create a duplicate stream of messages as they arrive at a queue. For example, by configuring Q1 to stream messages to Q2 a copy of each message put to Q1 is also put on Q2.

There are a number of use-cases for this feature:

  • archiving messages for backup
  • applying new analysis to existing messages
  • testing new environments with real life workload
  • replaying past messages to solve an application problem

Most use-cases would typically have an application actively consuming the streamed messages, removing them from the second queue and putting the data somewhere else. However, the last use-case in particular is one where you probably wouldn’t be doing that. It's more likely that the streamed messages are never touched, except in the case of an emergency.

Yet we know that there is no such thing as infinite storage or an infinite queue, so you can’t just keep a copy of every message that hits the original queue on the off chance that you may need some of them. So instead, the practical solution is to keep a limited history of messages from the original queue, maybe for an hour or a day, all depending on the needs and the message rates.

In this article we'll look at how streaming queues can be used to retain that time-limited history of messages, and how letting MQ expire them can perform better than having your own application remove them.

Setting up the stream queue


To discuss the scenario in detail let's remind ourselves how to configure a streaming queue setup.

First let's assume we have an existing local queue - MY.APP.QUEUE.

Secondly we'll define a queue to store a copy of those messages on, for a limited period of time:

DEFINE QLOCAL(MY.APP.HISTORY)

Note that in this example both are regular local queues, and that we can set this scenario up on any of MQ's distributed platforms (Linux, Windows, AIX etc.).

We haven't yet enabled streaming from MY.APP.QUEUE so let's do that now:

ALTER QLOCAL(MY.APP.QUEUE) STREAMQ(MY.APP.HISTORY)

This means MQ will make a best-effort attempt to deliver a duplicate of every message put to MY.APP.QUEUE to MY.APP.HISTORY. Later, in the Q&A at the end of the article, we'll look at how you can choose between this best-effort approach and the alternative, which is called must-duplicate. The two choices have pros and cons which are discussed in that section.

Limiting the number of messages we keep


For MY.APP.QUEUE there will already be applications getting messages from it. But for the new queue MY.APP.HISTORY we don't have any applications running to prevent it from filling up. As described in the introduction most of the time we won't actually need the messages from that queue. They are simply there in case a situation arises that requires us to re-process any of them. If we do nothing else to our configuration then MY.APP.HISTORY will quickly fill up.

To prevent this without having to run another application to sit there draining it we will use another of MQ's features - cap expiry. Effectively every message put to the stream queue will have a timer started that will automatically remove the message if it hasn't been consumed by someone else.

Configuring CAPEXPRY


We will set the length of time MQ retains messages on the stream queue by setting CAPEXPRY on MY.APP.HISTORY.

Note: When the CAPEXPRY feature was originally added in MQ 8 fixpack 4 it wasn't possible to add a completely new queue attribute. As a result the way of setting CAPEXPRY involves using the CUSTOM attribute (a free-form text field available on queues).

Later in the article we'll talk about how long to keep messages and how much storage that is likely to require but for now let's set CAPEXPRY on MY.APP.HISTORY to 5 minutes:

ALTER QLOCAL(MY.APP.HISTORY) CUSTOM('CAPEXPRY(3000)')

Remember that MQ message expiries are measured in 1/10ths of a second so a value of 5 minutes is calculated as 5 minutes * 60 seconds * 10 = 3000
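
The same conversion can be sketched in a few lines of Python. This is only an illustration: the function names capexpry_tenths and capexpry_command are made up for this example, and the script just builds the MQSC text shown above rather than talking to a queue manager.

```python
def capexpry_tenths(minutes: float) -> int:
    """Convert a retention period in minutes to MQ expiry units (1/10ths of a second)."""
    return round(minutes * 60 * 10)


def capexpry_command(queue: str, minutes: float) -> str:
    """Build the MQSC ALTER command text used in this article (nothing is executed)."""
    return f"ALTER QLOCAL({queue}) CUSTOM('CAPEXPRY({capexpry_tenths(minutes)})')"


if __name__ == "__main__":
    # Hypothetical usage: print the command for a 5 minute history.
    print(capexpry_command("MY.APP.HISTORY", 5))
```

The printed command can then be pasted into runmqsc, or the minutes value changed to suit a longer history.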

Note: MQ expires messages from unused queues using an expiry task that runs periodically. By default it runs every 5 minutes, so a message can remain on the history queue for up to 10 minutes (the 5 minute cap expiry plus up to 5 minutes waiting for the next run). Later in the article we'll look at ways of running the expiry task more frequently.

Testing the scenario


Use one of the MQ samples or your own application to begin putting messages to MY.APP.QUEUE:

/opt/mqm/samp/bin/amqsblst QM1 MY.APP.QUEUE -W -s 2048 -c 100 -u 10

Use the runmqsc command to check how many messages are on each queue:

DISPLAY QSTATUS(MY.APP.QUEUE) CURDEPTH
DISPLAY QSTATUS(MY.APP.HISTORY) CURDEPTH

Use one of the consuming samples to remove the messages from the original queue:

/opt/mqm/samp/bin/amqsget MY.APP.QUEUE QM1

Since this requires a delay to show that MQ will automatically expire the unread messages on MY.APP.HISTORY now would be a good time to make a cup of tea.

Checking that the messages have been removed


Now that 10 minutes have passed (the 5 minute cap expiry plus up to 5 minutes for the expiry task to run) we can check the depth of the history queue again:

DISPLAY QSTATUS(MY.APP.HISTORY) CURDEPTH

You should see that the queue is empty, and we haven't had to run anything ourselves to clear the messages.

Sizing & storage considerations


It's important to consider how much storage space you will need to store messages for the length of time you need. The messages going to the original queue may only be on the queue for a few seconds but on the history queue we are intentionally storing them for much longer. Let's look at some rough calculations for storing different numbers of messages for a period of 10 minutes (the 5 minute cap expiry value plus up to 5 minutes waiting for the expiry task to run):

  • 100 messages a second, 2 KB each = 60,000 messages and approximately 120 megabytes
  • 10,000 messages a second, 2 KB each = 6,000,000 messages and approximately 12 gigabytes

You need to make sure that the history queue's MAXDEPTH attribute is set to a value large enough for the number of messages you will have in your system.

The amount of storage needed on the file system will be more than the amount of message data calculated above, because there is a header and other meta data for each message. It is therefore important to ensure that both the file system and the queue file size (see the MAXFSIZE queue attribute) are roughly 20-30% larger than the calculated message data.
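
The rough calculations above can be reproduced with a short Python sketch. It assumes 2 KB means 2,000 bytes and applies the 20-30% allowance for headers and metadata mentioned above. The names HistoryEstimate and estimate_history are invented for this example and are not part of MQ.

```python
from dataclasses import dataclass


@dataclass
class HistoryEstimate:
    messages: int           # peak number of messages held on the history queue
    data_bytes: int         # message payload only
    min_storage_bytes: int  # payload plus a 20% allowance for headers and metadata
    max_storage_bytes: int  # payload plus a 30% allowance for headers and metadata


def estimate_history(rate_per_sec: int, message_bytes: int, retention_secs: int) -> HistoryEstimate:
    """Estimate queue depth and storage for a history kept for retention_secs."""
    messages = rate_per_sec * retention_secs
    data_bytes = messages * message_bytes
    return HistoryEstimate(
        messages=messages,
        data_bytes=data_bytes,
        min_storage_bytes=data_bytes * 120 // 100,
        max_storage_bytes=data_bytes * 130 // 100,
    )


if __name__ == "__main__":
    # 10 minutes = 5 minute cap expiry + up to 5 minutes for the expiry task.
    for rate in (100, 10_000):
        print(rate, estimate_history(rate, 2_000, 10 * 60))
```

The messages figure is the one to compare against MAXDEPTH, and the storage figures are the ones to compare against the file system and MAXFSIZE.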

Keeping up with workload


You may be wondering if the queue manager will have difficulty expiring 1000s of messages a second from the history queue. Remember that from minute 5 the first messages to arrive on the queue will need removing, and they must be removed at least as fast as the putting application can add new ones.

A detailed performance report is available, but in summary the expiry task is usually able to remove messages at least as fast as a regular consuming application, and typically much faster. There will be some overhead to the queue manager's workload, but it will be considerably less than writing an application to consume and delete the messages itself. As a rough ballpark figure, expiring messages from a queue adds approximately 15% CPU overhead compared to putting messages and leaving them there.

So you’re still benefiting from MQ’s exactly-once delivery semantics from the original queue but with the extra reassurance of having a recent history of the messages available to you just in case you need them, for not too high an overhead.

What if I actually need the messages from the history queue?


Obviously we're storing these extra messages in case we need to view historic message data, so we need to look at the options for reading/replaying them. If you hit a situation where some or all of the duplicate messages are required the first thing you need to do is make sure you maintain access to the recent history. Those messages are still set to expire so it's best to grab an immutable copy before doing anything else. A useful tool for doing this is dmpmqmsg. This can be used to copy or drain the queue to a file for offline inspection or replay, or to move the messages directly to another queue for re-processing.

Let's look at some example dmpmqmsg commands:

Dump MY.APP.HISTORY to a file with message data shown in hex only:

dmpmqmsg -iMY.APP.HISTORY -fhistory.txt -m QM1

Dump MY.APP.HISTORY to a file with message data shown in both hex and ascii:

dmpmqmsg -iMY.APP.HISTORY -fhistory.txt -da -m QM1

Dump MY.APP.HISTORY to a file without retaining the MQMD:

dmpmqmsg -iMY.APP.HISTORY -fhistory.txt -dN -m QM1

If we want the messages reloaded from the file to another queue we can also use the dmpmqmsg command:

dmpmqmsg -oMY.RECOVERED.QUEUE -fhistory.txt -m QM1

Remember that if the MQMD has been included in the output file from the previous step the messages will be reloaded with a message expiry (whatever expiry was left before the message was unloaded from the queue). This may not be what we want so there are a couple of options to load messages back onto the queue without an expiry:

  • Save them to file without the MQMD (the -dN example shown above). This will cause all original message header data to be lost.
  • Edit the file to change the EXP value for each message to -1 (the value for MQEI_UNLIMITED). A simple sed command or other find/replace tool can be used to do this.
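
As an alternative to sed, the second option could be sketched in Python as below. This rests on an assumption: that the dump file records each message's expiry on its own attribute line of the form "A EXP <number>". Check a real dump file first and adjust the pattern if the layout differs. The function name clear_expiry is made up for this example.

```python
import re

# Assumption: expiry appears on its own attribute line such as "A EXP 2950".
# Verify this against a real dmpmqmsg file before relying on it.
EXP_LINE = re.compile(r"^(\s*A\s+EXP\s+)-?\d+\s*$")


def clear_expiry(dump_text: str) -> str:
    """Return dump_text with every matching EXP value replaced by -1 (MQEI_UNLIMITED)."""
    out = []
    for line in dump_text.splitlines(keepends=True):
        body = line.rstrip("\r\n")
        match = EXP_LINE.match(body)
        if match:
            ending = line[len(body):]  # keep the original line ending
            line = f"{match.group(1)}-1{ending}"
        out.append(line)
    return "".join(out)
```

To apply it, read history.txt into a string, pass it through clear_expiry, and write the result to a new file so that the original dump is kept untouched.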

Caution: dmpmqmsg supports reading from one queue and writing the messages directly to another queue. However, attempting to read messages from the history queue and put them straight back onto the application queue can cause an infinite loop where each restored message gets duplicated back to the history queue, and so on. A safer approach would be to move the messages to a file first, or move them to another queue that isn't used for message streaming and then move them from that queue back onto the application queue.

What else should I consider?


Below I have attempted to answer some common questions about this scenario.

  • My messages already have an expiry set by the putting application. Will that cause the duplicates to be removed earlier or later than the 5 minute cap?
    • No. The way that streaming queues work is that the duplicate message put to the stream queue has its expiry interval reset to UNLIMITED, regardless of the expiry set on the original message. This allows you to use the CAPEXPRY feature to set a new expiry interval on the duplicate messages.
    • However, this also means that if you replay these messages to the source queue they won't have the original application expiry set.
  • Can I make the expiry task check more often than every 5 minutes?
    • Yes. The MQ tuning parameter ExpiryInterval can be set to the frequency (in seconds) at which you wish MQ to run the expiry task. This is done by adding the TuningParameters stanza to qm.ini and an entry for ExpiryInterval=X under that stanza. Reducing the expiry interval helps to smooth the number of messages in the system, especially when the message rates are high and the period of history is short.
  • What will happen if the history queue fills up?
    • This depends on how you configure the streaming queue. By default MQ will ignore the history queue being full and won't deliver a duplicate message (not even to the dead letter queue). This is referred to as "Best Effort" streaming. However this does mean you may have a gap in your history.
    • If you can't tolerate a gap in the history you can set the streaming queue option on the application queue to "Must Duplicate" (MUSTDUP). This time if the history queue is full the putting application will receive MQRC_Q_FULL and neither the original message nor the duplicate message will be delivered to their queues. Here is an alternative to the command where we enabled streaming, this time setting the must duplicate option as well:
      • ALTER QLOCAL(MY.APP.QUEUE) STREAMQ(MY.APP.HISTORY) STRMQOS(MUSTDUP)
    • Full detail about the two different streaming qualities of service is available in the IBM Documentation.
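
To see why a shorter expiry interval smooths the number of messages held, here is a small Python sketch of the worst case described earlier: a message stays for its cap expiry plus up to one full wait for the next run of the expiry task. The function names are invented for this illustration, and the 60 second interval is just an example value, not a recommendation.

```python
def worst_case_retention_secs(capexpry_tenths: int, expiry_interval_secs: int) -> float:
    """Cap expiry (given in 1/10ths of a second) plus one full expiry task interval."""
    return capexpry_tenths / 10 + expiry_interval_secs


def peak_depth(rate_per_sec: int, capexpry_tenths: int, expiry_interval_secs: int) -> int:
    """Worst-case number of messages on the history queue at a steady put rate."""
    return int(rate_per_sec * worst_case_retention_secs(capexpry_tenths, expiry_interval_secs))


if __name__ == "__main__":
    # 100 messages a second with CAPEXPRY(3000), default interval vs. a shorter one.
    for interval in (300, 60):
        print(interval, peak_depth(100, 3000, interval))
```

With the default 5 minute interval the worst case matches the 60,000 messages calculated in the sizing section. With a 60 second interval the same workload peaks at 36,000.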

Comments

Mon July 17, 2023 07:45 AM

Hi Joel, thanks, but I can't take the credit. This article was written by Matt Whitehead. I've passed your comment on to his team though. 

Wed July 05, 2023 07:07 PM

Hi Paul, 

First of all good article, very useful. However, I found a minor typo when I was trying to follow the definitions. It can be easily solved, but it would be good to have a clean example :-) 

You have the following command

ALTER QLOCAL(MY.APP.QUEUE) STREAM(MY.APP.HISTORY)

You are missing a Q and it should read as follows:

ALTER QLOCAL(MY.APP.QUEUE) STREAMQ(MY.APP.HISTORY)

Best regards... Joel

Tue October 04, 2022 11:51 AM

Hi Andy,
Thanks for your comment.
We ran tests against local SSDs and SAN. None of the tests showed the expiry task falling behind (i.e. messages building up on the streaming queues beyond what is expected). I've just tested against an NFS filesystem with a 1 millisecond network link latency introduced, and again, the expiry task keeps up, but of course the overall rate drops due to the throttling of the PUTs by the file writes.
I've passed on your comments about the expiry task to the developers and we'll certainly be looking out for any scenarios where this is likely to be an issue. 

Mon May 23, 2022 08:53 AM

Matt,
I'm very surprised that the (single) expiry task can keep up with a set of producers and consumers. Have changes been made since I left to increase the scalability of this mechanism? The IO latency in the file systems used by the MQ perf team is typically very low, which might mask any issues here (the expirer task would typically be limited by IO latency on the recovery log when dealing with persistent messages). However, I believe they have an exit which they can use to simulate slower IO (for example where some form of remote replication is involved), where any limitations of using the expirer task in this manner are more likely to show up.