Event Processing made easy with IBM Event Streams and webMethods Integration

By John Carter posted Fri August 30, 2024 10:32 AM


In this article, I will introduce the messaging tools that were added in the preview version of Service Designer 11.0, released in December 2023, and show how they can be combined with IBM Event Streams.

Why?

The new tools complement our existing messaging services, recognizing the changing landscape that IoT and the growing multitude of messaging providers bring to integration. The plethora of event sources and the sheer volume of data make event processing difficult to implement in a way that ensures scalability and responsiveness.

That's why our new messaging tools are designed to work natively with any number of different platforms and offer both traditional transactional event processing and a new event streaming model, making it easier to process and leverage large volumes of events and make sense of them.

Logical components and steps required to manage and make sense of large volumes of data

Message processing versus Event Streaming

Traditional message processing assumed that individual messages represent critical events, such as an order or an invoice, that must be processed as part of a safe, reliable transaction, where the clients are enterprise-level applications.

However, with the rise of IoT and event-driven architectures, the volume, the type of event, and the variety of producers are all much more varied. The events themselves might also hold little individual value; it is only in the overall trends or patterns they produce that value can be found. This new pattern requires a very different messaging platform and a very different processing model in order to handle the required volumes.

This involves capturing and joining streams of events from different sources, as well as filtering, aggregating or calculating data from the stream, before forwarding or processing the results. Doing this by hand would normally require advanced development skills to avoid incurring performance issues or onerous overhead in network or memory use.

Messaging reboot

Previously, webMethods offered you both native messaging with webMethods Universal Messaging and JMS messaging, allowing you to use any JMS platform of your choice. Neither solution is designed with event streaming in mind, and JMS often lacks access to the features that distinguish modern messaging solutions.

With this in mind, we decided to introduce a whole new messaging framework for our webMethods customers: an agnostic method for sending and receiving events from the messaging platform of your choice, together with a processing mode that makes it easier to bring event streams into your app integrations. The new implementation is provided as a webMethods package called WmStreaming, available via our webMethods Package Registry.

The new webMethods messaging framework has been designed to be agnostic about the messaging platform through the use of a plugin for each provider. The current version is bundled with a plugin for Kafka, but we will be providing plugins for other solutions as well as documenting the plugin SDK to allow developers to integrate natively with any messaging platform of their choice.

Why use IBM Event Streams?

There are many messaging solutions to choose between, and many hyperscalers provide their own messaging services, not all of which will scale or offer the full range of functionality that you will need. With IBM Event Streams you get a fully fledged messaging provider that can scale to your business needs and provides full functionality for both transactional and event stream processing.

The other advantage is that it is built on Kafka and so can be easily integrated with webMethods Integration. Using webMethods Integration with Event Streams means that you don't need advanced developer skills to process different messaging streams, and with IBM Event Streams you can leverage the power of Kafka without the headache of trying to manage and secure it yourself.

IBM Event Streams is part of a broader portfolio known as IBM Event Automation, which includes an event processing capability based on Apache Flink. This provides powerful UI-based tools that take advantage of Flink's distributed stateful streaming and can be used upstream, before forwarding the results into webMethods.

Remember that the goal with webMethods is app integration; the trigger can work either transactionally or, in cases where you don't have IBM Event Processing, as a stream processor.

We will continue to work on making both products fully fledged citizens within a common architecture, ensuring that discoverability, event management and control can be assured end to end.

Prerequisites for building your own integration

You can of course read this blog without doing any of the tasks and still get a good idea of how webMethods and Event Streams can help solve your event processing requirements. However, if you do want to follow along, you will need a local installation of webMethods Service Designer and an IBM Event Streams instance, as described below.

Sign up and connect to IBM Event Streams

Signing up is easy: just go to IBM Cloud, register, and then sign up for a free tenant for IBM Event Streams.

Download connection properties
After registering, download the Kafka properties file via the Event Streams home page -> Connect to this service
Kafka properties for configuring connection
Generate an API key
You will also need to generate an API key, which is done via the "Service credentials" menu. Be aware that you will need to access the secrets manager itself in order to copy the secret.
Click on the Secret Managers link to get access to the secret itself
Then click on the secret that you generated and select "view secret" from the actions menu, making sure you keep the secret somewhere safe.
Keep the Kafka properties and API key details somewhere safe; we will come back to them in the next section.
Create the Temperature topic
The last task here is to create the 'Temperature' topic that we will use for this demo. Select "Topics" from the right-hand menu and create a new topic named "Temperature", keeping all of the standard defaults (partitions: 1, and so on).
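If you prefer to script this step instead of using the UI, the topic can also be created with the Kafka admin client. Below is a minimal Java sketch, assuming the connection details from the properties file you downloaded; the hostname, API key and replication factor are placeholders, not values from this article.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTemperatureTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder connection settings; use the values from your Event Streams properties file
        props.put("bootstrap.servers", "<bootstrap-server>:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"token\" password=\"<API_KEY>\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition as per the demo; the replication factor of 3 is an assumption
            NewTopic topic = new NewTopic("Temperature", 1, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}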

WxIBMEventStreamsDemo package

I have published an example webMethods package to a git repository (see below) that you can install into your locally installed webMethods Service Designer. Alternatively, you can skip ahead to the "Try it out" section if you just want to create your own integration.

https://github.com/johnpcarter/WxIBMEventStreamsDemo

Download & restart the local development runtime

Clone the package directly into your packages directory and then restart your embedded runtime from Service Designer -> Server menu -> Restart or Start.

$ cd /Applications/WmServiceDesigner/IntegrationServer/packages

$ git clone https://github.com/johnpcarter/WxIBMEventStreamsDemo.git

The path above reflects the default location if you have installed Service Designer on a Mac; on Windows the path would be C:\Program Files\...

Configuring the connection

Once your embedded Integration Server has restarted, you can reconfigure the connection with the properties that you downloaded from Event Streams earlier. Open the admin page for your local webMethods runtime at http://localhost:5555 and enter the default credentials 'Administrator/manage'. Then navigate to the Event Streams connection view via Streaming -> Provider Settings, click on the connection 'WxIBMEventStreamsDemo_IBMEventStream_SaaS' provided by our new package, and then "Edit Connection Alias Details" to see the following page.

Edit the connection and replace the configuration Parameters with the Kafka properties that you downloaded earlier

Replace the configuration parameters with the contents of the file that you downloaded earlier, replacing the password placeholder <API_KEY> with the api_key value from the API key file; the placeholder sits on the sasl.jaas.config line of the Kafka properties.
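For reference, the relevant lines typically look something like the following illustrative excerpt (the broker hostnames vary per instance, and the username "token" convention for SASL/PLAIN is an Event Streams assumption):

bootstrap.servers=broker-0-xxxx.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093,broker-1-xxxx.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<API_KEY>";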

For the provider URI, simply copy one of the bootstrap servers into the field, prefix it with 'https://' and add the port 9093 to the end.
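For example, a completed provider URI might look like this (hypothetical host): https://broker-0-xxxx.kafka.svc01.us-south.eventstreams.cloud.ibm.com:9093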

Lastly, you will need to select SASL_SSL from the security profile list before saving your changes. Afterwards, you will notice that the pasted API key has been replaced with [***]. Don't worry, this is completely normal: any detected passwords are moved to our password manager and encrypted on disk. You can still edit the connection and the reference will be maintained, so you won't have to re-enter it. Only renaming the connection or the package will require you to re-enter any passwords.

You can now go back to the Streaming -> Event providers page and test the connection. If the test succeeds, you can enable the connection and congratulate yourself: you have now successfully connected webMethods with Event Streams.

FYI, you can replace the URL or any IDs and passwords with a global variable using the syntax ${...}. The advantage is that you will be able to reconfigure them more easily via environment variables, Kubernetes secrets or an external vault, without having to rebuild or redeploy.
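For example, the JAAS line could reference a hypothetical global variable instead of a literal value:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="${EVENT_STREAMS_API_KEY}";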

Kafka Libraries

I have bundled the Kafka libraries in the package itself; you can find them at

WmServiceDesigner/IntegrationServer/packages/WxIBMEventStreamsDemo/code/jars/static

The current version is 3.1.0.

Try it out

The package comes with its own home page from where you can try out the connectivity, similar to the Java sample app that you can download from the IBM Event Streams home page. Simply open the page http://localhost:5555/WxIBMEventStreamsDemo

The left-hand side represents the producer, which simulates a number of devices emitting temperature measurements that need to be collected and acted on. You can set the min and max temperatures and the number of devices, then click on the button to start producing events that will be sent to your Event Streams platform.

The right-hand side shows our consumer, which comprises an event trigger that collects the events every 20 seconds and averages them out for each device before sending the results on to a service. The service leverages the webMethods web-socket server capability to allow the results to be displayed in your browser without having to resort to polling.

Digging into the package

If you want to look behind the curtain to see how it works, simply open Service Designer and expand the package WxIBMEventStreamsDemo in the right-hand package navigator.

The event trigger in the WxIBMEventStreamsDemo

The key assets are in the folders

  • wx.eventstreams.consumer
  • wx.eventstreams.producer

The service 'wx.eventstreams.producer:sendTemperatures' transforms the test data into Temperature events and then sends them via the built-in service 'pub.streaming:send'. Consumption is implemented via the trigger 'wx.eventstreams.consumer:consumeTemperature', which is shown in the screenshot above.

You can also enable the consumer trigger directly via the admin UI or in Service Designer. However, in that case you won't have the link to your web page and the web-socket will not be available to the processing service, so it will instead trace the results to the server log.

If you want to know more, we will drill down into how you can build your own producer and consumer in the sections below.

Doing it yourself

Now we will examine, step by step, how you can use the new streaming package bundled with Service Designer to natively integrate with Kafka and process large volumes of events using our new event trigger.

Create a webMethods package to manage your producer and consumer.

You cannot develop anything in webMethods without first creating a package, as packages are our solution for shipping, sharing and reusing code. We will create a single package for both our producer and consumer. In reality this would probably be a bad idea, as the two may have to be deployed into completely different environments, but we will do it here to keep things simple.

Call your package whatever you want; the screenshots below reference a package named JcKafkaExample ('Jc' being my initials ;-)). Generally in webMethods we tend to prefix our packages with a common code so that packages from one organisation can be easily distinguished from another's; this is why all standard webMethods packages are prefixed with Wm. You also need to create a folder structure inside your package, which also defines a unique namespace for the assets. This is important both for legibility and to ensure uniqueness when installing packages from different sources. Lay out the following structure in your package; you can replace the 'jc' root with the prefix that you used for your package.

Namespace is critical to avoid conflicts and ensure your code is legible

The folders are

  • jc.kafka.example - root folder
  • jc.kafka.example.common - Assets common to both the producer and consumer such as the event structure
  • jc.kafka.example.producer - This is where we will emit events and send them to Kafka
  • jc.kafka.example.consumer - We will create our trigger here to consume the events emitted by the producer

You can of course replace jc with your initials. Note that in a real use case it would not be recommended to use the word kafka, as the whole point of the new framework is to be provider agnostic, i.e. our solution could be easily used with any event streaming/messaging platform by simply swapping in the correct plugin and libraries and updating the connection.

Creating the Document type

Most events will have some kind of structure, so it makes sense to model that structure with a Document Type in webMethods (document types are cool because they allow you to manipulate structured data independently of external format requirements). The transformation between documents and events is managed implicitly for events formatted as either JSON or XML. Create a document type in the documents folder called Temperature with three attributes, as below:

  • deviceId - String, represents a unique identifier for the source device
  • timestamp - Object with java wrapper attribute set to java.util.Date
  • temperature - Object with java wrapper attribute set to java.lang.Double
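For reference, a Temperature event serialized as JSON might look like this (values are illustrative, and the exact timestamp representation depends on how the date is serialized):

{
  "deviceId": "device-1",
  "timestamp": "2024-01-17T09:50:36Z",
  "temperature": 12.9
}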
Bundling the Kafka libraries

You will need to download the Kafka libraries and add them to your package before you can create your connection. The Kafka libraries can be downloaded from the Kafka download site and should then be placed into the package, e.g.

WmServiceDesigner/IntegrationServer/packages/JcKafkaExample/code/jars/static

The libraries can be found in the libs directory of the Kafka download; you will need to copy the following to the above directory:

  • kafka-clients-*.jar
  • lz4-java-*.jar (required for compression)
  • snappy-java-*.jar (required for compression)

The Kafka client also uses slf4j-api-*.jar, but there is no need to copy it because it is already included in the common JAR distribution of the webMethods Integration runtime.
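On a Mac, copying the three jars might look like this (the download directory and version numbers are assumptions; adjust them to match your Kafka download):

$ cd ~/Downloads/kafka_2.13-3.1.0/libs

$ cp kafka-clients-3.1.0.jar lz4-java-*.jar snappy-java-*.jar /Applications/WmServiceDesigner/IntegrationServer/packages/JcKafkaExample/code/jars/static/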

You will then need to restart your local runtime; again, you can do this from Service Designer via the menu Server -> Restart.

Create the connection to IBM EventStreams

We now need to establish a connection between our integration runtime and Event Streams. Open the admin UI of the local Integration Server at localhost:5555 → Streaming → Provider settings → Create Connection Alias, as below:

Configure it as described above for the WxIBMEventStreamsDemo package, copying the Kafka properties into the configuration parameters, updating the API key and setting the provider URI. Remember to test the connection before enabling it.

Create an event specification for Temperature

The event specification is a means to formalize knowledge of the event structure. As already mentioned, event streaming can involve complex JSON or XML structures or, in the case of very simple events, a single typed value. Creating event specifications allows developers to share the details of the event structure and enables easier collaboration. Open the admin UI of the local integration runtime at localhost:5555 → Streaming → Event specifications → Create event specification, and name it temperature as below:

The key will be a simple string and the value a JSON string as specified by our earlier created document type. The translation of the document to and from JSON or XML will be done automatically when sending or receiving events. Make sure that the namespace of the document you created earlier is correct; the best way is to simply copy the document type from Service Designer and paste it into the field.
Emitting events via a producer

We can easily publish events to Kafka via the service “pub.streaming:send” provided in the new WmStreaming package. The service is also designed to take an array so that you can emit a collection of events together.

From Service Designer → right-click on the producer folder → New → Flow Service, give it the name “emitTemperatures” and format the signature as follows:

Make sure that the attribute “temperatures” is based on a Document Reference List. Then, in the implementation, add a loop over /temperatures mapped to /events. Add a map step inside the loop to map each temperature to the value object inside events, and also set the key to %keyPrefix%-%$iteration% using variable substitution.
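For example, if keyPrefix is set to 'demo' and the loop runs over three temperatures, the generated keys will be demo-1, demo-2 and demo-3, since %$iteration% resolves to the current loop counter.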

Once done, add an invoke step for the service “pub.streaming:send”. The events should map automatically to the input; you need only specify the connectionAliasName and eventSpecificationName, which should match the names of the connection alias and the event specification that you created earlier.

Run the service and it should execute without error. Of course, nothing much will happen yet, as we haven’t built any consumer to take advantage of these events.
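For the curious, the sketch below shows roughly what an equivalent raw Kafka producer would look like in Java without the WmStreaming abstraction. This is purely illustrative (the WmStreaming internals are not public), and the connection values are the same placeholders as before:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TemperatureProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings from the Event Streams properties file
        props.put("bootstrap.servers", "<bootstrap-server>:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"token\" password=\"<API_KEY>\";");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The value is the Temperature document serialized as JSON, mirroring what
            // pub.streaming:send does for us based on the event specification
            String value = "{\"deviceId\":\"device-1\",\"timestamp\":\"2024-01-17T09:50:36Z\",\"temperature\":12.9}";
            producer.send(new ProducerRecord<>("Temperature", "demo-1", value));
        }
    }
}

The point of pub.streaming:send is that none of this boilerplate, nor the JSON serialization of the document type, has to be written or maintained by you.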

Create a trigger to consume events

To consume the above events we need to use our new event trigger. Right-click on the consumer folder → New → Event Trigger and name it collectTemperatures, with the following settings:

The new trigger is very powerful and has two variations:

  • Transactional - Nearly identical in functionality to our existing UM-based trigger, except that this one can be used with any messaging provider for which we have a plugin.
  • Event streaming - Consumes multiple events and allows you to easily filter and aggregate them together so that they can be consumed more easily as a stream.

The steps to completing your trigger are:

  1. Select the connection alias.
  2. Indicate what events you want to process and how via the Receive and decode section.
  3. Set the trigger type in the process section.
  4. Implement multiple processing options on the stream such as filtering, transforming or calculating.
  5. Pass the processed stream onto a flow service or forward to a different queue/topic.
  6. Tune trigger performance via the properties section.

Select the connection & event specification

The connection is the alias that you created earlier in the admin UI. You should not reference a connection in a different package; keep your package self-contained unless you like the dependency nightmare that ensues. Remember, packages should be easily deployable and ideally 100% operational after deployment.

The trigger needs to know what topic or queue it will consume events from, and also how the events are structured so that they can be filtered and aggregated easily. That is why we created the event specification earlier, which needs to be selected in the “receive and decode” section.

Processing the stream

The next step is to choose how you want to process events. You can choose between:

  1. Process a single event - A single event will be retrieved from the queue or topic.
  2. Process a count-based window of events - You specify a maximum number of events to be fetched and forwarded to the processing section; the trigger switches to time-based behavior if there is an insufficient number of events in the queue, using the value from “max waiting time”.
  3. Process a time-based window of events - This will fetch all events from the queue or topic since the last interval.

Option (1) is almost identical in behavior to a traditional UM or JMS trigger and allows you to process individual transactions as required. Note that the calculate processing option is disabled in this case, as it would make no sense for a single event.

Options (2) and (3) are new processing behaviors aimed at event streaming, where you need to efficiently process large volumes of transient events, aggregate them and easily perform calculations. Be aware that they will attempt to fetch as many events as dictated by your configuration; for count-based windows the upper limit is known, but for time-based windows there is potentially no upper limit, so you must ensure that the interval is reasonable in relation to the volume of documents.

In the example below I have picked the “count-based” window option, where a maximum of 100 events will be fetched at a time, or after 10 seconds if fewer than 100 events are available in the queue or topic.

Note that the WxIBMEventStreamsDemo package instead uses a time-based window of 20 seconds, since averaging temperature over a fixed time makes more sense; here I wanted to show the alternative processing window.

These 100 events will then be streamed into the processing section, where you can choose what to do with each event, namely:

  • filter - Exclude certain events from the stream based on criteria specified here, using the same format as a flow branch statement, e.g. %/stream/value/temperature% > 10.0
  • transform - Allow the event structure and attributes to be mapped via a generated flow service; importantly, all following steps will then have to operate on the new structure
  • calculate - Aggregate numerical data from the events, such as calculating averages, etc.
  • collect - Group data into silos.
  • forward - Forward results via another topic/queue.

For instance, the following example showcases a use case where we average out the temperature, whilst excluding all temperatures under 10.0 degrees, before finally invoking a flow service with the results.

First we filter out all events that have a temperature below 10.0 degrees; we will consider these to be bad readings.

Then we use the calculate step to combine multiple events by performing calculations on any numeric value, similar to a spreadsheet. These calculations then become the basis for the following steps; the original individual events are discarded.

You can also choose to split the calculations based on an attribute of the event. For instance, in our example it makes no sense to compute the average temperature across all devices; we instead want the average temperature for each device, so we can choose to group the calculation. Entering "/stream/value/deviceId" into the group-by "document field" selector will produce not one average temperature event, but one for each device within the collection period.
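For example, if device-1 and device-2 both report during a window, the trigger will emit two sets of calculations, one holding the count/average/min for device-1 and one for device-2, rather than a single global average.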

Invoking the stream target

The last step is to either forward the final result to a queue or topic of your choice, or invoke a generated flow service called the stream_target, as shown below.

The service is auto-generated and includes the signature needed to ensure that the transformations and calculations you built in the process section can be easily referenced. Any changes you make to this service should be retained if you later modify the trigger. However, it is good practice to minimize the code that you include in this generated service and instead delegate to your own services.

For simplicity’s sake, you can simply add a tracePipeline step to the generated flow service and then run the ‘emitTemperatures’ service that you created earlier; you should see something similar to the following in your server log.

2024-01-17 09:50:36 CET [ISP.0090.0001C] (tid=275) --- START tracePipeline [17/01/2024, 09:50] ---
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 0 stream_calculations[0] {[Lcom.wm.data.IData;} =>
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 element {java.lang.String} = '/stream/value/temperature'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 count {java.lang.String} = '6'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 average {java.lang.String} = '12.9'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 min {java.lang.String} = '11.1'
2024-01-17 09:50:36 CET [ISP.0090.0002C] (tid=275) --- END tracePipeline ---

The first thing you will notice is that you no longer receive the actual events when using a calculate step in your trigger, only the final calculations, such as the count and the average; any events where the temperature was less than 10 degrees have been excluded.
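To appreciate how much the event trigger abstracts away, here is a rough Java sketch of what a hand-written equivalent of this consumer might look like: a 20-second window, filtering out readings below 10 degrees and averaging per device. It is illustrative only; the real trigger additionally manages offsets, consumer scaling and error recovery, and the JSON parsing below is deliberately naive:

import java.time.Duration;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WindowedTemperatureConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings; add the same SASL_SSL properties as the producer example
        props.put("bootstrap.servers", "<bootstrap-server>:9093");
        props.put("group.id", "temperature-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Temperature"));
            while (true) {
                // Collect a 20-second window of events, grouped by device
                Map<String, List<Double>> window = new HashMap<>();
                long end = System.currentTimeMillis() + 20_000;
                while (System.currentTimeMillis() < end) {
                    for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                        double temp = extractNumber(rec.value(), "\"temperature\":([0-9.]+)");
                        String device = extractString(rec.value(), "\"deviceId\":\"([^\"]+)\"");
                        if (temp >= 10.0) { // filter: drop bad readings
                            window.computeIfAbsent(device, k -> new ArrayList<>()).add(temp);
                        }
                    }
                }
                // Calculate: average per device, then hand the results downstream
                window.forEach((device, temps) -> {
                    double avg = temps.stream().mapToDouble(Double::doubleValue).average().orElse(0);
                    System.out.printf("%s count=%d average=%.1f%n", device, temps.size(), avg);
                });
            }
        }
    }

    // Simplistic extraction helpers standing in for proper JSON parsing
    static double extractNumber(String json, String regex) {
        Matcher m = Pattern.compile(regex).matcher(json);
        return m.find() ? Double.parseDouble(m.group(1)) : Double.NaN;
    }

    static String extractString(String json, String regex) {
        Matcher m = Pattern.compile(regex).matcher(json);
        return m.find() ? m.group(1) : "unknown";
    }
}

Everything above, plus offset management and threading, collapses into a few clicks in the trigger editor.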

Next steps

For detailed documentation on IBM Event Streams please refer to:

https://ibm.github.io/event-automation - IBM Event Automation

We will soon be posting more articles on our upcoming IBM webMethods 11.1 release, so stay tuned to TechXchange.


Comments

Wed March 12, 2025 11:47 AM

Please be aware that the web page does not seem to like Google Chrome. Clicking on the "start consuming" button triggers an error, which causes the connection to be immediately closed.

It works fine with Firefox on all platforms and with Safari on macOS

Thu September 12, 2024 02:33 AM

Hi @Wayne Leishman,

So it appears that the browser has established a web-socket with your runtime, but it is then being invalidated i.e. "Could not find session with sessionId: ****"

The good news is that clearly your subscriber is working, because the upstream communication is done only when receiving the results from your trigger.

In the browser debug log do you see the following message when clicking on "Start consuming" ?

connected successfully to ws://localhost:3333/events/temperature

In the latest update I have added more logging in the IS side to see if the connection is established properly. So you should see messages like

ISSERVER|| 2024-09-12 08:32:52 CEST [ISP.0090.0004C] (tid=3861) **socket** -- closing web-socket for temperature 

ISSERVER|| 2024-09-12 08:32:54 CEST [ISS.0179.0998I] (tid=289) Event trigger wx.eventstreams.consumer:consumeTemperature consumer-6 is stopping thread "Event trigger wx.eventstreams.consumer:consumeTemperature consumer-6" 

ISSERVER|| 2024-09-12 08:32:54 CEST [ISS.0179.0998I] (tid=289) Event trigger wx.eventstreams.consumer:consumeTemperature consumer-6 is closing the provider's Consumer object 

ISSERVER|| 2024-09-12 08:32:54 CEST [ISS.0179.0998I] (tid=289) Event trigger wx.eventstreams.consumer:consumeTemperature received PartitionAssignmentHandler.onRevoke for consumer-6, partition(s) [Temperature-0] 

ISSERVER|| 2024-09-12 08:32:54 CEST [ISS.0179.0211I] (tid=3861) Event trigger wx.eventstreams.consumer:consumeTemperature has stopped all consumers 

ISSERVER|| 2024-09-12 08:32:54 CEST [ISS.0179.0998I] (tid=3861) Event trigger wx.eventstreams.consumer:consumeTemperature has been disabled 

ISSERVER|| 2024-09-12 08:32:55 CEST [ISP.0090.0004C] (tid=781) **socket** -- established connection for web-socket /temperature 

Wed September 11, 2024 04:02 PM

Has anyone got this to work? I know John C is running it on his Mac but has anyone else????

I followed all the instructions and after starting the producer and consumer, I never see anything appear on the consumer section of the DSP (http://localhost:5555/WxIBMEventStreamsDemo/).

I am running this on a Windows 11 64bit laptop.

After clicking Start producer, I see the information on the DSP page, and in the Integration Server log I see the debug messages "** Producer ** -- Running" each time an event is sent.
However, if I start the consumer, nothing appears on the right side of the DSP page. In the IS log I see exceptions, including "Could not find session with sessionId", for example:

ISSERVER|| 2024-09-11 15:56:47 EDT [ISS.0179.0218E] (tid=880) Event trigger wx.eventstreams.consumer:consumeTemperature/consumer-3 received a fatal exception while processing step TIME_BASED target processing at eventId 0-12725: com.wm.app.b2b.server.ServiceException: [ISS.0086.9402] Could not find session with sessionId: "MjA1MzM5NTQ2OQ==" 
ISSERVER|| 2024-09-11 15:56:47 EDT [ISS.0180.0006E] (tid=880) Error occurred in Processing Engine for resource wx.eventstreams.consumer:consumeTemperature: The following error occurred invoking target step: Fatal - class com.wm.app.b2b.server.ServiceException : [ISS.0086.9402] Could not find session with sessionId: "MjA1MzM5NTQ2OQ=="

I'm not sure if this is related to my browsers and/or browser security. I have tried both Google Chrome and MS Edge. Is it related to me using Windows? I'm not sure why it would be....

So, if ANYONE has got this to work please ping me. I've been through the instructions multiple times, and reinstalled the latest demo package to pick up the latest changes.

Wayne

Thu September 05, 2024 09:45 AM

Thanks @John Carter

I went ahead and provided a card and made sure to create a free (lite) Event Streams resource for my cloud ID. I will also keep a sharp eye out on any resources attached to my cloud ID and make sure nothing is being charged to me.

Now I can continue trying your example.

Thanks for taking the time to put it together.

Wayne

Thu September 05, 2024 02:15 AM

Sorry @Wayne Leishman, I'm new to this process too. My recommendation would be to use a disposable virtual credit card. That's what I do, ever since I got bitten by a big bill for an AKS cluster running in Azure :-(  Shut it down, but forgot to delete it !!!

Wed September 04, 2024 05:30 PM

Hi again John

Thanks for the response to my previous question.

I am trying to get my free IBM Event Streams tenant. While signing up for my IBM Cloud userid, it wants my credit card. I know it says it only puts a $1 charge on the card while verifying my info, but the button I see at the bottom of the signup form says "Upgrade".

I have not completed my account setup yet because I don't want to sign up for a paid subscription :)

Any tips on the IBM Cloud account? If not, I will contact IBM support.

I downloaded Service Designer 11 and have it and the MSR running. I still need to grab the wM package from Git.

Wayne

Wed September 04, 2024 08:00 AM

@Wayne Leishman, this will be done via the package registry at https://packages.webmethods.io.

However, we haven't refined the process for accepting packages or ensuring that there is always at least one maintainer as yet.

Wed September 04, 2024 07:58 AM

Hi @Mike Ng,

Remember, we will still be adding both the Kafka and MQ connectors soon. We are also considering developing the necessary plugin for MQ to allow the built-in messaging to use MQ as a provider too.

Tue September 03, 2024 11:16 PM

Can we expect better integration with our own IBM messaging products, IBM MQ and IBM Event Streams (Kafka), in the near future, since we are now internal to IBM?

Tue September 03, 2024 04:59 PM

Nice overview John!

You mention that "...the plugin SDK to allow developers to integrate natively with any messaging platform of their choice."

Will there be a place where developers can share the plug-ins? I'm just thinking this will save developers from "reinventing the wheel" if they want to connect to the same platform.