IBM Event Processing is a scalable, low-code, event stream processing platform that helps you transform and act on data in real time. You can define flows that connect to event sources, which bring event data (messages) from Apache Kafka® into your flow, combined with the processing actions you want to take on your events. The flows run as Apache Flink® jobs. One of the key steps in prototyping an Apache Flink application is generating data efficiently. This process lays the foundation for testing and validating the application's logic and performance. Whether you are simulating streaming events, creating batch datasets, or mimicking real-world scenarios, generating representative data is essential to ensure that the application behaves as expected under different conditions. It also helps in identifying potential issues and refining the application logic before deploying it in production.
Approaches to data generation with Apache Flink include using source connectors, for example the Kafka connector, or using the Flink DataGen connector. Currently the DataGen connector is not a supported connector in the Event Processing source node, but by using the custom SQL node it is still possible to use it. This blog shows you how to create Event Processing flows with the Table API DataGen connector, and explains the benefits and limitations of this approach.
Connectors used by the jobs
With this approach you prepare and generate test data in a test job, using the same SQL and the same connectors as the real jobs. Populating Apache Kafka topics and consuming them in test jobs is one way of mimicking the production scenario. For instance, most of the Flink jobs defined in the IBM Event Processing tutorials consume and process test data generated in Kafka topics. As depicted in the image below, a typical Kafka connector is backed by a Kafka topic, which receives messages through a producer and is consumed by the Flink source.
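A minimal sketch of this pattern in Flink SQL might look like the following; the table name, topic name, broker address, and sample rows are illustrative assumptions, not values from the tutorials.
CREATE TABLE `orders_test`
(
  `orderId` INT,
  `description` STRING,
  `quantity` INT
)
WITH (
  'connector' = 'kafka',
  'topic' = 'orders-test',                       -- assumed topic name
  'properties.bootstrap.servers' = 'kafka:9092', -- assumed broker address
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Producing side of the test setup: insert a handful of hand-written rows into the topic.
INSERT INTO `orders_test` VALUES
  (1, 'first test order', 5),
  (2, 'second test order', 2);

-- Consuming side: a test job reads the topic exactly as a production job would.
SELECT * FROM `orders_test`;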
Using the Flink DataGen Connector
With this approach you use the built-in DataGen connector to create tables based on in-memory data generation. It generates synthetic data streams for testing and development purposes within Apache Flink. This is useful when developing queries locally without access to external systems such as Kafka.
Why Flink DataGen?
This connector allows users to simulate data generation with customisable schemas, datatypes, and patterns. It is known for its seamless integration with stream processing capabilities, high throughput, and support for complex schemas, enabling the generation of realistic datasets. Forget about the hassles of topic creation and connector configuration: just include 'datagen' in your SQL with the relevant schema customisations and there you go!
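As a minimal sketch (the table and field names here are illustrative, not from the article), a DataGen-backed table needs nothing more than a schema and the connector name:
CREATE TABLE `sensor_readings`
(
  `sensor_id` INT,
  `reading` DOUBLE,
  `event_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5' -- throttle generation while experimenting locally
);

-- Every field is populated with random values using the connector defaults.
SELECT * FROM `sensor_readings`;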
DataGen for mocking physical tables
Mocking out physical tables is a common use case for simulating data for real connector-based tables. This is especially useful for simulating results for Kafka-backed tables whose topics may not contain any messages yet. For example, consider a table backed by Kafka as below:
CREATE TABLE `orders`
(
`orderId` INT NOT NULL,
`description` STRING,
`quantity` INT,
`tsSec` BIGINT NOT NULL,
`tsMillis` BIGINT NOT NULL,
`tsMicros` BIGINT NOT NULL,
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' MINUTE
)
WITH (
'properties.bootstrap.servers' = 'kafka:000',
'connector' = 'kafka',
'format' = 'avro',
'topic' = 'orders-avro-test',
'properties.security.protocol' = 'PLAINTEXT',
'properties.isolation.level' = 'read_committed',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TEMPORARY TABLE GenOrders
WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
LIKE orders (EXCLUDING ALL);
We create a source table with the necessary connector options and table schema, and use the LIKE clause to create a temporary table that mimics the original table and simulates some test data. This can be useful for testing and debugging Flink application logic with simulated data before deploying into production. The IBM Event Processing tutorials setup provides an excellent playground for working with Kafka connectors.
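Because GenOrders shares the schema of orders, any query written against the real table can be pointed at the mock instead. For instance (an illustrative filter, not from the tutorials):
-- Run the production query logic against the mocked table instead of the Kafka-backed one.
SELECT `orderId`, `description`, `quantity`
FROM GenOrders
WHERE `quantity` > 5;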
DataGen in action
To illustrate the connector functionality we have chosen an interesting use case for generating synthetic data: integrating the real-time event processing capabilities of Flink with the Decision Review System in sports. The Decision Review System (DRS) is a technology-based system used in cricket to assist the match officials in their decision-making. It allows players to challenge a decision made by the on-field umpire and trigger a thorough review process before a final decision is made. The main inputs used are television replays, technology that tracks the path of the ball and predicts what it would have done, microphones that detect the small sounds made as the ball hits bat or pad, and infra-red imaging that detects temperature changes as the ball hits the bat or pad.
This is a simplified example to demonstrate the connector functionality with this use case. Here we are simulating a data stream representing the image frames generated by a ball trajectory during a cricket match. Each event represents a frame (frame_id) with the position of the ball (ball_position) at a specific point in time (timestamp).
Below is an example of the expected data to be processed from DRS:
{
"ball_position": [
100.001,
50.05,
10.90
],
"frame_id": "445ddf53c2",
"average_pixel_intensity": 15,
"previous_average_intensity": 10,
"timestamp": "2024-10-25 11:09:59.323"
}
The processing consists of filtering out the unwanted frames using the two metrics average_pixel_intensity and previous_average_intensity. We will be using Flink SQL to process the image frames and filter out only the relevant frames required for further computations.
1. Defining the data stream
We are leveraging the Custom SQL node feature introduced in Event Processing version 1.2.2, which gives you the option to build your own custom SQL from scratch. Below is a screenshot of the set of features available under the Custom palette in the canvas. The SQL source node can be used to declare a new source, the SQL processor node for any processing logic, and the SQL destination node for declaring a sink.
The SQL source node functions similarly to the existing event source node, and allows customisation of the properties of the source connector used. Here is the SQL statement used to configure the SQL source node. It defines the schema of the source events and the configuration of the DataGen connector. Customisations are available for each field based on the use case, and these are discussed later in the article.
CREATE TABLE `ball_path_frames`
(
`frame_id` STRING,
`ball_position` ARRAY<FLOAT>,
`avg_pixel_intensity` INT,
`prev_avg_intensity` INT,
`timestamp` TIMESTAMP(3)
)
WITH (
'connector' = 'datagen',
'fields.frame_id.kind' = 'random',
'fields.frame_id.length' = '10',
'fields.ball_position.kind' = 'random',
'fields.ball_position.element.min' = '1',
'fields.ball_position.element.max' = '30',
'fields.avg_pixel_intensity.kind' = 'random',
'fields.avg_pixel_intensity.min' = '1',
'fields.avg_pixel_intensity.max' = '160',
'fields.prev_avg_intensity.kind' = 'random',
'fields.prev_avg_intensity.min' = '1',
'fields.prev_avg_intensity.max' = '160',
'number-of-rows' = '10'
);
2.2 Configuration using transform node and filter node
Alternatively, we can make use of processor nodes like the transform and filter nodes to apply this logic. Here I am making use of a transform node to get the difference between the pixel intensities and a filter node to extract the relevant frames, as sketched below. At this point I can either connect it to a sink or run it on the fly to fetch the results.
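For reference, a rough Flink SQL equivalent of the transform and filter nodes is sketched below; the threshold of 50 is an assumed value chosen for illustration, not part of the article.
-- Compute the intensity difference (transform) and keep only the frames where the
-- change is large enough to be interesting (filter).
SELECT
  `frame_id`,
  `ball_position`,
  `avg_pixel_intensity` - `prev_avg_intensity` AS `intensity_diff`,
  `timestamp`
FROM `ball_path_frames`
WHERE `avg_pixel_intensity` - `prev_avg_intensity` > 50;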
3. Running the Flow
It is possible to make this flow bounded or unbounded by using the number-of-rows property of the connector. This is an optional property; when explicitly set to a number, the job generates the desired number of rows and then stops. When this configuration is not set, the flow behaves like an unbounded stream with continuous data generation, and the job needs to be explicitly stopped before re-running the test. This can be done using the Stop flow button in the canvas.
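As a sketch of the two behaviours (the table names here are illustrative):
-- Bounded: the job generates 10 rows and then finishes on its own.
CREATE TABLE `bounded_frames`
(
  `frame_id` STRING
)
WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
);

-- Unbounded: no 'number-of-rows', so rows keep flowing until the flow is
-- stopped with the Stop flow button.
CREATE TABLE `unbounded_frames`
(
  `frame_id` STRING
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);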
The canvas offers two modes of job submission: Events from now, for processing only the latest events in the Kafka topic, or Include historical, to pull in the events that are already available in the Kafka topic. You can choose either of these options, as both have the same effect when working with the DataGen connector.
Run this flow to simulate some data and see the results:
Expand this flow with more processing nodes to get the ball trajectory prediction and decide on a HIT or MISS. Howzat! Try it out yourself using the demo environment provided by Event Processing tutorials.
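One possible direction for such an extension is sketched below; the stump-window thresholds and the choice of array elements are assumptions made purely for illustration, not part of the article.
-- Classify each frame as HIT or MISS based on where the ball position falls
-- relative to an assumed stump window (array indexes are 1-based in Flink SQL).
SELECT
  `frame_id`,
  CASE
    WHEN `ball_position`[2] BETWEEN 0 AND 22
     AND `ball_position`[3] BETWEEN 0 AND 71
    THEN 'HIT'
    ELSE 'MISS'
  END AS `decision`
FROM `ball_path_frames`;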
Understanding the available datatype ranges
While the DataGen connector allows for quick and seamless data generation, there are still some aspects to keep in mind when working with the randomness of the data. Simulating realistic data might need additional configuration settings on each field. This section describes some of the available datatype ranges.
Numbers
Numbers are the easiest to generate. The connector provides random and sequence generator options, which can be set using the fields.#.kind property. Using random with a min / max configuration, or sequence with a start / end configuration, allows us to produce "not too random" data. Consider the SQL:
CREATE TABLE `student`
(
`id` INT,
`marks` INT,
`subject` STRING
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '10',
'fields.marks.kind' = 'random',
'fields.marks.min' = '50',
'fields.marks.max' = '100',
'fields.subject.kind' = 'random',
'fields.subject.length' = '5',
'number-of-rows' = '100'
);
The results below are produced with this configuration. Here we can see that even though we provided the number-of-rows as 100, only 10 rows are produced. This is because the number-of-rows configuration is overridden by the sequence start and end settings for the id field.
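A sketch of how to actually obtain 100 rows with this setup: widen the sequence range so that it covers the requested number of rows.
CREATE TABLE `student_100`
(
  `id` INT,
  `marks` INT,
  `subject` STRING
)
WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100',   -- sequence range now matches number-of-rows
  'fields.marks.kind' = 'random',
  'fields.marks.min' = '50',
  'fields.marks.max' = '100',
  'fields.subject.kind' = 'random',
  'fields.subject.length' = '5',
  'number-of-rows' = '100'
);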
String and Varchar
Text produced by the DataGen connector for varchar and string fields consists of random strings containing the characters 0-9 and a-f. It is possible to generate a lot of rows per second and filter out only the rows that contain a certain value (with a WHERE clause), but we are still only getting "words" made up of those hexadecimal characters. Customisations are possible by combining this with other SQL operations, but with limitations.
CREATE TABLE `address`
(
`id` INT,
`name` STRING,
`country_id` INT,
`city` STRING
)
WITH (
'connector' = 'datagen',
'fields.id.kind' = 'random',
'fields.id.min' = '1',
'fields.id.max' = '10',
'fields.name.kind' = 'random',
'fields.name.length' = '5',
'fields.country_id.kind' = 'sequence',
'fields.country_id.start' = '10',
'fields.country_id.end' = '15',
'fields.city.kind' = 'random',
'fields.city.length' = '6'
);
CREATE TEMPORARY VIEW address_view AS SELECT id, name,
case `country_id` when 10 then 'IN'
when 11 then 'FR'
when 12 then 'UK'
when 13 then 'GB'
when 14 then 'US'
end AS country_code,
city
FROM address;
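Consuming the view then gives readable country codes; note that country_id values not covered by the CASE expression (15 in this example) come through with a NULL country_code. An illustrative query:
SELECT id, name, country_code, city
FROM address_view
WHERE country_code IS NOT NULL;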
Timestamps
The max-past option that can be used on timestamp fields allows us to go back in time up to a maximum value. When this value is not set, the generated timestamps start from the current timestamp in the local machine's time zone. This allows us to generate random past timestamps that can be used in flows. Consider the example below:
CREATE TABLE `orders`
(
`order_id` STRING,
`order_time` TIMESTAMP(3)
)
WITH (
'connector' = 'datagen',
'fields.order_id.kind' = 'random',
'fields.order_id.length' = '10',
'fields.order_time.max-past' = '300000',
'number-of-rows' = '10'
);
Here we are setting the max-past configuration for the order_time field to 300000 ms, or 5 minutes. The timestamp values generated for this column will always fall within the 5 minutes preceding the current timestamp.
Collections
In the ball trajectory example above we discussed the ball_position field being an array of float. For random collection types like arrays, multisets, maps, and so on, the collection size defaults to 3, which means these always generate results with 3 elements. As can be seen in the results panel of the example, each array only has 3 elements. This configuration cannot be overridden.
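For reference, a sketch with other collection types (the table and field names are illustrative); each generated row contains exactly 3 elements per collection:
CREATE TABLE `collection_demo`
(
  `scores` ARRAY<INT>,
  `labels` MAP<STRING, INT>
)
WITH (
  'connector' = 'datagen',
  'fields.scores.element.min' = '0',
  'fields.scores.element.max' = '10',
  'number-of-rows' = '5'
);

-- Each row has a 3-element scores array and a 3-entry labels map.
SELECT * FROM `collection_demo`;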
The bottom line
Testing and validating application logic and performance can greatly benefit from simple ways to generate test data. Key options for data generation include utilising the Flink DataGen connector or making use of the same connectors used by the jobs. The tutorials setup in the Event Processing documentation is an excellent starting point to play around with the Kafka connector for data generation. To leverage the Flink DataGen connector you can use the latest custom SQL node feature, which allows for customisation of the source connectors. The results simulated through this connector can be random, and they can be made more realistic by customising the values for each column using the connector configuration options.
Contributors
Ammu Parvathy
Sebastien Pereira
David Radley