Written by Dan Debrunner.
In the first article in this series I explained tumbling windows. For example, a tumbling window allows you to calculate the average price of a stock symbol over the last five minutes, every five minutes (eviction policy time(300.0)).
If there was some rapid movement in the price, a tumbling window would not report it until the next eviction, which could be up to five minutes later, too late to make a trade decision. This is where sliding windows are useful. A sliding window of tuples is continuously maintained, such as the last five minutes of stock prices, and the average price over the window can then be continuously maintained as well. A change in the average price can therefore be seen as soon as the event that drives the change occurs.
A sliding window has two policies, an eviction policy and a trigger policy. The eviction policy defines the contents of the window by defining when tuples are evicted. Unlike a tumbling window, only a subset of the tuples is evicted according to the policy, so the window can be seen to slide, typically representing a collection of the most recently arrived tuples. Operators that support sliding windows do not normally submit output tuples upon eviction. Instead they submit output tuples according to the trigger policy. When the window is triggered, the operator performs its function against the current set of tuples in the window. This makes it possible to maintain a moving average of a price across the last five minutes, with updates (submission of output tuples) triggered every second, as in this SPL example:
// Average the price attribute over a five minute sliding window (time(300.0))
// emitting the new average every second (time(1.0))
stream<V> VMovingAverage = Aggregate(V) {
  window V : sliding, time(300.0), time(1.0);
  output VMovingAverage : price = Average(price);
}
Sliding windows support a number of eviction and trigger policy types, and the eviction and trigger policies are independent, so a time based eviction can be combined with a count based trigger. A typical use is a time eviction with a trigger policy of count(1), which means the window is triggered on each input tuple, so the downstream operators always receive the most up to date aggregations against the sliding window.
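To make the combination of a time eviction with a count(1) trigger concrete, here is a minimal sketch in plain Python. It is a simulation of the behavior described above, not SPL and not how Streams implements windows. The function name `moving_averages` and the sample ticks are made up for illustration, and arrival times are passed in explicitly rather than read from a clock.

```python
from collections import deque

def moving_averages(ticks, window_seconds=300.0):
    """Return one (arrival_time, average_price) pair per input tuple.

    ticks: list of (arrival_time_seconds, price) in arrival order.
    Simulates eviction time(window_seconds) with trigger count(1).
    """
    window = deque()  # (arrival_time, price), oldest on the left
    total = 0.0
    results = []
    for now, price in ticks:
        # Time eviction: drop tuples that have been held longer than the span.
        # Because output only happens on arrival here, evicting lazily at
        # arrival time gives the same averages as evicting continuously.
        while window and now - window[0][0] > window_seconds:
            _, old_price = window.popleft()
            total -= old_price
        window.append((now, price))
        total += price
        # count(1) trigger: emit an updated average for every input tuple.
        results.append((now, total / len(window)))
    return results

# Hypothetical ticks: (seconds since start, price)
ticks = [(0.0, 10.0), (100.0, 20.0), (250.0, 30.0), (320.0, 40.0), (700.0, 50.0)]
results = moving_averages(ticks)
for when, avg in results:
    print(f"t={when:.1f}s average={avg:.2f}")
```

The tick at 320 seconds evicts the first price, and the tick at 700 seconds finds every earlier price already too old, so the last average covers a single tuple.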
Sliding Eviction Policies
Sliding windows support the same eviction policy types as tumbling windows, with the exception of punctuation based eviction. Though the same syntax is used, the meaning of each policy is different for sliding windows.
Count Eviction Policy
An eviction policy count(N) states that the sliding window is full when it contains N tuples, so logically, when full, it contains the last N tuples to arrive at the input port. Once the window is full, any tuple that arrives at the input port evicts one tuple, the oldest tuple.
For example, with count(3), once three tuples have arrived, the window becomes full and will continue to contain three tuples forever. As each new tuple arrives, the oldest tuple in the window will be evicted and the new tuple inserted, leaving three tuples in the window.
Let’s walk through the sequence of events:
Starting with the window being empty (light yellow rectangle, on the right), tuples are moving from left to right, so A will be the first to arrive at the input port:
After two tuples (A & B) have arrived the window is not yet full:
Now C arrives at the window and the window becomes full. Any subsequent tuple will cause the oldest tuple to be evicted:
Now, D arrives and evicts A, leaving the window containing [B,C,D] (oldest to newest):
This process continues as tuples arrive: E will cause the eviction of B, leaving [C,D,E] in the window, and E itself will be evicted after three more tuples have arrived.
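The walk-through above can be replayed with a short Python sketch. This only simulates the count(3) eviction semantics and is not SPL. The tuples F, G and H are added as an assumption to show when E leaves the window.

```python
from collections import deque

window = deque(maxlen=3)  # count(3): holds at most the last three tuples
history = []              # (arriving tuple, evicted tuple or None, window contents)

for tup in "ABCDEFGH":
    # Once the window is full, the next arrival pushes out the oldest tuple.
    evicted = window[0] if len(window) == window.maxlen else None
    window.append(tup)    # a bounded deque drops its oldest item when full
    history.append((tup, evicted, list(window)))

for tup, evicted, contents in history:
    print(f"arrive {tup}  evict {evicted}  window {contents}")
```

D evicts A and leaves [B, C, D], E evicts B, and E is evicted by H, the third tuple to arrive after it.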
Time Eviction Policy
An eviction policy time(T) states that any tuple that has been in the sliding window more than T seconds must be evicted. So with an eviction policy of time(5.0), the sliding window at all times contains all tuples that arrived in the last five seconds, including the possibility that the window is empty. Eviction of the tuples that have been in the window more than five seconds takes place independently of any tuple insertion.
For example, here’s an image of a sliding window with eviction policy time(5.0) at a point in time t+4. Tuple A was inserted at time t (four seconds earlier), tuple B one second after A (t+1), and tuple C two and a half seconds after B (t+3.5). The number for each tuple in the window represents the number of seconds it will remain in the window. Tuple C has been in the window for 0.5 seconds, so it will remain in the window for a further 4.5 (5 – 0.5) seconds.
Three seconds later (t+7) tuples A and B will have been evicted, because A would have been in the window seven seconds and B six seconds, both of which are greater than the policy value of five seconds. Tuple C now has 1.5 (4.5 – 3) seconds left in the window, and tuple D which arrived one second ago (t+6) has four seconds left in the window.
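The remaining times in this example can be checked with a small Python sketch. It is a simulation of the time(5.0) eviction rule, not SPL. The function name `window_at` is made up, and times are given in seconds relative to t.

```python
def window_at(arrivals, now, span=5.0):
    """Return {tuple name: seconds remaining} for tuples in the window at `now`.

    arrivals: mapping of tuple name to insertion time.
    A tuple is evicted once it has been in the window more than `span` seconds.
    """
    return {
        name: span - (now - inserted)
        for name, inserted in arrivals.items()
        if inserted <= now and now - inserted <= span
    }

# Insertion times from the example: A at t, B at t+1, C at t+3.5, D at t+6
arrivals = {"A": 0.0, "B": 1.0, "C": 3.5, "D": 6.0}

at_4 = window_at(arrivals, now=4.0)
at_7 = window_at(arrivals, now=7.0)
print(at_4)  # {'A': 1.0, 'B': 2.0, 'C': 4.5}
print(at_7)  # {'C': 1.5, 'D': 4.0}
```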
Delta Eviction Policy
The eviction policy delta(attribute, delta) defines the condition for eviction of a tuple in a sliding window in terms of the value of attribute in an input tuple, and the value of delta. With the eviction policy delta(temperature, 1.5), when a new tuple T arrives at the input port, the value of its temperature attribute is compared with the value of the temperature attribute of every tuple in the window. For any tuple, if the new value exceeds that tuple's value by more than 1.5 (the delta), then that tuple is evicted. More formally, a tuple in the window, Twindow, is evicted if this condition is true:
T.attribute - Twindow.attribute > delta
In this example, if the window contained tuples with temperature values of [17.1, 16.4, 16.0] (newest first), then the arrival of a tuple with temperature value 17.6 would evict 16.0 because (17.6 – 16.0) is greater than 1.5, leaving the window’s contents as [17.6, 17.1, 16.4] after the insertion.
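Here is the same arithmetic as a few lines of Python. It is a sketch of the delta eviction rule only, using bare temperature values instead of tuples. The function name is hypothetical.

```python
def insert_with_delta_eviction(window, new_value, delta):
    """Return the window contents (newest first) after new_value arrives.

    A held value is evicted when new_value - held value > delta.
    """
    kept = [held for held in window if not (new_value - held > delta)]
    return [new_value] + kept

after = insert_with_delta_eviction([17.1, 16.4, 16.0], 17.6, 1.5)
print(after)  # [17.6, 17.1, 16.4]
```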
If the tuples arrive at the window in order such that the value of attribute never decreases, then the window naturally slides, which means it’s always the oldest tuple(s) that are evicted. This behavior can be typical of SPL timestamp attributes (e.g. the time a sensor was read or a trade was executed), and the delta policy supports timestamp attributes where the delta value represents the time difference in seconds between two values (ignoring machine identifier). Thus when input tuples are ordered, all tuples in the window have an attribute value that is within delta of the most recent tuple.
However, if the input tuples are not ordered with respect to attribute, then the window does not act as a true “slide”. Tuples are evicted based solely on the delta value, and thus tuples may be evicted earlier than tuples that arrived before them. In addition, since only the most recently arrived tuple is used to determine what is evicted, out of order data can result in the window containing tuples that differ in the attribute value by more than the delta. Here, with the policy delta(temperature, 1.5), we can see that tuple D evicts tuple B (17.6 – 16.0 > 1.5), which is not the oldest tuple, and then tuple E does not evict any tuples as (14.0 – temperature) is not greater than 1.5 for any tuple in the window. Thus the window didn’t truly “slide”, as A was left in the window, and there are pairs of tuples in the window for which the delta is exceeded.
If required, any ordering of tuples needs to be enforced by upstream operators.
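The out of order case can be replayed with a Python sketch of the same rule. The temperatures for B, D and E come from the example above. The values for A and C are not given in the text and are made up here so that the sequence behaves as described. This is a simulation, not SPL.

```python
def run_delta_window(tuples, delta):
    """Feed (name, value) tuples through a delta eviction window.

    Returns (steps, window). Each step is
    (arriving name, evicted names, window names oldest first).
    """
    window = []
    steps = []
    for name, value in tuples:
        # Only the arriving tuple's value decides what is evicted.
        evicted = [n for n, held in window if value - held > delta]
        window = [(n, held) for n, held in window if not (value - held > delta)]
        window.append((name, value))
        steps.append((name, evicted, [n for n, _ in window]))
    return steps, window

# A and C are assumed values; B, D and E are from the example.
readings = [("A", 17.0), ("B", 16.0), ("C", 16.5), ("D", 17.6), ("E", 14.0)]
steps, final_window = run_delta_window(readings, delta=1.5)
for step in steps:
    print(step)

values = [held for _, held in final_window]
spread = max(values) - min(values)
print("largest difference left in the window:", spread)
```

D evicts only B, E evicts nothing, and the final window [A, C, D, E] contains values that differ by far more than 1.5.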
Sliding Trigger Policies
The trigger policy for a sliding window defines when the operator needs to perform its processing against the contents of the window. The definition of the trigger policy is independent of the eviction policy, so a sliding window may have a time based eviction policy but a count based trigger. While the trigger is being processed by the operator, tuples are not evicted from or inserted into the window. The policy types for triggering match the syntax of the eviction policies, but their definition applies to when the trigger occurs.
In SPL the trigger policy is specified after the eviction policy, e.g. sliding, count(20), time(1) has a count eviction policy of size 20 and a time trigger policy of one second.
Count Trigger Policy
A trigger policy count(N) states that the sliding window is triggered for every N tuples that arrive at the input port. A typical use is count(1), which means the window is triggered for every input tuple. For example, with the Aggregate operator this means every input tuple results in the submission of tuple(s) reflecting the most up to date aggregation of the window (which may contain any number of tuples, as defined by its eviction policy). If a trigger policy is not specified then it defaults to count(1).
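As an illustration of how a count trigger is independent of the eviction policy, the following Python sketch combines a count(3) eviction with a count(2) trigger and averages the window on each trigger. It is a simulation under the assumption that an arriving tuple is inserted before the trigger is evaluated. The function name and sample values are made up.

```python
from collections import deque

def triggered_averages(values, evict_count=3, trigger_count=2):
    """Average the window every trigger_count arrivals.

    The window keeps the last evict_count values (count eviction).
    """
    window = deque(maxlen=evict_count)
    outputs = []
    for arrived, value in enumerate(values, start=1):
        window.append(value)
        # Count trigger: fire on every trigger_count-th arrival.
        if arrived % trigger_count == 0:
            outputs.append(sum(window) / len(window))
    return outputs

prices = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0]
every_second = triggered_averages(prices)                 # trigger count(2)
every_tuple = triggered_averages(prices, trigger_count=1)  # trigger count(1)
print(every_second)  # [15.0, 30.0, 50.0]
print(every_tuple)   # [10.0, 15.0, 20.0, 30.0, 40.0, 50.0]
```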
Time Trigger Policy
A trigger policy time(T) states that the sliding window is triggered every T seconds, regardless of tuple arrival.
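The sketch below simulates a time trigger in Python by listing the window contents at each trigger time for a window with eviction policy time(5.0) and trigger policy time(2.0). It assumes the first trigger fires one period after the start, which the text does not specify, and it uses explicit times instead of a real clock. It is not SPL.

```python
def contents_at_triggers(arrivals, evict_after, trigger_every, until):
    """Return [(trigger time, tuple names in the window)] up to `until`.

    arrivals: list of (name, insertion time in seconds).
    """
    snapshots = []
    k = 1
    while k * trigger_every <= until:
        now = k * trigger_every
        # A tuple is present if it has arrived and has not been held
        # for more than evict_after seconds.
        names = [n for n, t in arrivals if t <= now and now - t <= evict_after]
        snapshots.append((now, names))
        k += 1
    return snapshots

arrivals = [("A", 0.0), ("B", 1.0), ("C", 3.5), ("D", 6.0)]
snapshots = contents_at_triggers(arrivals, evict_after=5.0, trigger_every=2.0, until=12.0)
for when, names in snapshots:
    print(when, names)
```

The trigger at 12 seconds still fires even though no tuple has arrived since 6 seconds and the window is empty by then.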
Delta Trigger Policy
A trigger policy delta(attribute, delta) defines the condition for triggering of a sliding window in terms of the value of attribute in its input tuples. When a tuple T arrives at the window, the difference for attribute is calculated between T and the last tuple that triggered the window. If the difference is larger than delta then the window is triggered. For example, with delta(ts, 1), where ts is an SPL timestamp attribute, the window will be triggered every second according to the timestamps of the data, not wall clock time.
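A minimal Python sketch of the delta trigger rule follows. It uses plain numbers in place of SPL timestamps and assumes that the first tuple only sets the reference value without triggering, which the text does not specify. The function name and sample timestamps are made up.

```python
def delta_trigger_points(values, delta):
    """Return the attribute values of the tuples that trigger the window."""
    triggers = []
    reference = None  # attribute value of the last tuple that triggered
    for value in values:
        if reference is None:
            reference = value  # assumption: first tuple only sets the reference
        elif value - reference > delta:
            triggers.append(value)
            reference = value
    return triggers

# Hypothetical data timestamps in seconds
timestamps = [0.0, 0.4, 0.9, 1.2, 1.8, 2.3, 2.5, 3.6]
fired = delta_trigger_points(timestamps, delta=1.0)
print(fired)  # [1.2, 2.3, 3.6]
```

The triggers follow the gaps in the data rather than a clock, so they land roughly one second apart in data time, not exactly on second boundaries.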
Note: To configure windows based on the timestamp of data and not wall clock time, use the event-time window feature, which is more accurate than the delta policy and also supports data that arrives late.
SPL’s sliding windows provide the ability to perform some function over a well-defined set of tuples on an input port. The difference from tumbling windows is that the contents of the window change incrementally according to the eviction policy, with the most common case being that the window contains the collection of most recently arrived tuples, e.g. the last 100 tuples to arrive at the input port (eviction policy count(100)). You can experiment with sliding windows using the Aggregate and Join operators from the SPL Standard toolkit.
In addition to sliding and tumbling windows, event-time windows allow you to configure a time based window that uses a timestamp in the data rather than elapsed, or system, time. This is helpful when you have incoming data with a timestamp.