IBM Event Streams and IBM Event Automation

Event Processing meets Flink Datagen: Generate test data simulations in a flash!

By Ammu Parvathy posted Fri December 20, 2024 07:21 AM

IBM Event Processing is a scalable, low-code, event stream processing platform that helps you transform and act on data in real time. You can define flows that connect to event sources, which bring event data (messages) from Apache Kafka® into your flow, combined with the processing actions you want to take on your events. The flows run as Apache Flink® jobs. One of the key steps in prototyping an Apache Flink application is generating data efficiently. This lays the foundation for testing and validating the application’s logic and performance. Whether you are simulating streaming events, creating batch datasets, or mimicking real-world scenarios, generating representative data is essential to ensure that the application behaves as expected under different conditions. It also helps in identifying potential issues and refining the application logic before deploying it to production.

Approaches to data generation with Apache Flink include using source connectors, for example the Kafka connector, or using the Flink DataGen connector. The DataGen connector is not currently supported in the Event Processing source node, but it can still be used through the custom SQL node. This blog shows you how to create Event Processing flows with the Table API DataGen connector, and explains the benefits and limitations of this approach.

Connectors used by the jobs

With this approach you prepare and generate test data in a test job, using the same SQL and connectors as the real jobs. Populating some Apache Kafka topics and consuming them in test jobs is one way of mimicking the production scenario. For instance, most of the Flink jobs defined in the IBM Event Processing tutorials consume and process test data generated in Kafka topics. As depicted in the image below, a typical Kafka connector is backed by a Kafka topic, which receives messages from a producer and is consumed by the Flink source.

Example job with a Kafka connector
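
For reference, a minimal Kafka-backed source table in Flink SQL looks roughly like the sketch below. The topic name, bootstrap address, and format are placeholders for illustration; a fuller example appears later in this article.

CREATE TABLE `sensor_readings`
(
    `sensorId`  STRING,
    `reading`   DOUBLE
)
WITH (
    'connector' = 'kafka',
    'topic' = 'sensor-readings',
    'properties.bootstrap.servers' = 'my-kafka-bootstrap:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);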

Using the Flink DataGen Connector

With this approach you use the built-in DataGen connector to create tables based on in-memory data generation. It generates synthetic data streams for testing and development purposes within Apache Flink, and is useful when developing queries locally without access to external systems such as Kafka.

Example job with a DataGen connector
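
At its simplest, a DataGen-backed table can be declared with nothing more than the connector name; every column is then filled with random values using the connector defaults. The table and column names below are purely illustrative.

CREATE TABLE `random_readings`
(
    `sensorId`  STRING,
    `reading`   DOUBLE
)
WITH (
    'connector' = 'datagen'
);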

Why Flink DataGen?

This connector allows users to simulate data generation with customisable schemas, datatypes, and patterns. It is known for its seamless integration with stream processing capabilities, high throughput, and support for complex schemas, enabling the generation of realistic datasets. Forget about the hassles of topic creation and connector configuration: just include 'datagen' in your SQL with the relevant schema customisations and there you go!

DataGen for mocking physical tables

Mocking out physical tables is a common use case for simulating data for real connector-based tables. This is especially useful for simulating results for Kafka-backed tables whose topics may not contain any messages yet. For example, consider a table backed by Kafka as below:

CREATE TABLE `orders`
(
    `orderId`                      INT NOT NULL,
    `description`                  STRING,
    `quantity`                     INT,
    `tsSec`                        BIGINT NOT NULL,
    `tsMillis`                     BIGINT NOT NULL,
    `tsMicros`                     BIGINT NOT NULL,
    `event_time`                   TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
)
WITH (
    'properties.bootstrap.servers' = 'kafka:000',
    'connector' = 'kafka',
    'format' = 'avro',
    'topic' = 'orders-avro-test',
    'properties.security.protocol' = 'PLAINTEXT',
    'properties.isolation.level' = 'read_committed',
    'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE GenOrders
WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
)
LIKE orders (EXCLUDING ALL);

We create a source table with the necessary connector options and table schema, then use the LIKE clause to create a temporary table that mimics the original table and simulates some test data. This can be useful for testing and debugging Flink application logic with simulated data before deploying it to production. The IBM Event Processing tutorials setup provides an excellent playground for working with Kafka connectors.
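
Once the temporary table is defined, any query written against the real table can be pointed at the mock instead. A quick sanity check, assuming the physical columns copied over from orders:

SELECT `orderId`, `description`, `quantity`
FROM GenOrders
WHERE `quantity` > 5;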

DataGen in action

To illustrate the connector functionality we have chosen an interesting use case: generating synthetic data for the Decision Review System in sports. In this example we integrate the real-time event processing capabilities of Flink with cricket's review technology. The Decision Review System (DRS) is a technology-based system used in cricket to assist match officials in their decision-making. It allows players to challenge a decision made by the umpire and trigger a thorough review process before a final decision is made. The main inputs are television replays, technology that tracks the path of the ball and predicts what it would have done, microphones that detect the small sounds made as the ball hits bat or pad, and infra-red imaging that detects temperature changes as the ball hits bat or pad.

This is a simplified example to demonstrate the connector functionality with this use case. Here we simulate a data stream representing the image frames generated along the ball's trajectory during a cricket match. Each event represents a frame (frame_id) with the position of the ball (ball_position) at a specific point in time (timestamp).

Below is an example of the expected data to be processed from DRS:

{
    "ball_position": [
        100.001,
        50.05,
        10.90
    ],
    "frame_id": "445ddf53c2",
    "average_pixel_intensity": 15,
    "previous_average_intensity": 10,
    "timestamp": "2024-10-25 11:09:59.323"
}
  • ball_position is an array of FLOAT representing the ball position with respect to the X, Y, Z coordinates (the number of elements in an array defaults to 3 with the DataGen connector).
  • frame_id: let’s assume that the DRS generates each frame_id as a 10-character alphanumeric value. We will set the length explicitly in the SQL.
  • average_pixel_intensity is a simplified metric that represents the average brightness level of all pixels within a region of the frame, or the entire frame, of an image.
  • previous_average_intensity is the average pixel intensity of the frame just before the current frame, with an associated timestamp.

The processing consists of filtering out unwanted frames using the two metrics average_pixel_intensity and previous_average_intensity. We will use Flink SQL to process the frames and keep only those relevant for further computations.

1. Defining the data stream

We are leveraging the Custom SQL node feature introduced in Event Processing version 1.2.2, which gives you the option to build your own custom SQL from scratch. Below is a screenshot of the set of features available under the Custom palette in the canvas. The SQL source node can be used to declare a new source, the SQL processor for any processing logic, and the SQL destination for declaring a sink.

Custom palette in Event Processing

The SQL source node functions similarly to the existing event source node, and allows customisation of the properties of the source connector used. Here is the SQL statement used to configure the SQL source node. It defines the schema of the source events and the configuration of the DataGen connector. Customisations are available for each field depending on the use case; these are discussed later in the article.

CREATE TABLE `ball_path_frames`
(
    `frame_id`             STRING,
    `ball_position`        ARRAY<FLOAT>,
    `avg_pixel_intensity`  INT,
    `prev_avg_intensity`   INT,
    `timestamp`            TIMESTAMP(3)
)
WITH (
    'connector' = 'datagen',
    'fields.frame_id.kind' = 'random',
    'fields.frame_id.length' = '10',
    'fields.ball_position.kind' = 'random',
    'fields.ball_position.element.min' = '1',
    'fields.ball_position.element.max' = '30',
    'fields.avg_pixel_intensity.kind' = 'random',
    'fields.avg_pixel_intensity.min' = '1',
    'fields.avg_pixel_intensity.max' = '160',
    'fields.prev_avg_intensity.kind' = 'random',
    'fields.prev_avg_intensity.min' = '1',
    'fields.prev_avg_intensity.max' = '160',
    'number-of-rows' = '10'
    );

2. Processing the events

The processing logic consists of calculating the absolute difference between the average pixel intensities of the current and previous frames and comparing it to a threshold value. We can either use the SQL processor node in the Custom palette to provide the SQL directly, or use processing nodes like transform and filter.

2.1 Configuration using SQL Processor node

The SQL processor node can be used to write Flink SQL code, allowing for the implementation of complex queries and processing logic. 

CREATE TEMPORARY VIEW `intensity difference between frames` AS
SELECT
    `frame_id`                     AS `frame_id`,
    `ball_position`                AS `ball_position`,
    `avg_pixel_intensity`          AS `avg_pixel_intensity`,
    `prev_avg_intensity`           AS `prev_avg_intensity`,
    `timestamp`                    AS `timestamp`,
    ABS(`avg_pixel_intensity` - `prev_avg_intensity`) AS `intensity_diff`
FROM `ball_path_frames`;

CREATE TEMPORARY VIEW `filtered frames` AS
SELECT * FROM `intensity difference between frames`
WHERE `intensity_diff` > 5;

Make sure to use the same name as the source table in the FROM clause of the temporary view, or the editor will report validation failures. The validation feature in the SQL editor runs on every change and, in case of any errors, points to the line where they occurred.

Data processing using the custom processor node

2.2 Configuration using transform node and filter node

Alternatively, we can make use of processor nodes like transform and filter to apply this logic. Here I am using a transform node to compute the difference between the pixel intensities and a filter node to extract the relevant frames. At this point I can either connect the flow to a sink or run it on the fly to fetch the results.

Data processing using the transform and filter node

3. Running the Flow

It is possible to make this flow bounded or unbounded by using the number-of-rows property of the connector. This is an optional property; when it is explicitly set, the connector generates the desired number of rows, after which the job stops. When this configuration is not set, the flow behaves like an unbounded stream with continuous data generation, and the job needs to be explicitly stopped before re-running the test. This can be done using the Stop flow button in the canvas.
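
For example, removing number-of-rows and setting a generation rate instead keeps the stream unbounded; the rows-per-second value below is purely illustrative:

CREATE TABLE `ball_path_frames_unbounded`
(
    `frame_id`       STRING,
    `ball_position`  ARRAY<FLOAT>
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5'
);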

The canvas offers two modes of job submission: Events from now, for processing only the latest events in the Kafka topic, or Include historical, to pull in the events that are already available in the Kafka topic. You can choose either option, as both have the same effect when working with the DataGen connector.

Run this flow to simulate some data and see the results:

Result panel

Expand this flow with more processing nodes to predict the ball trajectory and decide on a HIT or MISS. Howzat! Try it out yourself using the demo environment provided by the Event Processing tutorials.

Understanding the available datatype ranges

While the DataGen connector allows for quick and seamless data generation, there are still some aspects to keep in mind when working with the randomness of the data. Simulating realistic data might need additional configuration settings on each field. This section describes some of the available datatype ranges.

Numbers

Numbers are the easiest to generate. The connector provides random generator and sequence generator options, which can be set using the fields.#.kind property. Using random with a min / max configuration, or sequence with start / end, allows us to produce “not too random” data. Consider the SQL:

CREATE TABLE `student`
(
    `id`     INT,
    `marks`    INT,
    `subject` STRING
)
WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '10',
    'fields.marks.kind' = 'random',
    'fields.marks.min' = '50',
    'fields.marks.max' = '100',
    'fields.subject.kind' = 'random',
    'fields.subject.length' = '5',
    'number-of-rows' = '100'
    );

The results below are produced with this configuration. Even though we set number-of-rows to 100, only 10 rows are produced. This is because the number-of-rows configuration is overridden by the sequence start and end settings on the id field: the source stops as soon as the sequence is exhausted.

Limitation when using sequence
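
One way around this, sketched below with only the id column for brevity, is to make the sequence range at least as large as the desired row count:

CREATE TABLE `student_ids`
(
    `id`  INT
)
WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100',
    'number-of-rows' = '100'
);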

String and Varchar

Text produced by DataGen for VARCHAR and STRING fields consists of random strings containing the characters 0-9 and a-f. It is possible to generate a lot of rows per second and filter out only the rows that contain a certain value (with a WHERE clause), but we are still only getting “words” made up of those hexadecimal characters.
Customisations are possible by combining this with other SQL operations, although with limitations.

CREATE TABLE `address`
(
    `id`     INT,
    `name`    STRING,
    `country_id` INT,
    `city` STRING
)
WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'random',
    'fields.id.min' = '1',
    'fields.id.max' = '10',
    'fields.name.kind' = 'random',
    'fields.name.length' = '5',
    'fields.country_id.kind' = 'sequence',
    'fields.country_id.start' = '10',
    'fields.country_id.end' = '15',
    'fields.city.kind' = 'random',
    'fields.city.length' = '6'
    );
CREATE TEMPORARY VIEW address_view AS
SELECT
    id,
    name,
    CASE `country_id`
        WHEN 10 THEN 'IN'
        WHEN 11 THEN 'FR'
        WHEN 12 THEN 'UK'
        WHEN 13 THEN 'GB'
        WHEN 14 THEN 'US'
    END AS country_code,
    city
FROM address;

Timestamps

The max-past option, which can be used on timestamp fields, allows us to go back in time up to a maximum value. When this value is not set, the generated timestamps start from the current timestamp in the local machine’s time zone. This allows us to generate random past timestamps that can be used in flows. Consider the example below:

CREATE TABLE `orders`
(
    `order_id`             STRING,
    `order_time`        TIMESTAMP(3)
)
WITH (
    'connector' = 'datagen',
    'fields.order_id.kind' = 'random',
    'fields.order_id.length' = '10',
    'fields.order_time.max-past' = '300000',  
    'number-of-rows' = '10'
    );

Here we set the max-past configuration for the order_time field to 300000 ms, or 5 minutes. The timestamp values generated for this column will always fall within the 5 minutes before the current timestamp.
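
Because the generated timestamps stay close to the current wall-clock time, they can also carry a watermark, which is useful when the flow includes windowed aggregations. A minimal sketch, assuming the same orders schema as above:

CREATE TABLE `orders_with_watermark`
(
    `order_id`    STRING,
    `order_time`  TIMESTAMP(3),
    WATERMARK FOR `order_time` AS `order_time` - INTERVAL '10' SECOND
)
WITH (
    'connector' = 'datagen',
    'fields.order_id.kind' = 'random',
    'fields.order_id.length' = '10',
    'fields.order_time.max-past' = '300000'
);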

Collections

In the ball trajectory example above we discussed ball_position being an array of FLOAT. For random collection types such as ARRAY, MULTISET, and MAP, the collection size defaults to 3, which means these fields always generate results with 3 elements. As can be seen in the results panel of the example, the array only has 3 elements. This configuration cannot be overridden.

The bottom line

Testing and validation of application logic and performance can greatly benefit from simple ways to generate test data. The key options for data generation are the Flink DataGen connector or the connectors used by the real jobs. The tutorials setup in the Event Processing documentation is an excellent starting point for playing around with the Kafka connector for data generation. To leverage the Flink DataGen connector, you can use the custom SQL node feature, which allows customisation of the source connectors. The results simulated through this connector can be random, and they can be made more realistic by customising the values for each column using the connector configurations.

Contributors

Ammu Parvathy

Sebastien Pereira

David Radley
