Guaranteed tuple processing in Streams with consistent regions


Mon August 24, 2020 03:47 PM

Written by Gabriela Jacques da Silva.

Streams provides at-least-once tuple processing support with the consistent regions feature. Consistent regions make an SPL application resilient by giving it the ability to recover from failures. We call these regions consistent because the SPL runtime periodically establishes a consistent state for a given sub-graph (i.e., a region). The consistent state consists of a collection of persisted operator states that are consistent with having processed all tuples up to a certain logical point. On a failure, a region is reset to its last successfully persisted state, and the source/start operators of the region can replay any tuples submitted after the restored state. In most use cases, this replay enables applications to achieve exactly-once tuple processing semantics. In all use cases, the replay enables applications to achieve at-least-once processing semantics.

At-least-once vs exactly-once processing

The difference between at-least-once and exactly-once semantics is a matter of when tuples are observed. If the application publishes results to a dashboard via a TCP socket (e.g., with the TCPSink operator), any tuples written to the TCP socket and published on the dashboard cannot be retracted by the SPL application. When tuples are replayed after a failure, the tuple results are exact (their attribute values are as if there had been no failures), but they are submitted to the TCP socket twice. As a result, the dashboard observes duplicate tuples (i.e., at-least-once).

If the application publishes tuples to an external system that can retract results, detect duplicates, or apply writes idempotently, then the result with consistent regions has exactly-once semantics. For example, an operator that writes tuples to a file (e.g., the FileSink operator) can reopen the file on a failure, reposition the file stream to where it was at the last consistent state, and truncate the file at that position, effectively discarding any possible duplicates. This gives the application exactly-once semantics. Another example is a database that checks a key constraint on insertion. Any duplicate insertion is rejected by the database, and the observed end result is as if tuples had been processed exactly once.
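The contrast between the two kinds of sink can be sketched in a few lines of Python. This is a toy model, not Streams code: the classes `AppendOnlySink` and `KeyedSink` and the helper `run_with_replay` are hypothetical names introduced only for illustration. The append-only sink stands in for a TCP socket, and the keyed sink stands in for a database with a key constraint.

```python
class AppendOnlySink:
    """Models a sink whose output cannot be retracted (e.g., a TCP socket)."""

    def __init__(self):
        self.observed = []

    def write(self, key, value):
        self.observed.append((key, value))


class KeyedSink:
    """Models a sink that rejects duplicate keys (e.g., a database key constraint)."""

    def __init__(self):
        self.rows = {}

    def write(self, key, value):
        # A duplicate insertion is ignored, so a replay has no visible effect.
        self.rows.setdefault(key, value)

    @property
    def observed(self):
        return list(self.rows.items())


def run_with_replay(tuples, sink, checkpoint_at, fail_at):
    """Write tuples, fail once after `fail_at` writes, then replay from the checkpoint."""
    for key, value in tuples[:fail_at]:
        sink.write(key, value)
    # Failure: the region resets to the state captured at `checkpoint_at`,
    # and the source replays everything submitted after that point.
    for key, value in tuples[checkpoint_at:]:
        sink.write(key, value)
    return sink.observed


data = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
at_least_once = run_with_replay(data, AppendOnlySink(), checkpoint_at=1, fail_at=3)
exactly_once = run_with_replay(data, KeyedSink(), checkpoint_at=1, fail_at=3)
```

In this model `at_least_once` contains the tuples with keys 2 and 3 twice, while `exactly_once` equals `data`, even though both sinks received the same replayed writes.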

An example application that can leverage consistent regions is a telco switch application where network switches are dumping call data record files into a directory and the SPL application must ensure that each file is fully processed. Another typical use is an application that reads from persistent message queues and ensures that every message is processed at-least-once.

What exactly does consistency mean?

In more detail, SPL considers that the state of a subgraph is consistent if it reflects that a tuple submitted by an operator to an output stream has been processed by all operators in the region that consume that same stream.

The figures below illustrate our definition of consistency for a region with 3 operators. The topology on the left has op1 with a single output stream that is consumed by both op2 and op3. The right figure illustrates a tuple submission timeline. The orange arrows represent the timeline for each operator (time flowing from left to right). Each blue arrow represents a tuple submission from an operator to its downstream operators. The dashed line represents a global state: the point at which it crosses an operator's orange line (blue cans) is the time at which the local state of that operator is captured.

One possible global state that is consistent is represented by the blue cans and the dashed line: the state of op1 is captured after it submits tuple t2 to both op2 and op3, and the states of op2 and op3 are captured after they process tuple t2. By our definition, an example of a global state that is inconsistent is the one represented by the second dashed line: the state of op1 is captured after it submits t3, the state of op2 is captured after it processes t3, and the state of op3 is captured before it processes t3¹. Another example of an inconsistent global state is the one represented by the third dashed line: the state of op1 is captured before the submission of t4, and the states of op2 and op3 are captured after processing t4.
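The three global states described above can be checked mechanically. The Python sketch below is an illustration under simplifying assumptions, not runtime code: tuples are numbered t1, t2, ... so that a captured state can be summarized as a count, each consumer has a single input stream, and the function name `is_consistent` is hypothetical.

```python
def is_consistent(edges, submitted, processed):
    """Return True if every consumer's captured state reflects exactly the
    tuples its producer had submitted when the producer's state was captured.

    edges:     list of (producer, consumer) pairs in the region
    submitted: number of tuples each producer had submitted at capture time
    processed: number of tuples each consumer had processed at capture time
    """
    return all(processed[consumer] == submitted[producer]
               for producer, consumer in edges)


# op1 feeds both op2 and op3, as in the topology described above.
edges = [("op1", "op2"), ("op1", "op3")]

# First dashed line: op1 submitted t2, op2 and op3 processed t2.
first = is_consistent(edges, {"op1": 2}, {"op2": 2, "op3": 2})

# Second dashed line: op1 submitted t3, op2 processed t3, op3 did not.
second = is_consistent(edges, {"op1": 3}, {"op2": 3, "op3": 2})

# Third dashed line: op1 captured before submitting t4, op2 and op3 after processing it.
third = is_consistent(edges, {"op1": 3}, {"op2": 4, "op3": 4})
```

Only `first` is `True`. The second state leaves a submitted tuple unprocessed, and the third reflects a tuple that, according to op1's captured state, was never submitted.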

To achieve a consistent state, the SPL runtime runs a variation of the Chandy-Lamport algorithm [1] for establishing a distributed snapshot. The Chandy-Lamport algorithm considers the persistence of both channel state (stream connections) and process state (operators). Because our stricter definition of consistency requires every submitted tuple to also be processed, the SPL runtime does not need to persist channel state: it effectively forces any channel state to be reflected in the process state.

Some key characteristics of consistent regions are:

  1. At-least-once semantics by using replayable data sources
  2. Exactly-once semantics if using an external system that can detect duplicate tuples or can restore its state after a failure
  3. Support for multi-threaded operators
  4. Support for non-deterministic applications
  5. Fine-grained region selection by using annotations
  6. Support for regions that have cycles that close on control input ports (operator ports that do not generate tuple submissions)

Selecting a consistent region

SPL offers a flexible way to choose which parts of the application establish a consistent state. Different parts of the application can establish consistent states independently (i.e., as different regions) or as a single consistent region. Other parts of the application don’t have to participate in a consistent region at all. This reduces the cost of establishing consistent states, as only the required parts of the application participate in state persistence and tuple replay.

SPL offers two annotations to select consistent regions: @consistent and @autonomous. In summary, @consistent includes an operator and its reachability graph in the consistent region. @autonomous removes an operator and its reachability graph from the consistent region. If an operator is in the reachability graph of both an autonomous and a consistent annotation, the consistent one takes precedence.

The figure below shows an example of a simple topology that has an operator annotated with @consistent. In this example, all operators are placed in a single consistent region (illustrated with a red bar). To compute that, the SPL compiler identifies the annotated operator as a start operator of the consistent region and computes the reachability graph of the start operator, which, in this case, includes all other operators in the topology. Operators that have no output ports are identified as the end of the region. Start operators of a consistent region must be able to replay tuples to guarantee that every tuple is processed.

The figure below shows an example of the same topology as above with @autonomous on one of the operators in the reachability graph of the start operator. The SPL compiler computes the reachability graph of the @autonomous annotated operator and removes those operators from the consistent region (operators without a red bar).
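One way to read these rules is as a graph traversal that starts at every @consistent operator and stops at @autonomous operators. The Python sketch below follows that reading; it is an interpretation for illustration, not the SPL compiler's actual algorithm, and the function name, the operator names, and the example topology are all hypothetical.

```python
from collections import deque


def consistent_region(graph, consistent, autonomous):
    """Return the set of operators that end up in the consistent region.

    graph:      dict mapping each operator to its list of downstream operators
    consistent: operators annotated with @consistent (start operators)
    autonomous: operators annotated with @autonomous
    """
    region = set()
    queue = deque(consistent)
    while queue:
        op = queue.popleft()
        if op in region or op in autonomous:
            # Already visited, or the traversal is cut at an @autonomous operator.
            continue
        region.add(op)
        queue.extend(graph.get(op, []))
    return region


# Hypothetical topology: "join" is reachable through both "b" and "c".
graph = {
    "src": ["a"],
    "a": ["b", "c"],
    "b": ["join"],
    "c": ["log", "join"],
    "join": [],
    "log": [],
}

whole_graph = consistent_region(graph, consistent={"src"}, autonomous=set())
with_autonomous = consistent_region(graph, consistent={"src"}, autonomous={"c"})
```

With no @autonomous annotation, every operator is in the region. With @autonomous on `c`, the operators `c` and `log` drop out, while `join` stays in because it is still reachable from the start operator through `b`, which matches the precedence rule stated earlier.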

Here, here and here you can find many other examples on how to select and limit the scope of consistent regions.

Configuring a consistent region

The @consistent annotation can have the following 5 parameters:

  1. trigger={periodic|operator} – indicates how the establishment of a consistent state is started. It can be periodic (e.g., every 30 seconds) or logically determined by an operator (e.g., after fully processing a file).
  2. period – specified for periodic regions; indicates how frequently consistent states are established.
  3. drainTimeout – specifies when to time out the establishment of a consistent state. This must be larger than the time it takes for a region to establish a consistent state. It has a default value of 180 seconds.
  4. resetTimeout – specifies when to time out the restoration of a consistent region. This must be larger than the time it takes to reset the state of a consistent region. It has a default value of 180 seconds.
  5. maxConsecutiveResetAttempts – indicates how many automatic consecutive restoration attempts are made before an explicit system intervention is required. It has a default value of 5.
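The parameters and their defaults can be summarized as a small Python data class. This is only a toy model of the list above, not SPL syntax or compiler logic: the class name, the snake_case field names, and the `needs_intervention` helper are hypothetical, and the check that a periodic region must have a period is an assumption drawn from the description of `period`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ConsistentConfig:
    """Toy model of the @consistent parameters described above."""

    trigger: str                              # "periodic" or "operator"
    period: Optional[float] = None            # seconds; used by periodic regions
    drain_timeout: float = 180.0              # default stated in the text
    reset_timeout: float = 180.0              # default stated in the text
    max_consecutive_reset_attempts: int = 5   # default stated in the text

    def __post_init__(self):
        if self.trigger not in ("periodic", "operator"):
            raise ValueError(f"unknown trigger: {self.trigger!r}")
        # Assumption: a periodic region is meaningless without a period.
        if self.trigger == "periodic" and self.period is None:
            raise ValueError("a periodic region needs a period")

    def needs_intervention(self, failed_resets):
        """True once the automatic consecutive reset attempts are exhausted."""
        return failed_resets >= self.max_consecutive_reset_attempts


every_30s = ConsistentConfig(trigger="periodic", period=30.0)
per_file = ConsistentConfig(trigger="operator")
```

Here `every_30s` models a region that establishes a consistent state every 30 seconds, and `per_file` models one whose start operator decides when to do so, for example after fully processing a file.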

The @autonomous annotation has no parameters.

Establishing a consistent state

Establishing a consistent state is an operation that applies to the whole consistent region. From the perspective of an operator in the consistent region, the establishment of a consistent state has 2 stages: (i) drain and (ii) checkpoint. Each stage maps to a new callback available for SPL C++ and Java Primitive Operator developers. Globally, the consistent state is successfully established when all operators in the consistent region complete these two stages.

Drain – In this stage, an operator drains its internal state and output streams. This means that the operator is free to finish processing any previously received tuples, flush its internal state, and submit any pending tuples in its internal buffers to its output streams. For example, the FileSink operator can flush the current output stream it is writing to. The ThreadedSplit operator can submit all tuples still residing in its buffers. The DynamicFilter fully processes a tuple during the process() method invocation, so it does not need to do anything in the drain stage.

Any tuple submitted prior to the completion of the drain stage is going to be fully processed by its downstream operators by the end of the establishment of the consistent state. Because operators have control of when the drain stage ends via a callback, operators are free to determine the logical boundaries at which consistent states are established (e.g., after fully processing a file). Once the drain stage completes, no new tuples can be submitted².

If an operator has input streams that are in the consistent region (e.g., the end operator in the topology above), its drain stage occurs only after its upstream operators complete their drain stage and after it processes all the tuples they submitted before completing it.

Checkpointing – In this stage, the operator serializes its state (if any) to the Streams checkpointing backend. This stage always happens after the drain stage. The SPL runtime automatically manages operator checkpoints. This means that the runtime maintains multiple checkpoint versions and automatically deletes checkpoints that will not be used in case of a failure. Logic state variables declared in any operator (including Custom) in the consistent region are automatically checkpointed.
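The ordering of the two stages can be sketched with a toy pipeline in Python. This is a simplified, single-threaded model, not the Streams operator API: the `Operator` class, its `drain` and `checkpoint` methods, and `establish_consistent_state` are hypothetical, and walking the operators in topological order stands in for the runtime's coordination.

```python
class Operator:
    """Toy operator that buffers tuples and counts what it has processed."""

    def __init__(self, name, downstream=None):
        self.name = name
        self.downstream = downstream
        self.buffer = []   # tuples received but not yet submitted downstream
        self.count = 0     # the operator state that gets checkpointed

    def process(self, tup):
        self.count += 1
        self.buffer.append(tup)

    def drain(self, log):
        # Submit every pending tuple so that downstream state reflects it.
        for tup in self.buffer:
            if self.downstream is not None:
                self.downstream.process(tup)
        self.buffer.clear()
        log.append(("drain", self.name))

    def checkpoint(self, store, log):
        # Persist the state; the buffer is already empty after drain.
        store[self.name] = {"count": self.count}
        log.append(("checkpoint", self.name))


def establish_consistent_state(operators_in_topological_order):
    """Drain, then checkpoint, each operator after all of its upstream operators."""
    store, log = {}, []
    for op in operators_in_topological_order:
        op.drain(log)
        op.checkpoint(store, log)
    return store, log


sink = Operator("sink")
mid = Operator("mid", downstream=sink)
src = Operator("src", downstream=mid)
for tup in ["t1", "t2", "t3"]:
    src.process(tup)

store, log = establish_consistent_state([src, mid, sink])
```

Before the call, only `src` has seen the three tuples. Afterwards every persisted state in `store` reflects all three, because each operator drained its buffer before its downstream operator drained and checkpointed.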

Resetting to a consistent state

Upon a failure, the SPL runtime automatically resets the region to the last consistent state. Once this restoration is complete, the start operators of a region can start tuple replay. The SPL runtime restores a region to a consistent state in the following scenarios: (i) a PE crashes, (ii) a host crashes, (iii) a connection between PEs breaks, (iv) an operator detects and reports an error, and (v) an administrator manually requests it.

The restoration to a consistent state of a primitive operator occurs in a single stage called reset. In this stage, the operator de-serializes its state from the Streams checkpointing backend. This stage also maps to a new callback available for primitive operator developers. Behind the scenes, the reset of a region occurs by stopping tuple submission in the start operators of the region and by emptying out any stream connection by forcing any pending tuples to be processed³.

The reset stage of an operator occurs only after the reset of all its upstream operators. Logic state variables are automatically reset.

The consistent state of a region is restored once all operators in the region complete the reset stage. Once the restoration is complete, operators can again submit tuples and start operators of the region can start replaying tuples.

Example SPL application

In the Streams samples repository on GitHub, you can find an example of a word count application with a periodic consistent region. This example uses two instances of a fault injection operator, which forces the application to crash after processing a certain number of tuples. This leads the region to restore to a consistent state and replay tuples. This example has exactly-once semantics. You can run the application in Streams 4.0 and see that the generated result file is the same as the file generated when no failures occur.
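The same effect can be reproduced in miniature. The Python sketch below is not the sample from the repository; it is a toy word count, with a hypothetical `word_count` function, that persists its state every few lines, optionally crashes once, resets to the last persisted state, and replays from there.

```python
from collections import Counter


def word_count(lines, checkpoint_every, crash_after=None):
    """Count words with periodic checkpoints; optionally crash once and recover."""
    counts = Counter()
    checkpoint = (0, Counter())   # (next line offset, persisted operator state)
    crashed = False
    offset = 0
    while offset < len(lines):
        if crash_after is not None and not crashed and offset == crash_after:
            crashed = True
            # Reset: restore the last persisted state and replay from its offset.
            offset, saved = checkpoint
            counts = Counter(saved)
            continue
        counts.update(lines[offset].split())
        offset += 1
        if offset % checkpoint_every == 0:
            # Establish a consistent state: remember the offset and a copy of the state.
            checkpoint = (offset, Counter(counts))
    return counts


lines = ["a b", "b c", "c a", "a a", "b"]
without_failure = word_count(lines, checkpoint_every=2)
with_failure = word_count(lines, checkpoint_every=2, crash_after=3)
```

The crash happens after the third line, one line past the last checkpoint, so that line is processed twice. Because the counts are restored before the replay, `with_failure` equals `without_failure`.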

¹ The SPL definition of a consistent state is slightly more restrictive than the classic distributed systems definition [2], which considers the second shaded area (red) to be consistent as long as tuple t3 can later be re-processed in case of a recovery.

² The SPL runtime automatically submits a special punctuation (i.e., a token in the Chandy-Lamport algorithm) to the output streams of an operator after the end of the drain stage. If you are curious, you can follow the flow of these punctuations by searching for ‘DrainMarker’ in the PE logs (trace level).

³ Similar to the establishment of a consistent state, the SPL runtime achieves this by flowing another special punctuation. You can follow it by searching for ‘ResetMarker’ in the PE logs (trace level).


[1] K. Mani Chandy, L. Lamport. “Distributed Snapshots: Determining Global States of Distributed Systems”. ACM Transactions on Computer Systems, Vol. 3, No. 1, February 1985.

[2] O. Babaoglu, K. Marzullo. “Consistent Global States of Distributed Systems: Fundamental Concepts and Mechanisms”. In Distributed Systems (S. Mullender). ACM Press, 1993.

