Monitoring Event Automation capabilities

By Pierre Richelle

An Event Automation Monitoring journey

Introduction

This blog provides a detailed guide on configuring and monitoring an Event Automation deployment using OpenShift Container Platform (OCP) features.
These features are built on Prometheus, an open-source monitoring framework, and Grafana, a widely used open-source visualization platform.

The Event Automation solution comprises three core components:

  • Event Streams: Provides the event distribution layer using Apache Kafka.
  • Event Gateway: Manages access to Kafka topics.
  • Event Processing: Analyzes events to deliver meaningful insights.

To demonstrate the monitoring capabilities, we will explore a common use case: collecting and processing events from an existing commerce application used to place item orders. A business application, implemented as an event processing flow, analyzes product buying trends.

In this scenario, monitoring focuses on ensuring the health of the entire application. This includes:

  • The Event Gateway, which controls the production and consumption of events in Kafka topics.
  • Event Streams, which provides the Kafka topics.
  • The Event Processing flow, which transforms raw events into actionable insights.

The blog is centered around a Grafana dashboard that consolidates key metrics essential for effective application monitoring.

We begin by outlining the application use case, followed by a detailed examination of the metrics provided in the dashboard.
This includes understanding their significance and how they can be applied to monitor the system.
Next, we demonstrate how to leverage the Grafana dashboard to interpret these metrics across various operational scenarios.

Finally, the blog concludes with a step-by-step guide to setting up the monitoring tools and configuring the Event Automation components to generate and utilize the necessary metrics.

By the end of this blog, readers will have a practical understanding of how to monitor an Event Automation deployment, ensuring system reliability and operational insights.

Use case

In the fast-evolving world of modern applications, seamless integration between front-end systems and robust back-end infrastructures is paramount.

The project originated from a larger use case: a web front-end application that sends product orders to a back-end system via message queuing (MQ). To improve decision-making, the business users would like to tap into this information and gain better insight into product orders, categorised by type and aggregated by the hour.

The solution retained to address this business case leverages IBM's event automation tools and Apache Flink to transform those raw event streams into meaningful metrics, ready for consumption by external client applications.

To feed this event processing capability, the MQ messages are duplicated using the IBM MQ streaming queues configuration and sent to Kafka using the MQ Kafka connector.

The connector is available from the EventStreams connector catalog.

To focus the scope of this blog, I simplified the architecture by replacing the initial MQ and Kafka Connect setup with a Java application that directly posts events to Kafka through the IBM event gateway, as shown in the following figure:

The IBM event gateway controls access to Kafka topics, ensuring a secure and reliable flow of data. A Flink SQL job, built using IBM Event Processing, consumes these events, processes them in real time, and produces results that are written back to a Kafka topic. These results are then accessible to external client applications via the same event gateway.

The Java producer application sends events to the topic "PRODUCT.ORDERS".

The event processing flow is as follows:

Where the nodes in the canvas are as follows:
  • PRODUCT.ORDERS source: parses the input message sent by the Java producer

  • unpack_products: the order event produced by the Java application contains an array of products; this node retrieves the content of the products array:

  • getProductType: computes the product type from the products extracted in the event, using the description and a regular expression:

  • productTypePerH: aggregates the product types per hour and computes the sums:

  • PRODUCT.TOP: publishes the computed events to the topic "PRODUCT.TOP"

You can find the complete instructions to deploy the application use case in the event automation monitoring git repo.

Monitoring

Rather than listing every metric available for monitoring a Kafka-based solution, I opted to create a Grafana dashboard that consolidates all the key metrics relevant to application owners. This dashboard provides a comprehensive view, enabling them to verify whether the application is functioning as expected and to gain insights when troubleshooting issues.

In this blog, I will guide you through the dashboard, explaining the purpose and significance of each metric displayed in its respective panel.

The metrics used to build the dashboard are emitted by the Event Automation components, which must be configured to emit them; the metrics will not be available otherwise.
Please refer to the Monitoring configuration chapter for more details on how to configure the Event Automation solutions.

Metrics

It is possible to visualize metrics directly in the OpenShift console, without Grafana, once the Event Automation components have been configured to emit them.

To visualize a metric, open the OpenShift web console as an administrator and, in the left navigation pane, select Metrics under Observe.
In the expression field, type the metric that you would like to visualize (for example kafka_topic_bytes_in_oneminuterate):

It is possible to filter based on a label, for example to show the metric only for the topic PRODUCT.TOP:

kafka_topic_bytes_in_oneminuterate{topic="PRODUCT.TOP"}

Grafana dashboard

The Grafana dashboard described in this chapter can be imported into your Grafana instance (explanation on the Grafana import dashboard site).

Once the dashboard is imported, select the right variables at the top of the page:

  • Namespace: the namespace where the use case is deployed
  • Topic Input: the topic where the events are posted: PRODUCT.ORDERS
  • Topic Output: the topic where the event generated by the flink job are published: PRODUCT.TOP
  • FlinkTask: the task manager pod that has been deployed (flink-job-top-results-taskmanager-).

Dashboard description

The dashboard contains the following sections:

  • Health
    this section provides the main view and tells the application owner whether the application is healthy.
  • Runtime Health
    provides metrics to check whether the runtime (Event Processing, Kafka) is healthy.
  • Data processing
    provides metrics about Kafka incoming and outgoing data. This helps the application owner verify that data is arriving to be processed and whether the application is generating data. It can also be used to check whether there are consumers for the data that the application generates.
  • Gateway Health
    provides metrics that can be used to check whether the gateway used to access the application topics is healthy and whether there are clients connected for consuming or producing.

The following parts provide a description and the usage of the different dashboard sections.

Health

The Health section gathers the most relevant metrics regarding the Flink job used in this use case, which forms the core of the business application.

 The dashboard is as follows:


Other metrics than those provided here might also be useful, such as backpressure.
At a high level, backpressure happens when some operator(s) in the job graph cannot process records at the rate they are received.

This fills up the input buffers of the subtask that is running this slow operator. Once the input buffers are full, backpressure propagates to the output buffers of the upstream subtasks.

More information can be found at "what is backpressure".

The following parts explain in more detail the different panels provided in the dashboard.

Last message IN

Time elapsed since the last message was published to the input topic by the Java application.

metric used: es_kafka_producermetrics_timelastmessagewritten_s
origin: Kafka proxy

Depending on the time interval between two messages, you might see different values.

If the application is stopped or not sending events, this metric will increase, telling the application owner that something happened to the producer.

You might create an alert on this value if you expect to receive events within a specific time.
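A minimal sketch of such an alert, assuming user-workload monitoring is enabled and that the metric grows while no message is written (the alert name, namespace, threshold and label filter are illustrative):

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: product-orders-alerts
  namespace: event
spec:
  groups:
    - name: product-orders
      rules:
        - alert: NoRecentMessageOnProductOrders
          # fires when nothing has been written to the input topic for more than 5 minutes
          expr: es_kafka_producermetrics_timelastmessagewritten_s{topic="PRODUCT.ORDERS"} > 300
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: No message produced to PRODUCT.ORDERS for more than 5 minutes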

EP Consumer/Producer

  • EP Consumer msg/sec: the number of events per second that the deployed Flink job is consuming.
metric used: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_consumed_rate
origin: Flink
  • EP Producer msg/sec: the number of events per second that the deployed Flink job is producing.
metric used: flink_taskmanager_job_task_operator_KafkaProducer_record_send_rate
origin: Flink

Those values are a good indicator of whether the Flink job is running as expected.
If new messages are published to the topic PRODUCT.ORDERS (indicated by Last message IN), the EP Consumer msg/sec value should be greater than zero. If that is not the case, there might be an issue with your Flink job.

The producer value provides information on the number of records sent to the topic PRODUCT.TOP. It is more complicated to evaluate since it depends on the aggregation window that has been set in the Flink job.
In the deployed application, made of the Flink SQL job, the aggregation window is set to 1 minute:

SELECT
    COUNT(`products_element`.`quantity`) AS `totalNumberOfProducts`,
    `productsType` AS `productsType`,
    `window_start` AS `aggregateStartTime`,
    `window_end` AS `aggregateEndTime`
FROM TABLE (
    TUMBLE( TABLE `getProductType`, DESCRIPTOR(`ordertime`), INTERVAL '1' MINUTE )
)
-- the windowed aggregation groups by the window bounds and the product type
GROUP BY `productsType`, `window_start`, `window_end`

This means that multiple records (if there are multiple different product types) should be generated every minute if the Flink job is consuming data.
If the value were 1 hour, no record would be generated during that period.

This information can also be observed using the Flink web UI by navigating to the "Running Jobs" section.

By selecting the running job (here flink-job-top-results), we can see that the Flink job is running and that it is receiving as well as sending bytes:



EP Lag

This metric is the Kafka offset lag measured from the Event Processing (Flink) perspective.

metric used: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max
origin: flink

A Flink job consumes records from Kafka, where each record has an associated offset that uniquely identifies its position within a partition.

The consumer reads events at specific offsets and is always behind the producer, as new records are continuously appended to the topic.

From the Kafka broker’s perspective, the latest committed offset refers to the last acknowledged offset by the consuming application.

However, in Flink, the offset tracked by the job represents the last successfully processed and persisted record in its state.

The Lag is the difference between the latest produced offset in Kafka and the latest consumed offset by the Flink job.

Consumer lag in particular is very important because it allows you to monitor the delay between the offset consumed by your consumers and the offset of the most recently added messages. When the lag is growing, it indicates that the consumers are slower than the producers and they are falling behind. 

If your applications are supposed to work in near real-time, you will need to do something to rectify the situation.

If this value is increasing, it means that the Flink job is not able to process the new events quickly enough. This can happen due to:

  • CPU throttling
  • Input event rate too high for the number of job instances deployed (insufficient parallelism)
  • Flink tasks in the job taking too much time (slow backend, complex processing, ...). This can be confirmed by looking at the backpressure metrics (see the Event Processing dashboard).
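Outside Grafana, the same metric can be checked quickly in the OCP console, in the same way as shown in the Metrics section above, for example with an illustrative query such as:

max(flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max)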

TOPIC Input Lag

This metric is the same lag value, but seen from the Kafka broker's perspective.

metric used: kafka_consumergroup_lag
origin: Kafka Exporter

Usually this value should be the same as the EP Lag, but Kafka might report that the EP job is lagging behind even though the job is working on the latest messages, because Flink has not yet committed its offsets.

Automatic commits occur at regular intervals, reducing the need for manual intervention. Flink tracks Kafka consumer offsets internally, providing a robust mechanism for maintaining data integrity. By storing offsets within its state, Flink ensures that the system can recover accurately from failures.

If it increases, it might be that Flink is experiencing backpressure (due to slow downstream processing or insufficient resources) and records are accumulating in Kafka.

Kafka sees a growing lag, but Flink might still be working through buffered records at an unpredictable rate.

Another possible reason could be that the Flink job experiences issues and is not able to process the latest records.

You might create an alert on this value. It will let you know that Event Processing is not able to consume the events quickly enough.
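Following the same PrometheusRule pattern sketched for the Last message IN panel, an illustrative alert expression (the threshold is an assumption to adapt to your workload) could be:

sum by (consumergroup) (kafka_consumergroup_lag{topic="PRODUCT.ORDERS"}) > 1000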

TOPIC Output Lag

This shows how far the application consuming the events generated by our Flink job is falling behind the latest events.

metric used: kafka_consumergroup_lag
origin: Kafka Exporter

This metric is similar to TOPIC Input Lag; the difference is that this value is for the consumer of the output topic.

A lag does not always mean that there is an issue with the consumer application: in our use case, the consumer might not need to process this information as quickly as it is generated.

EP-Busy

This information measures how much time the task spends actively processing records during each second, as opposed to being idle or waiting for data.

metric used: flink_taskmanager_job_task_busyTimeMsPerSecond
origin: Flink

A high value (close to 1000 ms per second) indicates the task is continuously busy, while a low value indicates idle periods. If your job starts lagging in processing offsets, it might be due to tasks being CPU-bound. In such cases, this metric can provide insights:

  • High busy time (close to 1000 ms/sec): Indicates that tasks are consuming almost all available CPU for processing and might be throttled or unable to keep up with the data rate.
  • Low busy time: Suggests other potential causes of lag, such as backpressure, data skew, or slow input sources.

Proactive monitoring: observing this metric over time can help predict bottlenecks before they lead to throttling or lag:

  • If the busy time shows a consistent upward trend or remains near 1000 ms/sec, it indicates the job is nearing its resource limits.
  • This allows you to take proactive action, such as scaling up the number of TaskManagers, increasing task parallelism, or optimizing your job logic.

CPU throttle

This value tells whether the Flink job is consuming too much CPU compared to the CPU limit that has been assigned to the pod.

metric used: container_cpu_cfs_throttled_seconds_total
origin: OCP

This might arise if the number of records to process is very high. This can be due to:

  • High input throughput
  • The number of products to process being high (there can be multiple products per event).
  • A job restart. In our use case, the restart strategy is to consume the events from the beginning (controlled by 'scan.startup.mode' = 'earliest-offset' in the SQL script).

When a container exceeds its CPU limit, the OS runtime will “throttle” the container (stop it from running for the rest of the CPU period) and record the amount of time it was throttled.
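For a quick check in the OCP console, the throttling can be expressed as a per-second rate over this counter; an illustrative query reusing the task manager pod name pattern from the dashboard variables:

rate(container_cpu_cfs_throttled_seconds_total{pod=~"flink-job-top-results-taskmanager-.*"}[5m])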

Current Watermark

Watermarks are used to trigger time-based operations like window aggregations. They indicate that no events with a timestamp older than the watermark will be processed, helping Flink manage out-of-order data efficiently.

metric used: flink_taskmanager_job_task_operator_currentOutputWatermark
origin: Flink

For applications using event-time processing, like the Flink job deployed here where aggregation is used, it is important that the watermarks continue to advance.

It is good to monitor the watermark at time-sensitive operators, such as process functions and windows, by watching the difference between the current timestamp and the watermark. If this event-time skew or event-time lag is unusually high, then this indicates that either (1) you are processing older events, perhaps during recovery from an outage, or (2) something upstream has not sent a watermark for a long time, e.g., a source has become idle. In the latter case, if an upstream operator or source does not send watermarks for a long time, downstream operators may experience event-time lag because they rely on watermarks to process and trigger time-sensitive computations such as windows or timers. This can cause downstream tasks to appear "stuck" or to accumulate excessive state.
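A simple way to watch this event-time lag in the OCP console is to compare the watermark with the current time, for example with the following illustrative query (assuming the watermark is exported as epoch milliseconds, which is the usual Flink convention):

time() * 1000 - max(flink_taskmanager_job_task_operator_currentOutputWatermark)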

We can see here that no new events have been published to the topic (Last Message) and the watermark lag increases, since the last message was part of a window that cannot close until another message arrives with a timestamp outside of the current window:

Runtime Health

The following dashboard section provides metrics to check the health of the application runtime which is made of the Flink job and Kafka.

Here is an example of dashboard showing an issue with the flink job:

The dashboard shows that the Flink job is restarting and that there are checkpoint failures.

Flink Task CPU/Memory

The CPU metrics show how much CPU the Flink job is using and how close it is to the limit set on the container.

metrics used: container_cpu_usage_seconds_total, cluster:namespace:pod_cpu:active:kube_pod_container_resource_limits
origin: OCP

If it reaches the limit, the container will be throttled by the OS (see CPU throttle).

The memory information is critically important in Flink SQL as memory issues can directly affect the performance, stability, and correctness of your Flink jobs.

metrics used: container_memory_working_set_bytes, container_memory_rss, cluster:namespace:pod_memory:active:kube_pod_container_resource_limits
origin: OCP

If memory is insufficient, the job can fail with an OOM error, requiring restarts and causing downtime. Early detection of high memory usage allows you to tune configurations, such as increasing TaskManager memory or optimizing query logic.

Another reason is that Flink SQL jobs often rely on the state backend to manage intermediate or persistent states (e.g., for joins or aggregations). Large states can consume significant memory.

The three metrics provided here are:

  • Memory usage WSS (Working Set Size): represents the subset of memory within the RSS that a process has actively used or accessed over a recent time period. It helps in determining how much memory a process genuinely needs to operate efficiently, to optimize resource allocation or detect memory leaks.
  • Memory usage RSS (Resident Set Size): represents the amount of physical memory that a process has in main memory that doesn’t correspond to anything on disk. It typically includes stacks, heaps, and anonymous memory maps. RSS memory isn’t easily reclaimed, so a container with a high RSS has a high chance of getting OOMKilled.
  • Memory limit: the limit set on the container.
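For a quick check outside the dashboard, the working set can be compared with the configured limit directly in the OCP console; an illustrative query reusing the task manager pod name pattern from the dashboard variables:

sum by (pod) (container_memory_working_set_bytes{pod=~"flink-job-top-results-taskmanager-.*", container!=""})
/ sum by (pod) (cluster:namespace:pod_memory:active:kube_pod_container_resource_limits{pod=~"flink-job-top-results-taskmanager-.*"})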

The different memory pools used by the Flink job and their usage can be found in the Flink web UI:

Job Restart / Uptime

This value tells the number of times the deployed job has restarted.

metric used: flink_jobmanager_job_numRestarts
origin: Flink

This value should be zero.
There are multiple reasons why a job might restart:

  • The Kafka certificates provided are incorrect or have been updated. Remember that in this example the certificates are part of the SQL script.
  • Wrong credentials provided for the connection to Kafka
  • Memory exhaustion. This might arise if the aggregation period is too long and too many records have to be kept in memory.
  • ...

In a well-behaved system, though, most of these originate from transient failures and can be ignored.

The uptime gives the amount of time the job has been running since it was started.

metric used: flink_jobmanager_job_uptime
origin: Flink

This value should be used in combination with the restart count. The job might have restarted multiple times due to transient issues and now be running without issue.

CompletedCheckPoint / FailedCheckPoint

It is important to check that the number of checkpoints is increasing and that there are no failures.

metrics used: flink_jobmanager_job_numberOfCompletedCheckpoints, flink_jobmanager_job_numberOfFailedCheckpoints
origin: Flink

A checkpoint might fail, for example, due to disk storage exhaustion.

Checkpointing forms the backbone of Flink's offset management strategy. When a checkpoint occurs, Flink captures the current state of the Kafka consumer, including the offsets for each partition.

This process guarantees that Flink can resume processing from the exact point of failure, maintaining consistency between the Flink state and Kafka offsets. By leveraging its state management capabilities, Flink provides exactly-once processing guarantees, ensuring that each record is processed precisely once, even in the event of system failures.

Topic Replicas

Provides the number of replicas for the topic used in this use case.

metric used: kafka_topic_partition_replicas
origin: Kafka

Replicas in a Kafka topic ensure high availability and data durability. By replicating data across multiple brokers, Kafka can tolerate broker failures without losing messages or disrupting service, providing fault tolerance and reliability in message delivery.

There are two types of replica: Leader replica and Follower replica.

Let’s say that there are three replicas of a partition. One of them is the leader. All requests from producers and consumers pass through the leader in order to guarantee consistency. All the replicas other than the leader are called followers. Followers do not serve any requests; their only task is to keep themselves up to date with the leader. The process where the data from the leader is fetched by the replicas is called replication.

TOPIC underReplicated

This is the one must-have metric that should be monitored. It provides a count of the follower replicas, present on a particular broker, that are not caught up with the leader replica.

metric used: kafka_topic_partition_under_replicated_partition
origin: Kafka

If the number of under replicated partitions is not constant, or if all the brokers are up and running and still you see a constant count of under replicated partitions, this typically indicates a performance issue in the cluster. If the under-replicated partitions are on a single broker, then that broker is typically the problem.

A Kafka broker becomes under replicated when it fails to keep up with the replication process due to issues like network latency, insufficient disk I/O throughput, high CPU usage, or broker crashes, preventing it from syncing data with leader partitions.

Data Processing

This section provides detailed information about the data flowing in and out of the Kafka topics.

Topic Input PRODUCE/CONSUME bytes/sec

The Topic Input PRODUCE panel provides information about the data published to the input topic (PRODUCT.ORDERS) by the Java application, and the Topic Input CONSUME panel shows the data consumed by our Flink job.

metric used: kafka_topic_bytes_in_oneminuterate & kafka_topic_bytes_out_oneminuterate
origin: Kafka

If messages arrive in the topic, they should be consumed by the Flink job; otherwise there is an issue with the Flink job.

We can also detect that the publisher has stopped sending messages.

These panels are built upon the metrics that we configured for Event Streams by adapting the ConfigMap (see the Event Streams monitoring configuration section):

Topic Output PRODUCE/CONSUME bytes/sec

The Topic Output PRODUCE panel shows the generation of events by the Flink job to the output topic (PRODUCT.TOP). This throughput depends on the number of product types, the input throughput, and the window selected for the aggregation.

The Topic Output CONSUME panel shows how much data is consumed from the topic (PRODUCT.TOP). This would be a client application interested in the number of product types ordered within the time frame defined for the aggregation.

metric used: kafka_topic_bytes_in_oneminuterate & kafka_topic_bytes_out_oneminuterate
origin: Kafka

This information can be useful if you want to know if there is an application that is consuming the events of your topic.

Producer/Consumers msg/sec (egw)

This information is provided by the event gateway that has been used to expose the topics to the external applications, which are the Java producer and consumer applications.

  • The Producer msg/sec (egw) panel provides the number of messages per second produced through the event gateway, grouped by subscription.
metric used: eemgateway_producers_msgs_total
  • The Consumers msg/sec (egw) panel provides the number of messages per second consumed through the event gateway, grouped by subscription.
metric used: eemgateway_consumers_msgs_total
origin: Event Gateway

This allows you to identify the producers that are sending data or the consumers consuming data. The subscriptions can be retrieved using the Event Endpoint Management REST API at https://eem-mgr-ibm-eem-admin-event.xxx/eem/subscriptions (see REST admin API).

Note

One challenge encountered with Kafka is the difficulty of identifying who is producing and consuming events on your Kafka broker. Having an event gateway in front of the Kafka cluster to control who consumes what is a real added value, as it makes it easy to identify the producers and consumers.
The event gateway allows a client application to access only the topic that has been granted for its subscription. Only one topic can be reached with a specific subscription, so the subscription can be used to identify the client application as well as the topic that is reached.

Gateway Health

This last section can be used to get additional information on the Event Endpoint gateway that is used to expose the topics.

Gateway CPU/Memory usage

The CPU metrics show how much CPU the gateway is using and how close it is to the limit set on the container.
If it reaches the limit, the container will be throttled by the OS (see CPU throttle).

metrics used: node_namespace_pod_container:container_cpu_usage_seconds_total, cluster:namespace:pod_cpu:active:kube_pod_container_resource_limits, container_cpu_cfs_throttled_seconds_total
origin: OCP

The two memory metrics provided in percentage of the memory limit set to the container are:

  • Memory usage WSS (Working Set Size): represents the subset of memory within the RSS that a process has actively used or accessed over a recent time period. It helps in determining how much memory a process genuinely needs to operate efficiently, to optimize resource allocation or detect memory leaks.
metric used: container_memory_working_set_bytes
origin: OCP

  • Memory usage RSS (Resident Set Size): represents the amount of physical memory that a process has in main memory that doesn’t correspond to anything on disk. It typically includes stacks, heaps, and anonymous memory maps. RSS memory isn’t easily reclaimed, so a container with a high RSS has a high chance of getting OOMKilled.
metric used: container_memory_rss
origin: OCP

Number Of Clients

This panel provides the number of clients that are connected to the gateway.

metric used: eemgateway_connected_clients
origin: event gateway

Scenarios

This section provides four scenarios that highlight the usage of the monitoring dashboard:

  • Low Event Volume: Where the Flink job easily handles the data load, showcasing normal operations.
  • High Event Volume: Where the influx of events challenges the Flink job’s processing capacity, highlighting bottlenecks and the need for system optimization.
  • No Event: in this scenario, the event producer is stopped and some metrics will reflect this status.
  • Bad credentials: in this scenario, either the Kafka certificates have been changed or the user credentials used by the Flink job to access Kafka have been removed. This status is highlighted by the continuous restarts of the Flink job.

Low event volume

In this scenario, the number of events produced by the Java source application is low. The Flink job is able to process the events without issue.

In this configuration, we can see that everything is working as expected by looking at the Health panel section:

  • Incoming messages: Last message IN is less than 10 sec, meaning that new events are coming in. This is confirmed by the EP Consumer msg/sec.
  • Generated messages: EP Producer msg/sec is not zero.
  • Lagging: no lag displayed which means that our application is able to process incoming messages.
  • Watermark: current watermark is 1 min which is normal since we set an aggregation of 1 min for the product types.

If we look at the data processing section panel:

We can see that the amount of bytes received on the Topic Input PRODUCE panel has increased, as well as the amount of data processed by the Flink job, as shown on the Topic Input CONSUME panel.

The Flink job is also producing events at regular intervals (aggregation window of 1 min), shown in Topic Output PRODUCE, and the consumer app is processing those events once they are available, shown in Topic Output CONSUME.

The Producer msg/sec (egw) is the metric obtained from the Event Gateway for the subscription starting with bb4, which is the Java producer application.

The number of records consumed by the Java consumer app (Consumer msg/sec (egw)) is lower than the data produced, since the Flink job performs an aggregation on the order type, which is limited in number.

The runtime health panel section: 

It shows that everything is running as expected:

  • The CPU usage is low and there is no CPU throttling. Note that the CPU limit in the Flink deployment was set to 200 millicores by default. The purpose of this low value is to show throttling when the workload becomes very high.
  • For the Flink job, there is no job restart and no failed checkpoint.
  • For Kafka, we can see that there are no under-replicated topics and that the number of replicas is set to 3.

High event volume

In this scenario the number of events produced has been drastically increased by adding more Java application producer replicas.
To increase the processing work of the Flink job, the following changes have been made:

  • the maximum number of products per order event has also been increased to 50
  • the unit of time used to compute the number of order types has been reduced to 1 minute instead of 1 hour.

If we look at the health panel section:


We can see that the flink job is not able to process the workload quickly enough.
This is highlighted by the EP Lag that is increasing. The Topic Input Lag, which is the lag from the Kafka point of view, is even higher.
The most likely reason for this lag is a lack of processing resources, as the CPU throttling shows, and we can see with EP-Busy that the Flink job is struggling to process the load.
The current watermark is increasing as well.
But the job is still processing input data at a rate of 20756 msg/sec (shown by the EP Consumer msg/sec).

Let's check whether the job has any issue with its internal processing by looking at the Runtime Health section:


We can see that the job is indeed throttled but the memory is still OK.
The job does not have any checkpoint issues: completed checkpoints still increase and no failed checkpoints are shown.
The job has not crashed either, as the Job Restart count shows.

If we look at the data panel section:


We can see that the producer is producing messages at a rate of 27703 msg/sec (Producer msg/sec (egw)), which is 7703 more messages/sec than what the job can process.
If you look at the messages generated by the Flink job, the number of products per event has increased a lot, as the following snapshot from the EventStreams UI shows:

No Event

In this scenario, the Java producer application has been stopped.

On the Health panel section we can see that the last message IN is high:

This information tells us that there is an issue with the Java producer application.
As we would expect, the current watermark is also increasing.

Bad credentials

In this scenario, the grants of the user used by the Flink job have been removed by changing the Kafka user's grants.

The job will throw an exception at startup and be restarted by the job manager.
This can be seen by looking at the Runtime Health panel section:

By monitoring the number of restarts and failed checkpoints, we can spot that the job has issues.

Monitoring configuration

This chapter begins by listing the monitoring tools used for collecting, storing, and visualizing the emitted metrics, along with links to detailed configuration guides.

The second section explains how monitoring metrics are emitted by the different Event Automation components and provides instructions on configuring them to emit the metrics featured in this blog.

Monitoring Tools

Prometheus

Prometheus is an open-source systems monitoring and alerting toolkit. It collects metrics from the systems to be monitored and stores them as time series data. The OpenShift Container Platform monitoring stack is based on the Prometheus open source project and its wider ecosystem.

The OpenShift documentation Enabling monitoring for user-defined projects provides the steps to configure Prometheus to monitor and collect data from your deployed services.
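Concretely, user-workload monitoring is enabled by setting enableUserWorkload: true in the cluster-monitoring-config ConfigMap, as described in that documentation:

apiVersion: v1
kind: ConfigMap
metadata:
  name: cluster-monitoring-config
  namespace: openshift-monitoring
data:
  config.yaml: |
    enableUserWorkload: true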

Collector

Event Endpoint Management and Event Processing generate OpenTelemetry metrics.
Recent OpenShift releases provide built-in support for collecting OpenTelemetry data through an operator. The installation of the operator is described in the OpenShift build of OpenTelemetry installation documentation.

The collector is used here to receive OpenTelemetry data from the Event Endpoint Management components.

Grafana

Grafana is used to create dashboards using data provided by Prometheus. Grafana is not provided by default with recent releases of OpenShift, but it can be easily installed using the Grafana operator. The installation procedure for the operator as well as the custom resource (CR) is provided in the Integrating Flink with Prometheus Event Automation tutorial.

The tutorial also provides the procedure to integrate Grafana with the Prometheus Thanos querier. It is a good idea to use the Thanos querier as the data source for custom Grafana dashboards because it can query metrics from both the default Prometheus instance in openshift-monitoring and the Prometheus instance in openshift-user-workload-monitoring.

Side notes

The Grafana service account can be found using the command

oc get sa -n openshift-user-workload-monitoring

The Thanos querier host name used to define the GrafanaDataSource CR can be retrieved using the command

oc -n openshift-monitoring get route thanos-querier -ojsonpath={.status.ingress[].host}
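As an illustration only (the tutorial linked above contains the authoritative version), a data source pointing at the Thanos querier could look like the following sketch, assuming the Grafana operator v5 GrafanaDatasource API; the URL host is the route retrieved above and the bearer token is the one of the Grafana service account:

apiVersion: grafana.integreatly.org/v1beta1
kind: GrafanaDatasource
metadata:
  name: thanos-querier
spec:
  instanceSelector:
    matchLabels:
      dashboards: grafana            # must match the labels of your Grafana CR
  datasource:
    name: Prometheus
    type: prometheus
    access: proxy
    url: https://<thanos-querier-host>   # value returned by the command above
    isDefault: true
    jsonData:
      tlsSkipVerify: true
      httpHeaderName1: Authorization
    secureJsonData:
      httpHeaderValue1: Bearer <grafana-serviceaccount-token>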

Monitoring configuration for the solutions

The following chapters provide the configuration to apply to the different Event Automation components to enable metrics generation.

The following architecture overview shows the different Kubernetes objects involved in metrics generation and collection:

The different elements of this architecture are described in more detail in the following chapters.

Event Streams

There are different configurations that need to be applied depending on the monitoring information that needs to be collected:

  • Kafka metrics, which provide information related to the broker state, usage, and performance, such as the bytes in and out of topics
  • Kafka Exporter, to expose additional metrics to Prometheus on top of the default ones, such as information about the consumers. Kafka Exporter is an open source project that has been added to Strimzi; it connects to Kafka as a client to collect different information about topics, partitions and consumer groups, and then exposes this information as a Prometheus metrics endpoint.
  • Broker proxy, to collect information about the producers

The following figure shows the deployment architecture when all the monitoring configuration is set up. The pods used for monitoring are highlighted in blue and the other Kubernetes objects in light blue. As the architecture shows, additional processor resources are required, which might need additional OCP licenses.

The monitoring configuration that has been applied during the EventStreams installation is:

spec:
  collector: {}
  kafkaProxy: {}
  strimziOverrides:
    kafka:
      jvmOptions:
        javaSystemProperties:
        - name: com.sun.management.jmxremote.port
          value: "9010"
        - name: com.sun.management.jmxremote.rmi.port
          value: "9011"
        - name: java.rmi.server.hostname
          value: localhost
        - name: com.sun.management.jmxremote.authenticate
          value: "false"
        - name: com.sun.management.jmxremote.ssl
          value: "false"
      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: kafka-metrics-config.yaml
            name: es-kafka-demo-metrics-config
    zookeeper:
      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: zookeeper-metrics-config.yaml
            name: es-kafka-demo-metrics-config
    kafkaExporter:
      groupRegex: .*
      topicRegex: PRODUCT.*

The following sections explain these configurations in detail.

Kafka base metrics

Kafka provides a service for Prometheus to collect metrics from, and a Prometheus ServiceMonitor is configured to collect those metrics. The generation of metrics for Prometheus is enabled by setting the following Strimzi override configuration in the EventStreams custom resource (CR):

Kafka broker metrics enablement:

spec:
  strimziOverrides:
    kafka:
     metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: kafka-metrics-config.yaml
            name: es-kafka-demo-metrics-config

Zookeeper metrics enablement:

spec:
  strimziOverrides:
    zookeeper:
      metricsConfig:
        type: jmxPrometheusExporter
        valueFrom:
          configMapKeyRef:
            key: zookeeper-metrics-config.yaml
            name: es-kafka-demo-metrics-config

The EventStreams operator will automatically generate the configMap that contains default metrics configuration.
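The blog's environment already has the corresponding scrape configuration in place; for reference, the standard Strimzi examples use a PodMonitor targeting the metrics port of the Kafka pods, like the following sketch (namespace, labels and port name may differ in your Event Streams deployment):

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-metrics
  namespace: event
spec:
  selector:
    matchLabels:
      strimzi.io/kind: Kafka        # matches the Kafka broker pods created by the operator
  podMetricsEndpoints:
    - path: /metrics
      port: tcp-prometheus          # port exposing the JMX Prometheus exporter (9404)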

Note

A ConfigMap with the metrics configuration can be created beforehand and referenced in the custom resource. It is possible to change the configuration directly in the ConfigMap, and any changes will be automatically picked up by the operator, which will dynamically update the Kafka brokers (without restarting the pods).

The metrics generated by the Kafka Broker can be listed by running a curl command on the "Kafka broker pod":

oc exec <kafka-broker-pod> -- curl localhost:9404/metrics

These metrics can also be visualized using the OpenShift built-in UI by navigating to the OpenShift console and selecting the Observe tab and then Metrics:

Additional metrics can be defined by updating the configMap that contains the JMX rules defining the metrics exposed.
Some metrics used by Kafka can be found at Apache Kafka monitoring.

In the following example, the default rule "kafka_server_BrokerTopicMetrics" has been updated and new "kafka_topic_bytes_out" and "kafka_topic_bytes_in" rules have been created:

rules:
- attrNameSnakeCase: false
  name: kafka_server_BrokerTopicMetrics_$1_$2
  pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec)><>(Count|OneMinuteRate)
- attrNameSnakeCase: false
  name: kafka_topic_bytes_out_$2
  pattern: kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>(Count|OneMinuteRate) 
  labels:
    topic: "$1"
- attrNameSnakeCase: false
  name: kafka_topic_bytes_in_$2
  pattern: kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>(Count|OneMinuteRate) 
  labels:
    topic: "$1"

The $ symbol is used as a reference to capture groups from the regular expression defined in the pattern field.

In the kafka_server_BrokerTopicMetrics the pattern contains two capture groups:

  • $1 : (BytesInPerSec|BytesOutPerSec|MessagesInPerSec)
  • $2 : (Count|OneMinuteRate)

The pattern has been modified to add "MessagesInPerSec". It will, for example, match the emitted JMX metric "BrokerTopicMetrics" with name MessagesInPerSec and attribute Count, and a metric kafka_server_brokertopicmetrics_messagesinpersec_count will be exposed and made available.
Once configured and if this JMX metric is enabled, it is possible to see it with the curl command on the pod:

$> oc exec es-kafka-demo-kafka-0 curl localhost:9404/metrics | grep messagesinpersec
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 65132  100 65132    0     0   150k      0 --:--:-- --:--:-- --:--:--  149k
# HELP kafka_server_brokertopicmetrics_messagesinpersec_count Attribute exposed for management kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics,attribute=Count
# TYPE kafka_server_brokertopicmetrics_messagesinpersec_count untyped
kafka_server_brokertopicmetrics_messagesinpersec_count 45901.0

The rule kafka_topic_bytes_out (the same applies to kafka_topic_bytes_in) has been built based on the information available in the Apache Kafka monitoring documentation.
The documentation provides information about the MBean related to the message in rate:

kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+)   

In the rule, the pattern contains two capture groups:

  • $1: (.+) of the topic. This will be any TOPIC name. Note that we could have filtered the topic name here.
  • $2: (Count|OneMinuteRate) The metric emitted will be kafka_topic_bytes_out_count with a label topic. If there is a TOPIC ORDERS.NEW, the following metric will be available: kafka_topic_bytes_out_count{topic="ORDERS.NEW"} 1.3075331E7

In the "Metrics" section in the OCP console, it is then possible to look after the metric kafka_topic_bytes_out_count and filter base on the topic.

To explore further which metrics can be configured, one can connect a Java console to a Kafka broker with the JMX remote interface enabled.

This can be enabled using the following properties in the EventStreams CR:

spec:
  strimziOverrides:
    kafka:
      jvmOptions:
        javaSystemProperties:
          - name: com.sun.management.jmxremote.port
            value: '9010'
          - name: com.sun.management.jmxremote.rmi.port
            value: '9011'
          - name: java.rmi.server.hostname
            value: localhost
          - name: com.sun.management.jmxremote.authenticate
            value: 'false'
          - name: com.sun.management.jmxremote.ssl
            value: 'false'

Once the Kafka Broker pods have restarted, the interface can be reached using a port forward:

 oc port-forward es-kafka-demo-kafka-0 -n event 9010:9010 9011:9011

A Java console can then be started on the host that executed the port forward. This is the built-in Java console: just issue the jconsole command and provide localhost:9010 as the remote connection. Accept the insecure connection.
The MBeans tab provides all the possible metrics and the one used in the example:

The ObjectName kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec can be used to build

  • the pattern: "kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)>"
  • the Attributes for the value after <> (<> in the pattern used in the JMX exporter configuration acts as a delimiter to separate the object name from the attribute name in the JMX metric).
Warning

The approach provided here might be suitable for development and exploration since the connection to the brokers is not secured. If you would like to secure the connection to the Java JVM, you can configure the Strimzi overrides as explained in the documentation at securing the JMX connections.

Kafka Proxy - producer metrics

When activated, the proxy provides aggregated information about producer activity for the selected topics through metrics such as active producer count, message size, and message produce rates.

The EventStreams operator will deploy proxy pods, one for each Kafka broker, which Kafka producers then use to reach the brokers. This adds latency to any traffic to your Kafka brokers. An alternative is to get those metrics from the client application itself, but this is not always possible.
The OpenShift bootstrap routes are updated to reflect this change. The name of the pods is of the form ".*-ibm-es-kp".

Existing clients that were using the k8s service should be reconfigured to use the KafkaProxy instead of the Kafka Broker bootstrap.

The EventStreams operator also deploys a metrics collector pod that receives the new metrics from the KafkaProxy.
A ServiceMonitor custom resource is also created to tell Prometheus to collect metrics from the metrics pod.

Kafka Proxy is easily configured by adding the following properties in the EventStreams CR:

apiVersion: eventstreams.ibm.com/v1beta2
kind: EventStreams
# ...
spec:
  kafkaProxy: {}
# ...
Note

To process metrics that are emitted by the KafkaProxy, the metrics pod needs to be deployed. The metrics pod deployment is controlled by the collector: {} entry in the EventStreams CR, which is normally added by default at creation. The entry might need to be added if the EventStreams CR is created from a pipeline.

apiVersion: eventstreams.ibm.com/v1beta2
kind: EventStreams
# ...
spec:
  collector: {}
# ...

This configuration will deploy a pod with the name following the template <es-instance>-ibm-es-metrics-<uid>.

The metrics generated by the KafkaProxy can be listed by opening a terminal session on the "metrics pod" and issuing the following curl command:

curl localhost:7888/metrics

The metric names start with "es_kafka_producermetrics_".

For example:

es_kafka_producermetrics_messageswrittenpertopicperproducer_total_count{clientid="connector-producer-kafka-datagen-0",serviceid="anonymous",topic="ORDERS.NEW"} 10595

These metrics can also be visualised using the OpenShift built-in UI by navigating to the OpenShift console and selecting the Observe tab and then Metrics:

Warning

The metrics are only listed if they have been emitted.

Note

The KafkaProxy provides useful information such as the topic accessed as well as the clientId.

The EventStreams UI also reflects the proxy configuration by providing a specific dashboard related to the producers, as the following snapshot from the EventStreams UI shows:

Warning

Proper rights need to be given to the service account used by the admin pod. In Event Streams version 11.0.0 and later, a Kafka Proxy handles gathering metrics from producing applications.

The information is displayed in the Producers dashboard.

To provide access to those metrics, the service account used by the admin pod needs to have the right grants.
The service account name follows the layout <instance-name>-ibm-es-admapi (the name of the EventStreams instance provided in the asset is es-kafka-demo):

oc get sa | grep admapi
#es-kafka-demo-ibm-es-admapi     1         2d16h

Grant this service account access to the Prometheus metrics using the following command:

oc adm policy add-cluster-role-to-user cluster-monitoring-view -z <serviceaccount-name> -n <namespace-name>

Kafka Exporter - consumer metrics

Detailed information is available in the EventAutomation documentation about the Kafka Exporter and in the Strimzi documentation on the overview page.

The setup is done by adding the following configuration in the EventStreams CR:

  apiVersion: eventstreams.ibm.com/v1beta2
  kind: EventStreams
  # ...
  spec:
    # ...
    strimziOverrides:
      # ...
      kafkaExporter:
        groupRegex: .*
        topicRegex: "PRODUCT.*"

The regular expressions are optional. In the provided example, the exporter is configured to provide metrics for all consumer groups and only for topics starting with "PRODUCT", which are used for the demo.

The operator will deploy an additional Kafka Exporter pod and a ServiceMonitor CR to collect the additional metrics.

Once the metrics are configured, some of them are displayed in the EventStreams web UI.
If you open the EventStreams web UI, navigate to the Topics section, select the topic PRODUCT.ORDERS and then the Consumers tab, you will be able to get insight into the consumers once the Flink job or the Java consumer application has been started:

By clicking on a specific consumer, you can get more details:

Warning

The topics and their metrics will only be shown once the use case deployment has been done (use case deployment).

Event Endpoint Management

Event Endpoint Manager and gateway can be configured to emit OpenTelemetry metrics.
A collector needs to be deployed to receive those metrics.
The following figure shows how the different components interact with each other:

Metrics that are provided by EEM can be found here: EventEndpoint Management metrics

  1. Define the OpenTelemetry Collector

The Collector section of the monitoring tools chapter provides instructions to install the OCP OpenTelemetry operator.

The collector is deployed by creating the following OpenTelemetryCollector CR.
The collector is configured with two gRPC receivers, one for the manager and one for the gateway. The metrics are then exposed to Prometheus using two different exporters.
Detailed information can be found in the Red Hat OpenShift OpenTelemetry installation documentation.

The Event Endpoint Manager metrics will be prefixed with "eemmanager" and those from the gateway will be with "eemgateway".
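As a simplified sketch only (the actual CR used for this blog has two receiver/exporter pairs; the names, ports, and API version are assumptions that depend on your operator release, chosen here so that the collector service matches the otel-collector endpoint used in the manager configuration below):

apiVersion: opentelemetry.io/v1beta1
kind: OpenTelemetryCollector
metadata:
  name: otel
  namespace: event
spec:
  mode: deployment
  config:
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4318   # endpoint referenced by the EEM manager CR below
    exporters:
      prometheus:
        endpoint: 0.0.0.0:8889       # port scraped by Prometheus through the ServiceMonitor
    service:
      pipelines:
        metrics:
          receivers: [otlp]
          exporters: [prometheus]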

  2. Configure the EEM solutions to emit OTel metrics

For the use case described in this blog, a simple configuration without security has been used:

spec:
  manager:    
    openTelemetry:
      endpoint: 'http://otel-collector.event.svc.cluster.local:4318'
      interval: 30000
      protocol: grpc

An example configuration for the manager can be found in the Event Endpoint manager CR and for the gateway in the Event Gateway CR. The endpoints defined correspond to those configured in the OTel Collector receivers.

  3. Configure the Prometheus ServiceMonitor

The ServiceMonitor is configured to match the OTel Collector service. The configuration used for this use case can be found in the OpenTelemetryCollector YAML.
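A minimal sketch of such a ServiceMonitor (the service name, labels, and port name are assumptions that depend on how the collector CR is named; the operator creates a service called <collector-name>-collector, and the port must match the Prometheus exporter port exposed on that service):

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: otel-collector
  namespace: event
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: otel-collector   # label assumed on the collector service
  endpoints:
    - port: prometheus        # name of the exporter port on the collector service
      interval: 30s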

Metrics are then discoverable in the OCP console under Observe. Example with "eemgateway_connected_clients":

EventProcessing

The event processing flow that has been built is exported, deployed, and executed as a Flink job. The monitoring configuration provided here therefore focuses on Flink.

  1. Configure Flink Reporter

Metrics generated by Flink can be exposed to external systems by configuring the Flink metric reporters. This configuration is done in the FlinkDeployment CR, which will be created during the deployment of the use case. Here is the configuration for the Prometheus reporter:

spec:
  flinkConfiguration:
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9250-9260
    metrics.reporters: prom

Additional information can be found on the Flink metric reporters site.

  2. Configure the Service Monitor

A Kubernetes Service and a ServiceMonitor have been configured to let Prometheus know that there are new endpoints to monitor.
The Flink Kubernetes Service defines how to reach the Flink pods, and the ServiceMonitor tells Prometheus which endpoints should be scraped through that Service.
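A sketch of such a pair, assuming the labels the Flink operator puts on the task manager pods (app set to the FlinkDeployment name and component: taskmanager) and the first port of the reporter range configured above:

apiVersion: v1
kind: Service
metadata:
  name: flink-metrics
  namespace: event
  labels:
    app: flink-metrics
spec:
  clusterIP: None              # headless service, only used for metrics discovery
  selector:
    app: flink-job-top-results
    component: taskmanager
  ports:
    - name: metrics
      port: 9250
      targetPort: 9250
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: flink-metrics
  namespace: event
spec:
  selector:
    matchLabels:
      app: flink-metrics
  endpoints:
    - port: metrics
      interval: 30s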

A tutorial is also provided that explains the setup to activate the monitoring at Flink monitoring.

  3. (Optional) Expose the Flink Web UI

When the Flink job manager is deployed through the FlinkDeployment CR, a REST and Web UI endpoint is also exposed through a Kubernetes Service on port 8081.
You can expose this service as a route to access the Flink UI, which has proven to be very useful for troubleshooting issues with Flink jobs.

Warning

The exposed service is not secured by default and should be exposed through a secure gateway in a production environment.

The service can be exposed on OCP using the command:

oc create route passthrough --service=<flink-rest> --port=8081 --hostname=<ocp-flink-host>

Once created, the Web UI can be reached by opening a web browser at "http://ocp-flink-host".

Warning

It is not recommended to have the Flink Web UI exposed without security.

The following blog provides information on how to expose it with authentication: 

Consuming Apache Flink Web UI in the enterprise on OCP/K8S
