Many organizations are taking advantage of the continuous streams of data being generated by their devices, employees, customers, and more. You can use streaming analytics to extract insights from your data as it is generated, instead of storing it in a database or data warehouse first. For example, you could analyze the data generated by an online store to answer questions like:
Which are the top selling products in each department right now? What are the total sales for the last hour?
These are examples of streaming analytics applications that you can create with Streams flows. Streams flows is a web-based graphical IDE for creating streaming analytics applications without having to write a lot of code or learn a new language.
In this article, I’ll demonstrate how to use the Aggregation operator in Streams flows to create applications that compute and store various statistics for streaming data.
Prerequisites
To follow along, you need IBM Cloud Pak for Data version 2.5 or 3.0 and a running Streams instance.
See this information for how to install and configure the Streams service.
Note: If you are using Cloud Pak for Data v3.5, the Aggregation operator in Streams flows differs slightly from what is presented in this article. Therefore, please read the documentation for the latest version of the Aggregation operator.
If you are not familiar with Streams flows, watch this short video for an overview of the canvas.
To follow along, open the Streams flows IDE by adding a new flow to any project. From within the project, click “Add to Project” > “Streams Flow”.
Overview
You use the Aggregation operator in Streams flows to calculate averages, maximums, and other basic statistics for streaming data. The best way to learn about the Aggregation operator is by example. This article will show a few common examples, and in each case, you’ll see how to configure the Aggregation operator to get the desired result.
About the use case
The scenario is an online department store. As customers browse the store, they generate events that are called a clickstream. A clickstream is a continuous stream of data that describes users’ interactions with the website as they occur. In addition to browsing, these interactions include adding one or more items to a cart, logging in or out, and so on.
The store management is interested in using the clickstream data to get ongoing answers to the following questions:
- What is the running total sales amount today?
- For each hour, how many customers were actively browsing the store?
- What is the running total sales amount per department in the last hour, day and week?
Our input data will be the sample stream of clickstream events that is available in Streams flows. Below is an example of the contents of the sample data stream:
Each row in the table is a single event, or tuple. Each event always has a customer id and a timestamp. The remaining contents of each tuple depend on the type of the click event, highlighted above. For example, the add_to_cart event is generated when a customer adds a product to their cart, and contains the name and category/department of the product that was added to the cart, while the login event contains only the customer id and the event time.
To use this sample stream as a data source, drag the Sample data operator to the canvas. In the properties pane, choose the Clickstream topic.
You can preview the clickstream data as shown above: click Edit Schema and then Show preview in the dialog that appears.
Now that we have a data stream, we can use it to learn more about the Aggregation operator.
Aggregation concepts
The Aggregation operator takes a data stream as input and produces the result of user-specified aggregations as output. To use the Aggregation operator, you need to configure its key parameters based on what you are trying to calculate.
These are:
- Aggregation window size and window type,
- Aggregation function (max, min, average, etc.)
Window size
Although streaming data is potentially infinite, we are often only interested in subsets of the data that are based on time, e.g. total sales for the last hour. Or, we use subsets based on the number of events that have occurred, e.g. the maximum of the last 5 readings.
This subset of the streaming data is called a window. The size of the window can be specified in different ways, such as elapsed time or the number of tuples. The question that you are trying to answer always gives a clue to the size of the window. For example, in the question “How much are the total sales for the last hour?”, the window size is 1 hour. The Aggregation operator in Streams flows currently supports only time-based windows.
Window type: Sliding vs Tumbling
There are two types of windows, sliding and tumbling. The window type determines how often the result is calculated.
Sliding: Calculate the result of the aggregation whenever a new tuple arrives. Best for moving averages, running totals and other up-to-the-second calculations.
Tumbling: Calculate the result of the aggregation once at the end of each period, regardless of how often tuples arrive. Best for situations where updates at specific intervals are required. For example, you would use a tumbling window to report the total sales once an hour.
If your store had a sale every minute and you were calculating the total sales in the last hour, the difference between the two window types can be illustrated as follows:
| Window type | Number of result tuples per hour | Tuples used in calculation |
| --- | --- | --- |
| Sliding | 60, since we receive a sale every minute | All sales that occurred less than an hour from the current time. |
| Tumbling | 1 | All sales that occurred in the hour since the application started, and every hour after that. |
Any tuples used in a tumbling window are only used once and are discarded once the operator produces output. This is where the term “tumbling” comes from: all the tuples tumble out of the window and are not reused.
On the other hand, a tuple in a sliding window can be used many times for the calculation, as long as it has not been in the window longer than X, where X is the size of the window. For example, with a 1 hour window, a tuple that arrived 30 minutes ago will be kept in the window, while a tuple that arrived 1.5 hours ago will be discarded.
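To make the difference concrete, here is a minimal Python sketch of the two window types for the store example above. It is not Streams flows code: the function names `sliding_totals` and `tumbling_totals` are hypothetical, timestamps are plain seconds, and the eviction rule simply follows the description in this section.

```python
from collections import deque

HOUR = 3600  # window size in seconds


def sliding_totals(sales, window=HOUR):
    """Emit a running total every time a sale arrives (sliding window)."""
    buffer = deque()
    for ts, amount in sales:
        buffer.append((ts, amount))
        # Keep only sales that occurred less than `window` seconds ago.
        while ts - buffer[0][0] >= window:
            buffer.popleft()
        yield ts, sum(a for _, a in buffer)


def tumbling_totals(sales, window=HOUR):
    """Emit one total per elapsed window; every sale is counted exactly once."""
    window_start = None
    total = 0.0
    for ts, amount in sales:
        if window_start is None:
            window_start = ts  # the first window starts with the first tuple
        while ts - window_start >= window:
            yield window_start + window, total
            total = 0.0  # tuples "tumble out" and are not reused
            window_start += window
        total += amount


# Two hours of sales, one $10 sale per minute (timestamps in seconds).
sales = [(60 * i, 10.0) for i in range(120)]
sliding = list(sliding_totals(sales))
tumbling = list(tumbling_totals(sales))
print(len(sliding), "sliding results;", len(tumbling), "tumbling result(s)")
```

In this simplified model a tumbling window is only closed when a later tuple arrives, so the total for the second hour is not emitted until a sale from the third hour shows up. A real streaming engine would typically close the window on a timer instead.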
Functions
The last parameter you need to configure is which aggregate function(s) will be used on our input data to get our results. Available functions at the time of writing are Average, Max, Min, Count, CountDistinct, Sum, and StdDev.
In our example, we want to compute the total sales so far. By applying the Sum function to the value of every tuple in the window, we will get the running total sales.
Whenever the operator is ready to produce output, whether periodically (tumbling window) or every time a new tuple arrives (sliding window), the function(s) you select will be applied to all the tuples in the window. If you just want to copy the value of an attribute on the input stream to the output stream, use PassThrough.
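As a rough illustration of what each function computes, the sketch below applies all of them to one attribute of the tuples currently in a window. The helper `aggregate` is hypothetical and written in plain Python. Two details are assumptions rather than documented operator behavior: StdDev is modeled as the population standard deviation, and PassThrough is modeled as the value from the most recent tuple.

```python
import statistics


def aggregate(window, field):
    """Apply each aggregate function to one attribute of a non-empty window."""
    values = [tuple_[field] for tuple_ in window]
    return {
        "Average": statistics.mean(values),
        "Max": max(values),
        "Min": min(values),
        "Count": len(values),
        "CountDistinct": len(set(values)),
        "Sum": sum(values),
        # Assumption: population standard deviation.
        "StdDev": statistics.pstdev(values),
        # Assumption: PassThrough copies the value of the latest tuple.
        "PassThrough": values[-1],
    }


# A window holding four tuples with a product_price attribute.
window = [
    {"product_price": 2.0},
    {"product_price": 4.0},
    {"product_price": 4.0},
    {"product_price": 6.0},
]
result = aggregate(window, "product_price")
print(result)
```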
Putting it all together
For the question “how much are the total sales for the last hour?”, we need a 1 hour time window. Since we want the running total to be updated every time there is a sale, we use a sliding window. Every time there is a new sale, the Sum function is applied to all the tuples in the window, that is, all the sales in the last hour, and the result is produced as output.
Timestamps and dates
Before moving to the first example, it is helpful to mention how the Aggregation operator uses timestamps. The operator has a “Use timestamp in tuple” flag to indicate that the recorded time for events is present in the incoming data and should be used instead of system time. If this flag is used, each tuple must have an attribute that contains the timestamp to be used. The operator then starts counting the window size from the time recorded in the first tuple, not from when that tuple arrived.
Date formats
If you are writing applications that will send data to a flow, the data must be in JSON and the timestamp should be in ISO-8601 format, with any delimiter. Milliseconds are optional and the timezone should not be present.
Valid examples are: "2018-01-08T07:11:36", "2018-01-08 07:11:36.877".
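If the sending application is written in Python, a timestamp in this format can be produced with the standard library, as in the sketch below. The helper name `to_flow_json` is hypothetical, and the `time_stamp` field name is borrowed from the sample clickstream data; how the JSON is actually delivered to the flow is outside the scope of this sketch.

```python
import json
from datetime import datetime


def to_flow_json(event, event_time):
    """Serialize an event as JSON with an ISO-8601 timestamp and no timezone."""
    if event_time.tzinfo is not None:
        # The timezone should not be present, so reject aware datetimes here
        # instead of silently dropping the offset.
        raise ValueError("expected a naive datetime without timezone info")
    payload = dict(event)
    # Milliseconds are optional; they are included here for illustration.
    payload["time_stamp"] = event_time.isoformat(sep="T", timespec="milliseconds")
    return json.dumps(payload)


message = to_flow_json(
    {"customer_id": 42, "click_event_type": "login"},
    datetime(2018, 1, 8, 7, 11, 36, 877000),
)
print(message)
```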
Examples
Now let’s see some examples.
To follow along, create a new empty flow. Drag the Sample Data operator to the canvas, and select “Clickstream” as the Topic for the sample data.
Example 1: What are the total sales for the last 5 minutes?
We will compute the running total by adding the value of each sale in the last 5 minutes. While a small value is helpful for testing purposes, you can increase the size of the window to 1 hour, 1 week, or more, depending on the organization’s needs.
Configure the operator:
- Drag the Aggregation operator to the canvas and connect it to the sample data operator. The properties pane will open so we can configure the operator.
- Under Aggregation Window:
  - Type: Use a sliding window because we want a running total.
  - Time Unit: minute (For testing purposes you can use a smaller value, say 1 minute).
  - Number of Time units: 5
  - Use timestamp in tuple: If your data has timestamps that indicate when the event occurred, check this box to ensure that the operator uses these timestamps when computing elapsed time. If you leave this unchecked, the operator will use the system time instead. Since the sample data stream includes a time_stamp attribute, we can use it. Check this box and select time_stamp under Timestamp field. See the section about timestamps above for more information on the correct timestamp format.
- Aggregation Definition:
  - Under Functions, we build a list of the desired output attributes for the operator. For each output attribute, use “Add function” to add it to the list. In our simple example, we just want 2 output attributes: the total sales and the time of the last sale.
  - Output attribute: Total sales in the last 5 min.
    - Output Field Name: Name of the value we want to compute. In this case, we’ll call it total_sales_last_5min.
    - Function Type: Select Sum.
    - Apply function to: This is the input attribute that will be used in our calculation. Select the total_price_of_basket attribute.
  - Output attribute: Time stamp.
    - Click “Add function”.
    - Output Field Name: time_stamp
    - Function Type: Select “PassThrough” to copy the value from the input stream to the output stream.
    - Apply function to: Select the time_stamp attribute.
  - To copy any other attribute from the input stream to the output stream, click “Add function” and select “PassThrough” to indicate that the value should just be transferred from the input stream to the output stream.
The configured operator should look like this:
Our output will be sent to a CSV file using the Object Storage operator, but this is not the only available option. Results could also be sent to Message Hub for integration with a real-time dashboard, or stored in Redis or DB2 Warehouse.
Click Run to run the flow and you should see data streaming between the operators. You can browse to your output file in Cloud Object Storage and see the results:
time_stamp,total_sales_last_5min
"2018-01-02T11:17:51",705188.169999982
"2018-01-02T11:17:51",705188.169999982
"2018-01-02T11:17:51",705188.169999982
"2018-01-02T11:17:51",705188.169999982
"2018-01-02T11:17:51",705250.149999982
"2018-01-02T11:17:51",705250.149999982
"2018-01-02T11:17:51",705269.679999982
Notice that there are some entries where the total sales is still the same. Why is this happening? Since we used a sliding window, we get an update every time a new tuple arrives. But not all the tuples in the clickstream represent a sale. They could be generated when a customer logs in or out, and so on. When such a tuple arrives, the running total is recalculated even though it hasn’t changed. So, we want to change the flow so that only tuples that represent a sale are used in our calculation. This is done by adding a Filter operator between the Sample Data operator and the Aggregation operator.
After adding the Filter operator, set the filter condition to click_event_type == "checkout".
This will only send checkout events to the Aggregation operator:
After making this change and re-running the flow, the running total is only updated when a sale has occurred, as shown in the results file:
time_stamp,total_sales_last_hr
"2018-01-04T11:32:14",35235.53
"2018-01-04T11:32:15",35057.7
"2018-01-04T11:32:15",35120.24
"2018-01-04T11:32:16",35301.08
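For readers who think in code, the filtered flow behaves roughly like the Python sketch below. It is a simplified model and not what Streams flows runs internally: the generator name `total_sales_last_5min` is hypothetical, and it assumes that checkout events carry the `total_price_of_basket` attribute and an ISO-8601 `time_stamp`.

```python
from collections import deque
from datetime import datetime, timedelta


def total_sales_last_5min(events, window=timedelta(minutes=5)):
    """Filter checkout events, then keep a sliding-window sum of basket totals."""
    buffer = deque()
    for event in events:
        # Filter operator: click_event_type == "checkout"
        if event["click_event_type"] != "checkout":
            continue
        # "Use timestamp in tuple": elapsed time comes from the event itself.
        ts = datetime.fromisoformat(event["time_stamp"])
        buffer.append((ts, event["total_price_of_basket"]))
        # Drop sales that are no longer inside the window.
        while ts - buffer[0][0] >= window:
            buffer.popleft()
        yield {
            "time_stamp": event["time_stamp"],  # PassThrough
            "total_sales_last_5min": sum(price for _, price in buffer),  # Sum
        }


# Made-up events for illustration only.
events = [
    {"click_event_type": "login", "time_stamp": "2018-01-04T11:30:00"},
    {"click_event_type": "checkout", "time_stamp": "2018-01-04T11:30:10",
     "total_price_of_basket": 10.0},
    {"click_event_type": "add_to_cart", "time_stamp": "2018-01-04T11:31:00"},
    {"click_event_type": "checkout", "time_stamp": "2018-01-04T11:32:00",
     "total_price_of_basket": 5.5},
    {"click_event_type": "checkout", "time_stamp": "2018-01-04T11:35:10",
     "total_price_of_basket": 2.0},
]
results = list(total_sales_last_5min(events))
for row in results:
    print(row)
```

Only the three checkout events produce output. By the time of the third one, the first sale is exactly 5 minutes old and has left the window.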
Example 2: For each hour, how many customers were active on the site?
To help determine the peak shopping hours, we want to count the number of unique customers that generated clickstream events for each hour. We don’t want to just count the number of clickstream events, since each customer will generate multiple events. Instead, we’ll count the number of unique customer ids that appear in the clickstream, starting from the arrival of the first customer. We will use the CountDistinct function on the customer_id attribute.
- Drag another Aggregation operator to the canvas and connect it to the sample data operator.
- Under Aggregation Window:
  - Type: Use a tumbling window because we want results for each hour, not a running total as customers arrive.
  - Time Unit: hr
  - Number of Time units: 1
- Aggregation Definition:
  - Output function:
    - Output Field Name: Name of the value we want to compute. In this case, we’ll call it total_customers_per_hour.
    - Function Type: Select CountDistinct to count the unique number of customers.
    - Apply function to: This is the input attribute that will be used in our calculation. Select customer_id.
  - Repeat the same steps as in Example 1 to add the time_stamp attribute.
Connect the output of this operator to another Cloud Object Storage target. After running the flow, you should have output like this in the second output file:
time_stamp,total_customers_last_hr
"2018-01-08T07:10:35",455
"2018-01-08T07:11:36",435
"2018-01-08T07:12:37",368
"2018-01-08T07:13:38",4363
In my test I used a 1 minute window, so the timestamps in the results are about a minute apart. This is because we are using a tumbling window, so the operator only generates output periodically, in this case, every minute. Compare that to the output of the previous example, which used a sliding window: there, results were produced much more frequently because a sliding window generates output whenever there is new data.
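The tumbling CountDistinct calculation can be modeled in a few lines of Python. This is a sketch under stated assumptions, not the operator's implementation: the generator name `distinct_customers_per_window` is hypothetical, windows are aligned to the timestamp of the first tuple, and the sketch reports the end of each window in a `window_end` field rather than passing through a `time_stamp` value.

```python
from datetime import datetime, timedelta


def distinct_customers_per_window(events, window=timedelta(hours=1)):
    """Count unique customer ids in consecutive, non-overlapping windows."""
    window_start = None
    seen = set()
    for event in events:
        ts = datetime.fromisoformat(event["time_stamp"])
        if window_start is None:
            window_start = ts  # the first window starts with the first tuple
        # Close every window that ended before this event occurred.
        while ts - window_start >= window:
            window_start += window
            yield {
                "window_end": window_start.isoformat(),
                "total_customers_per_hour": len(seen),  # CountDistinct
            }
            seen = set()  # tuples are not reused in the next window
        seen.add(event["customer_id"])


# Made-up events; a 1 minute window keeps the example short.
events = [
    {"customer_id": 1, "time_stamp": "2018-01-08T07:10:00"},
    {"customer_id": 2, "time_stamp": "2018-01-08T07:10:20"},
    {"customer_id": 1, "time_stamp": "2018-01-08T07:10:40"},
    {"customer_id": 3, "time_stamp": "2018-01-08T07:11:05"},
    {"customer_id": 3, "time_stamp": "2018-01-08T07:11:30"},
    {"customer_id": 1, "time_stamp": "2018-01-08T07:12:10"},
]
results = list(distinct_customers_per_window(events, window=timedelta(minutes=1)))
for row in results:
    print(row)
```

Customer 1 appears twice in the first minute but is counted once, which is the difference between CountDistinct and Count. The last, still-open window is not emitted in this simplified model.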
Example 3: For each product category, what are the total sales in the last 5, 10 and 30 minutes?
In this case we want to compute the same value (running total sales) over different time periods. This is a common scenario that requires using multiple Aggregation operators in parallel. By computing the totals in parallel, you can enrich the data stream before saving it in a database or using it in a dashboard. Using different window sizes for the same data also helps account for irregular peaks in your data. Each operator will compute the running total, but use a different window size.
We’ll start with the total sales in the last 5 minutes and apply the same concept to compute the sales for the last 10 and 30 minutes.
Total sales in the last 5 minutes
Since this is another running total, we will use a sliding window. For every category, we’ll add up the value of the product_price attribute using the Sum function.
Using Partitions
To get the total sales for each category, we need to maintain the running total for each category. We do this by putting all the events for a given category in a separate window. This is called partitioning. Use the Partition By parameter to create windows for each category. The category is identified in the product_category attribute. Whenever a product is sold, only the running total sales for that product’s category will be updated.
- Connect another Aggregation operator to the data source.
- Under Aggregation Window:
  - Type: Sliding
  - Time Unit: minute
  - Number of Time units: 5
  - Partition By: product_category
- Aggregation Definition:
  - Output attributes:
    - total_sales_per_category: This is the Output Field Name. Select Sum as the Function Type and product_price under Apply function to.
    - product_category: Click “Add function”. Set Output Field Name to product_category and select PassThrough as the function. This is because we are not applying any computation to the value but we want to copy it from the input to the output.
    - Repeat the above step to add time_stamp as an output attribute.
The 5_min_dept_sales operator produces the running total sales for the last 5 minutes for each category. Here is some sample output after running the flow:
time_stamp,product_category,total_sales_5min
"2018-01-08T05:36:30","Food",6188.7899999998
"2018-01-08T05:36:30","Food",6189.77999999986
"2018-01-08T05:36:30","Food",6196.76999999986
"2018-01-08T05:36:30","Food",6202.75999999986
"2018-01-08T05:36:31","Home Products",1392.28
"2018-01-08T05:36:31","Drinks",5048.84999999995
"2018-01-08T05:36:31","Food",6204.74999999986
"2018-01-08T05:36:31","Drinks",5051.83999999995
"2018-01-08T05:36:31","Food",6205.7399999998
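Partitioning amounts to keeping one independent window per key. The sketch below models that idea in plain Python with one deque per product_category. The generator name `sales_per_category` is hypothetical, the events are made up, and the sketch assumes that a partition's window is only re-evaluated when a tuple for that category arrives, as described above.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def sales_per_category(events, window=timedelta(minutes=5)):
    """Keep a separate sliding-window sum of product_price for each category."""
    windows = defaultdict(deque)  # Partition By: product_category
    for event in events:
        ts = datetime.fromisoformat(event["time_stamp"])
        category = event["product_category"]
        buffer = windows[category]
        buffer.append((ts, event["product_price"]))
        # Evict old sales from this category's window only.
        while ts - buffer[0][0] >= window:
            buffer.popleft()
        yield {
            "time_stamp": event["time_stamp"],  # PassThrough
            "product_category": category,  # PassThrough
            "total_sales_per_category": sum(p for _, p in buffer),  # Sum
        }


# Made-up sales events for illustration only.
events = [
    {"time_stamp": "2018-01-08T05:36:30", "product_category": "Food",
     "product_price": 1.0},
    {"time_stamp": "2018-01-08T05:36:31", "product_category": "Drinks",
     "product_price": 3.0},
    {"time_stamp": "2018-01-08T05:36:31", "product_category": "Food",
     "product_price": 2.0},
    {"time_stamp": "2018-01-08T05:41:30", "product_category": "Food",
     "product_price": 4.0},
]
results = list(sales_per_category(events))
for row in results:
    print(row)
```

The Drinks sale does not change the Food total, and the last Food sale pushes the first one, now exactly 5 minutes old, out of the Food window.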
Total sales in the last 10 and 30 minutes
To compute the total sales for the last 10 and 30 minutes (or last hour, day, week, etc.), copy and paste the 5_min_dept_sales operator twice. Connect the copies to the Sample Data operator and modify their parameters to use sliding windows of 10 and 30 minutes, respectively.
Conclusion
This post has been an introduction to the Aggregation operator in Watson Studio Streams flows. We discussed the concept of using windows to process streaming data, and a few examples of how to do so.
An example flow containing these examples is available on GitHub, so you can try these examples by downloading the example flow and importing it into Streams flows:
- From a Watson Studio project, click Add to Project > Streams flow.
  - If you don’t already have a project, create one first.
- From the “New Streams flow” page, click From file and then select the Aggregation_examples.stp file from the zip file you just downloaded. Click Create.
- After the flow is created, you need to configure it to send the result files to your Cloud Object Storage service:
  - Click Edit, and for each Cloud Object Storage operator, edit it to specify the connection to the Cloud Object Storage service (you must have created one before importing the flow), and the file path.
- Run the flow by clicking Run.