MQ

  • 1.  IBM Client Hang in long running windows service

    Posted 2 days ago

    We have a long-running .NET 8 Windows service that reads messages from an IBM MQ server (MQ farm server: IBM MQ 9.2.0.16). The queue receives approximately 40 messages per second. Our service reads these messages, processes them, and stores the data in a SQL database using stored procedures.

    Initially, we used a single hosted service (a single thread) to read from the queue, but it was too slow. To improve throughput, we added five hosted services to the application, all reading from the same queue. We use the IBM MQ .NET client (IBM MQ Client 9.3.4.1) to connect to the MQ server.
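    A minimal sketch of the "several consumers on one queue" layout, assuming each consumer owns its own connection and open queue handle (as each MqClient instance in the code posted later in this thread does). IMessageSource, DelegateSource and ConsumerPool are hypothetical names for illustration, not part of the IBM MQ .NET API; a real source would wrap MQQueue.Get with MQGMO_WAIT.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical abstraction over ONE connection plus one open queue (e.g. the posted MqClient).
public interface IMessageSource : IDisposable
{
    // Returns a message, or null when the get-with-wait expired without one.
    string? TryGet();
}

// Adapter so a plain delegate can act as a source (useful for tests).
public sealed class DelegateSource : IMessageSource
{
    private readonly Func<string?> get;
    public DelegateSource(Func<string?> get) => this.get = get;
    public string? TryGet() => get();
    public void Dispose() { }
}

public static class ConsumerPool
{
    // Starts consumerCount independent loops. Each loop calls the factory once, so no
    // connection is shared between threads. Reconnect handling is left out of this sketch.
    public static Task RunAsync(int consumerCount, Func<IMessageSource> sourceFactory,
                                Action<string> handle, CancellationToken ct)
    {
        Task[] loops = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                using IMessageSource source = sourceFactory();
                while (!ct.IsCancellationRequested)
                {
                    string? message = source.TryGet();
                    if (message != null)
                        handle(message);
                }
            }))
            .ToArray();

        // A loop that faults surfaces through this task instead of disappearing silently.
        return Task.WhenAll(loops);
    }
}
```

    Running the loops from one place and awaiting them together means the host can notice when any consumer stops, which is harder to see with five separately registered services.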

    Currently, we are facing an issue where, after some time, the MQ client hangs and stops consuming messages from the queue manager. As a result, messages pile up on the queue, and we receive an alert once a day. After restarting the Windows service, it resumes consuming messages as usual. This happens daily and undermines the reliability of the service.

    We also have other Windows applications using the same .NET MQ client, but they operate with a single thread and receive only a few messages. These applications have not encountered any issues so far.

    Can anyone help us resolve this issue? Any assistance would be greatly appreciated.



    ------------------------------
    Dilshan Makavitage
    ------------------------------


  • 2.  RE: IBM Client Hang in long running windows service

    Posted 2 days ago

    Hi Dilshan,

    I've taken a quick look through your code, but I don't actually see the part where you call MQ to get the messages. That was the part I assumed you wanted us to review. Any chance we can see that bit?

    I note that your code writes some information to an application log. Could you also show us what it wrote when you had the problem?

    Cheers,
    Morag



    ------------------------------
    Morag Hughson
    MQ Technical Education Specialist
    MQGem Software Limited
    Website: https://www.mqgem.com
    ------------------------------



  • 3.  RE: IBM Client Hang in long running windows service

    Posted 21 hours ago

    Hi Morag, 

    Thanks for the reply. All MQ configuration data is stored in the appsettings.json file. I have attached a sample below for your reference; I cannot share the real server information, but it shows what the configuration looks like.

    This single MQ configuration is shared by SatsDataSource1, SatsDataSource2, SatsDataSource3, SatsDataSource4, and SatsDataSource5 (five background services, each implemented by MqSourceDataProviderService). That means a single queue on one queue manager is being consumed by five different threads.

    I am not getting any errors in the application log files when the MQ client hangs and stops consuming messages from the queue manager.

    Below is the MQ client class:

      public class MqClient : IMqClient
      {
          private readonly int openQueueOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INQUIRE;
          private readonly MQGetMessageOptions getMessageOptions;
    
          static MqClient()
          {
              // NOTE: Untested effects of default provider on messages not encoded in ibm 437
              System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance);
          }
    
          /// <inheritdoc />
          public bool IsConnected => this.queueManager?.IsConnected ?? false;
    
          /// <inheritdoc />
          public MqConfiguration Configuration { get; private set; }
    
          private MQQueueManager queueManager;
          private MQQueue queue;
    
          /// <summary>
          /// Initialize a new <see cref="MqClient"/>.
          /// </summary>
          /// <param name="config"></param>
          public MqClient(MqConfiguration config)
          {
              this.Configuration = config;
    
              this.getMessageOptions = new MQGetMessageOptions
              {
                  Options = MQC.MQGMO_WAIT
                      | MQC.MQGMO_FAIL_IF_QUIESCING
                      | MQC.MQGMO_COMPLETE_MSG,
                  // it is advised not to use MQC.MQGMO_CONVERT
                  WaitInterval = Configuration.RetrieveWaitTime, // in milliseconds
                  MatchOptions = MQC.MQMO_NONE
              };
          }
    
          /// <inheritdoc />
          public void Connect()
          {
              this.queueManager = new MQQueueManager(Configuration.ManagerName, Configuration.ToMqHashTable());
    
              // If Browse is ever needed: MQC.MQOO_BROWSE
              this.queue = queueManager.AccessQueue(this.Configuration.QueueName, this.openQueueOptions);
          }
    
          /// <inheritdoc />
          public void Disconnect()
          {
              if (queueManager != null && queueManager.IsConnected)
              {
                  queueManager.Disconnect();
              }
          }
    
          public int CountMessagesInQueue()
          {
              return this.queue.CurrentDepth;
          }
    
          /// <inheritdoc />
          public void SendMessage(string messageText)
          {
              PutMessage(queue, messageText);
          }
    
          /// <inheritdoc />
          public string GetMessage()
          {
              MQMessage retrievedMessage = new MQMessage()
              {
                  Version = MQC.MQMD_VERSION_2
              };
    
              this.queue.Get(retrievedMessage, getMessageOptions);
              
              if (retrievedMessage.DataLength == 0)
              {
                  return string.Empty;
              }
    
              var message = retrievedMessage.ReadString(retrievedMessage.DataLength);
              return message;
          }
    
          //TODO throw error/raise event if put failed?
          private void PutMessage(MQQueue putQueue, string messageString)
          {
              if (putQueue == null) return; //TODO log or error
    
              MQMessage message = new MQMessage
              {
                  CharacterSet = MQC.CODESET_UTF, // = 1208 = UTF-8 
                  Format = MQC.MQFMT_STRING,
                  MessageFlags = MQC.MQMF_SEGMENTATION_ALLOWED,
                  Version = MQC.MQMD_VERSION_2
                  //, Persistence = MQC.MQPER_PERSISTENT //def. MQC.MQPER_PERSISTENCE_AS_Q_DEF 
              };
    
              message.WriteString(messageString);
    
              MQPutMessageOptions pmo = new MQPutMessageOptions
              {
                  Options = MQC.MQPMO_FAIL_IF_QUIESCING
              };
    
              // Put the message on the putQueue
              putQueue.Put(message, pmo);
          }
    
          public override string ToString()
          {
              return string.Format("{0}, {1}, {2}", nameof(MqClient), this.Configuration.Name, this.Configuration.QueueName);
          }
      }
    • Below is MqSourceDataProviderService, which SatsDataSource1..5 inherit from:

      

     public class MqSourceDataProviderService : BaseDataProviderService<string>
       {
           private BufferBlock<string> dataBuffer;
           public override ISourceBlock<string> DataSource => dataBuffer;
    
           const int waitAfterNoMessageInQueue = 1000;
           const int waitDefaultMilliseconds = 0;
           private readonly int connectRetryInterval;
           Task executeTask = null;
    
           private IMqClient client;
           public MqConfiguration Configuration { get; private set; }
    
           public bool IsConnected => this.client != null && this.client.IsConnected;
    
    
           public MqSourceDataProviderService(MqConfiguration _mqConfig, ILogger<MqSourceDataProviderService> _logger)
               : base(_logger)
           {
               this.dataBuffer = new BufferBlock<string>();
               this.connectRetryInterval = _mqConfig.RetrieveWaitTime;
               this.Configuration = _mqConfig;
           }
    
           public override async Task StartAsync(CancellationToken cancellationToken)
           {
               await base.StartAsync(cancellationToken).ConfigureAwait(false);
           }
           protected override async Task ExecuteAsync(CancellationToken cancellationToken)
           {
               // don't block while execute is running because it may have a zillion messages on the queue and everything else will wait
               executeTask = Task.Run(() =>
               {
                   executeTask = base.ExecuteAsync(cancellationToken);
               });
    
               await Task.CompletedTask;
           }
           public override async Task StopAsync(CancellationToken cancellationToken)
           {
               // wait till getter is done and then stop
               await base.StopAsync(cancellationToken);
               await executeTask;
           }
    
           private void InitializeClient()
           {
    
               this.client = new MqClient(Configuration);
               client.Connect();
           }
    
           /// <summary>
           /// Calling GetData will return a result from data source (if available) and also trigger DataReceived and Error events if applicable
           /// </summary>
           /// <returns>Data or null if none available</returns>
           protected override async Task GetData(CancellationToken cancellationToken)
           {
               int delay = waitDefaultMilliseconds;
               bool isRecoverableError = false;
    
               if (cancellationToken.IsCancellationRequested)
                   await Task.CompletedTask;
    
               try
               {
                   if (!this.IsConnected)
                       InitializeClient();
    
                   var data = client.GetMessage();
                   if (data.Length > 0)
                   {
                       this.dataBuffer.Post(data);
                   }
                   return;
               }
               catch (MQException exc) when (exc.ReasonCode == (int)MQExceptionReasonCodes.MQRC_NO_MSG_AVAILABLE)
               {
                   logger.LogTrace("{0}, {1}", exc.Message, Configuration.QueueName);
    
                   delay = waitAfterNoMessageInQueue;
                   isRecoverableError = true;
               }
               catch (MQException exc) when (exc.ReasonCode == (int)MQExceptionReasonCodes.MQRC_HOST_NOT_AVAILABLE)
               {
                   logger.LogError("{0}, {1}", exc.Message, Configuration.QueueName);
    
                   delay = connectRetryInterval;
                   isRecoverableError = true;
               }
               catch (MQException exc)
               {
                   logger.LogError(exc, "{0}, (configuration {1}). Waiting {2} seconds to retry connecting.",
                       nameof(IMqClient), Configuration, this.connectRetryInterval / 1000);
    
                   delay = connectRetryInterval;
                   isRecoverableError = false;
               }
               catch (Exception exc)
               {
                   logger.LogError(exc, "{0}, (configuration {1}). Waiting {2} seconds to retry connecting.",
                       nameof(IMqClient), Configuration, this.connectRetryInterval / 1000);
    
                   delay = connectRetryInterval;
                   isRecoverableError = false;
               }
    
    
               if (!isRecoverableError)
               {
                   // some of the exceptions will dispose of the client...
                   // so we have to check if client is still in serviceable state ...
                   try
                   {
                       // ... by trying to reconnect it ...
                       if (this.IsConnected)
                           client.Disconnect();
                   }
                   catch (Exception exc)
                   {
                       logger.LogError(exc, "Trying to disconnect {0}, (configuration {1}). Waiting {2} seconds to retry connecting.",
                           nameof(IMqClient), Configuration, this.connectRetryInterval / 1000);
                       // ... otherwise drop it and reinitialize
                       this.client = null;
                   }
               }
    
               await Task.Delay(delay, cancellationToken);
           }
    
           public override void Dispose()
           {
               if (client != null)
               {
                   // if it was already disposed, just remove ref and move on
                   try
                   {
                       client.Disconnect();
                   }
                   catch (Exception exc)
                   {
                       logger.LogWarning(exc, "Disposing client {0}, (configuration {1}).",
                       nameof(IMqClient), Configuration);
                   }
                   finally
                   {
                       client = null;
                   }
               }
    
               base.Dispose();
           }
    
           public override string ToString()
           {
               return this.Configuration.ToString();
           }
       }
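    In the ExecuteAsync above, the task stored in executeTask first completes as soon as base.ExecuteAsync hands back its still-running task, and the assignment inside the lambda then replaces that reference. The outer ExecuteAsync also returns at once, so the host never observes the real polling loop; if that loop ever ended with an exception, nothing would await or log it. BaseDataProviderService is not shown, so this is not confirmed as the cause of the hang. Separately, in GetData, awaiting Task.CompletedTask when cancellation is requested does not leave the method; a return statement would. The sketch below (PollingWorker and its parameters are hypothetical) keeps a single reference to the loop by using the Task.Run(Func<Task>) overload, which unwraps the async lambda, and logs and continues on unexpected errors instead of letting the loop die.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class PollingWorker
{
    private readonly Func<CancellationToken, Task> pollOnce; // e.g. one GetData call (hypothetical)
    private readonly Action<Exception> onError;              // e.g. logger.LogError (hypothetical)
    private readonly TimeSpan errorBackoff;
    private CancellationTokenSource? cts;
    private Task? loopTask;

    public PollingWorker(Func<CancellationToken, Task> pollOnce, Action<Exception> onError, TimeSpan errorBackoff)
    {
        this.pollOnce = pollOnce;
        this.onError = onError;
        this.errorBackoff = errorBackoff;
    }

    public void Start()
    {
        cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        // Task.Run(Func<Task>) returns a task that completes only when the whole loop exits.
        loopTask = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    await pollOnce(token);
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    break; // normal shutdown
                }
                catch (Exception ex)
                {
                    onError(ex);                     // make the failure visible, then keep consuming
                    await Task.Delay(errorBackoff);  // avoid a hot loop if the error repeats
                }
            }
        });
    }

    public async Task StopAsync()
    {
        if (cts is null || loopTask is null) return;
        cts.Cancel();
        await loopTask; // the only reference to the loop, never reassigned
        cts.Dispose();
    }
}
```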

    • Program file (below)
       class Program
       {
           private static SatsFeederConfiguration _configuration;
           static void Main(string[] args)
           {
               using var app = new DataFeedApp();
    
    
               app.Build((hostContext, services) =>
               {
                   // get application configurations
                   _configuration = hostContext.Configuration.Get<SatsFeederConfiguration>();
                   services.AddSingleton(sp => _configuration.MqSources[0]);
                   services.AddSingleton<ISystemClock, SystemClock>();
    
                   // DB context section
                   services.AddDbContextFactory<CutoffKpiDbContext>(options =>
                   {
                       options.UseSqlServer(_configuration.DataOutputs.DbOutputs[0].ConnectionString);
                   }, ServiceLifetime.Transient);
    
                   services.AddScoped<IKpiDataRepository, KpiDataRepository>();
                   services.AddSingleton<SatsDataSource1>();
                   services.AddSingleton<SatsDataSource2>();
                   services.AddSingleton<SatsDataSource3>();
                   services.AddSingleton<SatsDataSource4>();
                   services.AddSingleton<SatsDataSource5>();
                   services.AddSingleton<DataProcessor>();
    
                   services.AddHostedService(CreateSatsDataSource1);
                   services.AddHostedService(CreateSatsDataSource2);
                   services.AddHostedService(CreateSatsDataSource3);
                   services.AddHostedService(CreateSatsDataSource4);
                   services.AddHostedService(CreateSatsDataSource5);
               });
               app.ExceptionLogger.LogInformation(_configuration.ToString());
               Console.WriteLine(_configuration.ToString());
               app.Run();
           }
    
    
           private static SatsDataSource1 CreateSatsDataSource1(IServiceProvider provider)
           {
               var dataProcessor = provider.GetRequiredService<DataProcessor>();
               var satDataSource = provider.GetRequiredService<SatsDataSource1>();
               satDataSource.LinkTo(dataProcessor);
               return satDataSource;
           }
    
    
           private static SatsDataSource2 CreateSatsDataSource2(IServiceProvider provider)
           {
               var dataProcessor = provider.GetRequiredService<DataProcessor>();
               var satsDataSource2 = provider.GetRequiredService<SatsDataSource2>();
               satsDataSource2.LinkTo(dataProcessor);
               return satsDataSource2;
           }
    
           private static SatsDataSource3 CreateSatsDataSource3(IServiceProvider provider)
           {
               var dataProcessor = provider.GetRequiredService<DataProcessor>();
               var satsDataSource3 = provider.GetRequiredService<SatsDataSource3>();
               satsDataSource3.LinkTo(dataProcessor);
               return satsDataSource3;
           }
    
           private static SatsDataSource4 CreateSatsDataSource4(IServiceProvider provider)
           {
               var dataProcessor = provider.GetRequiredService<DataProcessor>();
               var satsDataSource4 = provider.GetRequiredService<SatsDataSource4>();
               satsDataSource4.LinkTo(dataProcessor);
               return satsDataSource4;
           }
    
           private static SatsDataSource5 CreateSatsDataSource5(IServiceProvider provider)
           {
               var dataProcessor = provider.GetRequiredService<DataProcessor>();
               var satsDataSource5 = provider.GetRequiredService<SatsDataSource5>();
               satsDataSource5.LinkTo(dataProcessor);
               return satsDataSource5;
           }
    
        
       }
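    The Program wiring links five unbounded BufferBlock<string> sources to a single DataProcessor. With unbounded buffers, a slow or stalled database stage shows up as growing process memory rather than as queue depth, and with the posted get options (no MQGMO_SYNCPOINT) messages already removed from the queue exist only in memory until they are stored. As a hedged alternative, this sketch swaps in System.Threading.Channels (TPL Dataflow blocks offer a similar BoundedCapacity option): N producers write into one bounded channel and a single consumer drains it, so producers pause when the consumer falls behind. FanIn and its parameters are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanIn
{
    // Runs producerCount producers and one consumer over a bounded channel.
    // Returns how many items the consumer handled.
    public static async Task<int> RunAsync(
        int producerCount,
        Func<int, ChannelWriter<string>, Task> produce, // e.g. one MQ get loop per producer (hypothetical)
        Func<string, Task> consume,                     // e.g. the database stage (hypothetical)
        int capacity)
    {
        Channel<string> channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // WriteAsync waits while the channel is full
            SingleReader = true
        });

        Task<int> consumer = Task.Run(async () =>
        {
            int handled = 0;
            await foreach (string item in channel.Reader.ReadAllAsync())
            {
                await consume(item);
                handled++;
            }
            return handled; // ReadAllAsync ends once the writer is completed and the channel is drained
        });

        Task[] producers = Enumerable.Range(0, producerCount)
            .Select(id => Task.Run(() => produce(id, channel.Writer)))
            .ToArray();

        try
        {
            await Task.WhenAll(producers);
        }
        finally
        {
            channel.Writer.Complete();
        }

        return await consumer;
    }
}
```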


    ------------------------------
    Dilshan Makavitage
    ------------------------------



  • 4.  RE: IBM Client Hang in long running windows service

    Posted 16 hours ago

    Also, I see you are using both MQ and a database. Are you by any chance using MSDTC?

    Is there a case for a multi-phase commit?

    On the other hand, your setup and framework look a little overly complicated. You have two different timeouts for your MQ GET, and there is no need for that.
    If a message is available, the MQ GET call returns immediately with it. If no message is available, it returns after the wait interval.

    You did not specify how you would handle a poison message, or an MQ reason code 2033 (MQRC_NO_MSG_AVAILABLE) or 2009 (MQRC_CONNECTION_BROKEN) exception.
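    One way to make reason-code handling explicit is a small policy table. This sketch is hypothetical (GetFailureAction and MqReasonPolicy are illustrative names); the numeric values are standard MQRC codes. 2033 means the wait interval expired, so the consumer simply issues the next get-with-wait; connection-level failures such as 2009 mean the connection should be dropped and re-established after a pause. The poison-message check assumes gets are done under syncpoint (MQGMO_SYNCPOINT), so that a message whose processing fails is backed out and its MQMD BackoutCount increases; the threshold would typically come from the queue's BOTHRESH attribute, with BOQNAME naming where to move such messages.

```csharp
public enum GetFailureAction { GetAgain, Reconnect, Unexpected }

public static class MqReasonPolicy
{
    public const int NoMsgAvailable = 2033;      // MQRC_NO_MSG_AVAILABLE
    public const int ConnectionBroken = 2009;    // MQRC_CONNECTION_BROKEN
    public const int HconnError = 2018;          // MQRC_HCONN_ERROR
    public const int HobjError = 2019;           // MQRC_HOBJ_ERROR
    public const int QMgrNotAvailable = 2059;    // MQRC_Q_MGR_NOT_AVAILABLE
    public const int QMgrQuiescing = 2161;       // MQRC_Q_MGR_QUIESCING
    public const int QMgrStopping = 2162;        // MQRC_Q_MGR_STOPPING
    public const int ConnectionQuiescing = 2202; // MQRC_CONNECTION_QUIESCING
    public const int ConnectionStopping = 2203;  // MQRC_CONNECTION_STOPPING
    public const int HostNotAvailable = 2538;    // MQRC_HOST_NOT_AVAILABLE

    // Maps an MQException.ReasonCode to what the consumer loop should do next.
    public static GetFailureAction Classify(int reasonCode) => reasonCode switch
    {
        NoMsgAvailable => GetFailureAction.GetAgain, // no extra sleep needed
        ConnectionBroken or HconnError or HobjError or QMgrNotAvailable or QMgrQuiescing
            or QMgrStopping or ConnectionQuiescing or ConnectionStopping or HostNotAvailable
            => GetFailureAction.Reconnect,           // drop the connection, wait, reconnect
        _ => GetFailureAction.Unexpected,            // log the full exception before deciding
    };

    // True when a backed-out message has reached the threshold and should be moved aside.
    public static bool IsPoison(int backoutCount, int backoutThreshold) =>
        backoutThreshold > 0 && backoutCount >= backoutThreshold;
}
```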

    Hope it helps



    ------------------------------
    Francois Brandelik
    ------------------------------



  • 5.  RE: IBM Client Hang in long running windows service

    Posted 16 hours ago

    Hi Francois,

    Thank you for the response. Regarding your questions:

     I see you using both MQ and a DB. Are you by any chance using MSDTC ?

    No, we are not using MSDTC. We read messages from MQ using five background services (SatsDataSource1..5, registered as singletons), process them in the DataProcessor class (a singleton built on .NET System.Threading.Tasks.Dataflow), and store them in the database.

    You have 2 different timeouts for your MQ get.

    Do you mean RetrieveWaitTime?

    You did not specify how you would handle a poison message or an MQ Reason Code 2033  or 2009 exception call

    This is handled inside the MqSourceDataProviderService class.

    What would you recommend to make this multi-threaded MQ reader more reliable?



    ------------------------------
    Dilshan Makavitage
    ------------------------------



  • 6.  RE: IBM Client Hang in long running windows service

    Posted 12 hours ago

    Here are a few debugging tools to consider:

    1) IBM MQ Application Activity Trace. There is a free SupportPac (MH06) that can help with analyzing the trace data.

    2) Sysinternals Process Monitor. It might give you more clues as to the reason for the issue when it presents itself.



    ------------------------------
    Tim Zielke
    ------------------------------



  • 7.  RE: IBM Client Hang in long running windows service

    Posted 9 hours ago

    Hi Dilshan,

    I suspect we are still not seeing all the code, but at least there is now something to comment on from an MQ perspective. I have looked at your code from the perspective of what might be problematic when running five concurrent instances of the same code. These are the things I noticed:

    • I see that you are using MQGMO_COMPLETE_MSG. I also note that you have a small method called CountMessagesInQueue, which makes me nervous. I don't see where you use it, but please bear in mind that if you are using the current depth as an indicator that there are still messages to get while also using MQGMO_COMPLETE_MSG, there may not yet be a complete set of segments for a message.
    • You don't handle the situation where data.Length is zero; such a message is silently discarded. Given that you rely on a non-zero length, perhaps you should log when you receive a zero-length message? MQ does not treat this as an error, so there is no reason code in this situation.
    • Why do you have waitAfterNoMessageInQueue? What are you waiting for? If more messages arrive on the queue, shouldn't you be getting them? Why not just go back into a get-with-wait for this time? However, when this path is used, you write something to the application log, and you say that there is nothing in the application log, so I guess you can't be going through there.
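    A sketch of the loop described in the last bullet, with the MQ wait interval as the only wait: 2033 goes straight back into a get-with-wait, zero-length messages are logged rather than dropped silently, and any other reason code is returned so the caller can disconnect and reconnect. GetResult and GetLoop are hypothetical; in real code the getWithWait delegate would wrap MQQueue.Get with MQGMO_WAIT and translate an MQException into its ReasonCode.

```csharp
using System;
using System.Threading;

// Hypothetical outcome of one MQGET with MQGMO_WAIT:
// Reason 0 = a message was returned, 2033 (MQRC_NO_MSG_AVAILABLE) = the wait interval expired.
public readonly record struct GetResult(int Reason, string? Body);

public static class GetLoop
{
    public const int MqrcNone = 0;
    public const int MqrcNoMsgAvailable = 2033;

    // Runs until cancelled (returns 0) or until a get fails for another reason,
    // in which case that reason code is returned so the caller can reconnect.
    public static int Run(Func<GetResult> getWithWait, Action<string> handle,
                          Action<string> log, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            GetResult r = getWithWait();   // blocks inside MQ for at most the wait interval

            if (r.Reason == MqrcNoMsgAvailable)
                continue;                  // nothing arrived: issue the next get-with-wait immediately

            if (r.Reason != MqrcNone)
                return r.Reason;           // e.g. 2009: let the caller disconnect and reconnect

            if (string.IsNullOrEmpty(r.Body))
            {
                log("Zero-length message received and skipped"); // make the silent case visible
                continue;
            }

            handle(r.Body);
        }
        return MqrcNone;
    }
}
```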

    Given that you say no exception is reported, I assume you are concerned that all your threads have gone into a get on the queue and never returned? You can use Application Activity Trace, as already mentioned, to check what is going on, and you can also use DISPLAY CONN with TYPE(ALL) and look at HSTATE. If it says ACTIVE, the application is currently in a GET on that handle; if it says INACTIVE, it is not. That would be a quick way to tell whether your five threads are truly sitting in a GET or not.
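    DISPLAY CONN shows the queue manager's view of each handle. A complementary application-side check, sketched here under the assumption that each consumer has a stable name such as SatsDataSource1, is to record when each thread enters and leaves its get and periodically log any thread that has been inside a single get for much longer than the configured wait interval. GetWatchdog is a hypothetical helper; timestamps are passed in so the logic can be tested.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public sealed class GetWatchdog
{
    // consumer name -> time it entered its current get
    private readonly ConcurrentDictionary<string, DateTimeOffset> inGetSince = new();

    public void Enter(string consumerId, DateTimeOffset now) => inGetSince[consumerId] = now;

    public void Exit(string consumerId) => inGetSince.TryRemove(consumerId, out _);

    // Consumers that have been inside one get for longer than limit
    // (for example the wait interval plus a generous margin), sorted by name.
    public IReadOnlyList<string> Stuck(DateTimeOffset now, TimeSpan limit) =>
        inGetSince.Where(kv => now - kv.Value > limit)
                  .Select(kv => kv.Key)
                  .OrderBy(id => id)
                  .ToList();
}
```

    In use, each client.GetMessage() call would be preceded by Enter(name, DateTimeOffset.UtcNow) and followed by Exit(name) in a finally block, with a timer logging the result of Stuck(DateTimeOffset.UtcNow, waitInterval + margin). An entry in that log would tie a hang to a specific thread even when no exception is ever thrown.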

    Cheers,
    Morag



    ------------------------------
    Morag Hughson
    ------------------------------