Automatic Fusion and Threading 

Mon September 28, 2020 03:50 PM

In IBM Streams 4.2 and newer, there are two features designed to make it easier to attain high performance: automatic submission time fusion and the automatic threading model. 

Automatic submission time fusion means that you can achieve a reasonable number of PEs for your system without having to manually fuse operators.
The automatic threading model means that you can take advantage of multiple cores per host, without having to manually place threaded ports in your application.

This post explains what these features are, how to use them, and how they relate to each other.

Note: Read the post Manually Optimizing Streams Applications to see how to manually perform fusion and threading optimizations to improve your application’s performance.

Automatic Submission Time Fusion

In Streams, operators execute at runtime inside of Processing Elements, or PEs. Every PE in the application is a separate running process. The act of grouping operators into PEs is called fusion. PEs can execute on separate hosts, allowing us to take advantage of clusters. However, operators that communicate across PEs suffer from greater communication cost. We have a classic tradeoff: we want multiple PEs in the application so we can take advantage of all of the available hosts, but too many PEs cause unnecessary communication overhead. In general, we want to have roughly one PE per host.

Prior to Streams 4.2, PE fusion was performed manually through the partitionColocation, partitionExlocation and partitionIsolation options for the operator placement configs. Operators without a partition config were isolated into their own PE by default. All fusion occurred at compile time.

In contrast, Streams 4.2 automatically fuses operators into PEs for you at submission time. Because this fusion occurs at submission time, the system can tailor the fusion to the available hosts in the instance. For example, if you submit an application to an instance with 20 hosts, you are likely to get a different fusion than if you submit the same application to an instance with 1 host. Note that this difference does not require recompiling the application.

This fusion happens automatically for you, by default. However, you can change its behavior through the new fusionScheme application configuration option. The default value is automatic, which will target one PE per host, as described above. If you specify manual, then you can use the additional fusionTargetPeCount option to manually control how many PEs you get. For example, the following job submission requests 5 PEs:

streamtool submitjob -C fusionScheme=manual -C fusionTargetPeCount=5 myApp.sab

Note that the actual number of PEs may be more or less than 5. Streams has to obey a variety of application and instance constraints which are independent of the target PE count.

If you wish to use the old fusion behavior, then specify the legacy option:

streamtool submitjob -C fusionScheme=legacy myApp.sab
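
If you script your submissions, the three fusion schemes can be wrapped in a small helper. The following is only a sketch: the function name fusion_submit_cmd is hypothetical, it uses only the options shown in this post, and it prints the streamtool command line instead of running it, so you can inspect the command first.

```shell
#!/bin/sh
# Hypothetical helper: print (do not run) a submitjob command for a fusion scheme.
# Usage: fusion_submit_cmd <sab-file> <automatic|manual|legacy> [target-pe-count]
fusion_submit_cmd() {
    sab=${1:-}; scheme=${2:-}; target=${3:-}
    case $scheme in
        automatic|legacy)
            echo "streamtool submitjob -C fusionScheme=$scheme $sab" ;;
        manual)
            # fusionTargetPeCount only applies to the manual scheme.
            if [ -z "$target" ]; then
                echo "manual fusion needs a target PE count" >&2
                return 1
            fi
            echo "streamtool submitjob -C fusionScheme=manual -C fusionTargetPeCount=$target $sab" ;;
        *)
            echo "unknown fusion scheme: $scheme" >&2
            return 1 ;;
    esac
}

fusion_submit_cmd myApp.sab manual 5
```

To submit for real, pipe the printed line into sh or paste it into your terminal.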

Under both automatic and manual fusion, you can also influence how Streams fuses parallel regions, which are specified by the @parallel annotation. By default, parallel regions are treated no differently from the rest of the application. However, through the fusionType option, you can change the fusion behavior for parallel regions. Specify channelIsolation to make sure that only operators in the same parallel channel are fused together. Specify channelExlocation to allow operators from outside the parallel region to be fused with operators inside it, while still preventing operators from different parallel channels from being fused together.
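
As a sketch, and assuming that fusionType is passed with -C in the same way as the other application configuration options in this post, the hypothetical helper below prints one submission command per parallel-region fusion type so that you can compare the resulting PE layouts. It prints the commands rather than running them.

```shell
#!/bin/sh
# Hypothetical helper: print (do not run) one submitjob command per fusionType value.
# Assumption: fusionType is supplied with -C, like fusionScheme.
# Usage: fusion_type_cmds <sab-file>
fusion_type_cmds() {
    sab=${1:-}
    for fusion_type in channelIsolation channelExlocation; do
        echo "streamtool submitjob -C fusionScheme=automatic -C fusionType=$fusion_type $sab"
    done
}

fusion_type_cmds myApp.sab
```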

For more on automatic submission time fusion, see the product documentation for specifying fusion for a single application, and for specifying the default fusion for all applications in an instance or domain.

Automatic Threading

In a PE, when a thread executes an operator, it will also execute all downstream operators until it hits another thread or a PE output port. There are several sources of threads inside of a PE. Source operators always introduce at least one thread. Some operators, such as ThreadedSplit, introduce their own threads (along with a queue per thread; tuples are copied into the queue). PE input ports also introduce a thread which pulls tuples from the network, and then executes downstream operators. Parallel regions, through the @parallel annotation, introduce threads if the beginning of the parallel region is in the same PE as its immediately upstream operators. Finally, threaded ports are a means for programmers to manually introduce threads into an application. A threaded port inserts a new thread and a queue in front of the input port it’s associated with. The upstream thread copies tuples into the threaded port queue, and the downstream thread pulls tuples from that queue and executes the downstream operator.

Prior to Streams 4.2, threaded ports were the primary way for you to introduce threads into a PE that were not already there. In general, you want to place them in front of very expensive operators. In practice, however, it can be tricky to find exactly where to place threaded ports in large applications. And because manually placed threaded ports are fixed in the application, they cannot adapt when you submit the same application to different hosts.

In contrast, PEs in Streams 4.2 automatically determine how many threads to use, based on the host they are running on and on which thread counts yield the best performance. You can change this behavior through threading models, which were also introduced in Streams 4.2. You can control threading models both through SPL itself and through submission time options.

Specifying Threading Models in SPL

In SPL, you can apply the @threading annotation to composite operators to control the threading model that applies to the set of operators in that composite. (If you apply @threading to the main composite, it will determine the threading model for the entire application.) The new default is automatic, which actually chooses between dynamic and manual when the PE launches. The following application requests the dynamic threading model:

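    // Type and ManyOperators are placeholders defined elsewhere.
    @threading(model=dynamic)
    composite Main {
        graph
            stream<Type> Src = Beacon() {}
            stream<Type> Res = ManyOperators(Src) {}
            () as Sink = Custom(Res) {}
    }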

The dynamic threading model maintains a global thread pool, and any thread can execute any operator. The manual threading model is the old behavior, where developers have to manually introduce new threads. Under the automatic threading model, the PE looks at properties of the system (such as the number of available logical cores) and properties of the PE (such as the number of operators and the number of threads the PE must already have) to determine whether dynamic or manual is best. In general, the PE will favor dynamic when the number of operators exceeds the number of logical cores, and it will favor manual when the number of existing threads is already a significant fraction of the available logical cores on the system. Finally, the dedicated threading model puts threaded ports between all operators in the PE. The name comes from the fact that each operator input port is executed by a dedicated thread.
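
The choice that the automatic model makes can be pictured with a small decision function. This is only an illustration: the real heuristic is internal to Streams, and the function name, the order of the checks, the 50% cutoff standing in for "a significant fraction", and the fallback to manual are all assumptions made for this sketch.

```shell
#!/bin/sh
# Hypothetical sketch of the automatic model's choice between manual and dynamic.
# Not the actual Streams heuristic: the 50% threshold and check order are assumptions.
# Usage: choose_threading_model <operators> <logical-cores> <existing-threads>
choose_threading_model() {
    operators=$1; cores=$2; existing=$3
    if [ $((existing * 2)) -ge "$cores" ]; then
        # Existing threads already cover a large share of the cores.
        echo manual
    elif [ "$operators" -gt "$cores" ]; then
        # More operators than cores: a shared thread pool can help.
        echo dynamic
    else
        # Assumed fallback when neither rule applies.
        echo manual
    fi
}

choose_threading_model 40 8 1   # prints: dynamic
```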

All of the threading models have valid uses. The manual threading model has the lowest latency, as there are no queues and no extra tuple copying into those queues. But it adds no threads beyond those already in the PE, so it cannot take advantage of multicore hosts without your intervention. The dedicated threading model can take advantage of multicore hosts, but it may incur more latency because it must copy tuples into the threaded port queues, and the number of threads increases with the number of operators in the PE. Using the dedicated threading model is only appropriate when you know that each PE will have fewer operators than the number of available logical cores on the system it’s running on. The dynamic threading model can take advantage of multiple cores by default and automatically finds the appropriate number of threads. However, it may incur more latency than manual because it also copies tuples into queues, and more synchronization overhead than dedicated. The default, automatic, uses information available at runtime to try to choose the best option between manual and dynamic. However, because you may know your application’s needs better than Streams, the @threading annotation allows you to change the threading model.

You can apply the @threading annotation to both composite operator definitions and invocations. If a composite operator invocation has the annotation, and its definition has the annotation, then the PE uses the threading model requested at the invocation site. The @threading annotation can also nest, so that you can specialize different regions of your application with different threading models. The following application specifies a different threading model for the ingress operators than for the rest of the operators:

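    // The model choices are illustrative; Type, Ingress, Compute and Egress are placeholders.
    @threading(model=dynamic)
    composite Main {
        graph
            @threading(model=manual)
            stream<Type> Src = Ingress() {}
            stream<Type> Res = Compute(Src) {}
            () as Sink = Egress(Res) {}
    }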

By default, the dynamic threading model is elastic. At runtime, the PE will explore using different numbers of threads, all while recording the total throughput achieved. The PE will settle on the number of threads that yields the highest overall throughput. You can override this behavior by turning elasticity off and providing a static number of threads. For example, the following turns off elasticity for the entire application, and sets the number of threads in each PE to 4:

    // Type and ManyOperators are placeholders defined elsewhere.
    @threading(model=dynamic, elastic=false, threads=4)
    composite Main {
        graph
            stream<Type> Src = Beacon() {}
            stream<Type> Res = ManyOperators(Src) {}
            () as Sink = Custom(Res) {}
    }


All threading models specified through the @threading annotation will take effect in both distributed and standalone modes. For more on threading models, see the product documentation on the @threading annotation in SPL.
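
The elastic behavior of the dynamic model described earlier in this section amounts to picking the thread count with the highest observed throughput. The sketch below shows only that final selection step. The function name and the sample measurements are made up for illustration, and the way Streams actually explores thread counts is not shown here.

```shell
#!/bin/sh
# Hypothetical sketch: pick the thread count with the highest throughput.
# Input on stdin: one "<threads> <throughput>" pair per line (made-up sample data below).
best_thread_count() {
    awk '$2 > best { best = $2; threads = $1 } END { print threads }'
}

printf '1 100\n2 190\n4 350\n8 340\n' | best_thread_count   # prints: 4
```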

Specifying Threading Models at Submission Time

At submission time, you can override the threading models specified in the SPL application. The threadingModel application configuration option takes the same values as the SPL annotation: automatic, manual, dynamic and dedicated. For example, the following specifies the threading model for the whole application to be manual:

streamtool submitjob -C threadingModel=manual myApp.sab

Note that the threading model specified at submission time overrides any threading model specified by the @threading annotation in SPL. This allows you to experiment with different threading models without having to recompile your application. Also note that unlike the @threading annotation, you can only change the threading model for the entire application. For the example above, all PEs in the application will use the manual threading model; there is no way to specify a different threading model for different sets of operators at submission time.

As with the @threading annotation, there are also ways to control elasticity and the number of threads at submission time. The following example chooses the dynamic threading model, turns off elasticity and sets the number of threads to 4:

streamtool submitjob -C threadingModel=dynamic -C dynamicThreadingElastic=false -C dynamicThreadingThreadCount=4 myApp.sab
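
Because submission time options do not require recompiling, it is easy to try every threading model against the same application bundle. The sketch below uses a hypothetical helper, threading_cmds, that prints (but does not run) one submission command per model, using only the options shown above. If you pass a thread count, the dynamic entry turns elasticity off and pins that count.

```shell
#!/bin/sh
# Hypothetical helper: print (do not run) a submitjob command for each threading model.
# Usage: threading_cmds <sab-file> [static-thread-count]
threading_cmds() {
    sab=${1:-}; threads=${2:-}
    for model in automatic manual dynamic dedicated; do
        if [ "$model" = dynamic ] && [ -n "$threads" ]; then
            # A static thread count only makes sense with elasticity turned off.
            echo "streamtool submitjob -C threadingModel=dynamic -C dynamicThreadingElastic=false -C dynamicThreadingThreadCount=$threads $sab"
        else
            echo "streamtool submitjob -C threadingModel=$model $sab"
        fi
    done
}

threading_cmds myApp.sab 4
```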

You can also change the threading model for all applications in an instance by setting instance.threadingModel. The following example sets the manual threading model for all applications in the StreamsInst instance:

streamtool setproperty instance.threadingModel=manual -i StreamsInst

As with changing the threading model for a single application, changing the threading model at the instance level will override any @threading annotations in the SPL application. The principle here is that the last person to have influence over the operators determines what threading model applies. Considering the @threading annotation and the submission time properties, the precedence rules which determine the actual threading model at runtime are as follows (lower-numbered entries take precedence over higher-numbered ones):

  1. Application-level submission time threadingModel.
  2. Instance-level submission time instance.threadingModel.
  3. @threading on a composite operator’s invocation.
  4. @threading on a composite operator’s definition.
  5. Streams default of automatic.
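
The precedence list above can be read as "the first setting that is present wins". The following sketch, with a hypothetical function name, resolves the effective model for one operator from the four possible settings. Pass an empty string for any level that is not set.

```shell
#!/bin/sh
# Sketch of the precedence list above: the first non-empty setting wins.
# Arguments (pass "" when a level is unset), in precedence order:
#   $1 application-level threadingModel   $2 instance.threadingModel
#   $3 @threading on the invocation       $4 @threading on the definition
effective_threading_model() {
    for setting in "${1:-}" "${2:-}" "${3:-}" "${4:-}"; do
        if [ -n "$setting" ]; then
            echo "$setting"
            return 0
        fi
    done
    echo automatic   # Streams default
}

effective_threading_model "" manual dynamic dedicated   # prints: manual
```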

Automatic Submission Time Fusion and Automatic Threading

These two features are independent but complementary. So far, we have discussed them in isolation from each other, as using one does not require knowledge of the other. This was a deliberate design decision on our part: it greatly simplifies their interfaces. We also designed them so that if you changed one but not the other, the other would still do a reasonable thing. For example, if you change your fusion so that you get many more PEs, automatic threading is likely to use fewer threads.

For fusion, the available options are:

  • manual: You specify how many PEs you want through the fusionTargetPeCount option.
  • automatic: The default, which automatically determines the appropriate number of PEs for your application at submission time.
  • legacy: Reverts to the pre-4.2 fusion behavior where operators are in their own PE by default, and are only fused together when explicitly requested.

For threading models, the available options are:

  • manual: You have to manually add threads; best for low-latency and if you know exactly where threads should go.
  • dynamic: The PE automatically adds threads, and any thread can execute any operator; best for high throughput and when you’re unsure where threads should go and how many are best.
  • dedicated: A threaded port in front of every operator input port in the PE; best for high throughput when you know the number of operators in the PE will be less than the number of available logical cores on the system.
  • automatic: The default, which chooses between manual and dynamic at PE startup.

In the post Optimizing Streams Applications, we presented a set of slides that walks you through how to perform fusion and threading optimizations to improve your application’s performance. This post explains how Streams 4.2 now performs that fusion and threading automatically, and how to modify the default behavior. If you choose legacy fusion mode and the manual threading model, then the old lessons still apply for manually optimizing your application. But the point of these new features is that most of the lessons are now applied automatically by Streams (including compiling with optimizations on).

We chose the defaults for both features knowing the default behavior of the other. Submission time fusion defaults to automatically making about one PE per host. We chose this default because if two operators exist on the same host, it’s more efficient for them to communicate from inside of the same PE than across PEs. But, before Streams 4.2, using very few PEs and not adding threaded ports in the right places could greatly limit parallelism and overall throughput. As a consequence, we made automatic threading the default, so that applications which consist of a small number of PEs with a large number of operators will still take advantage of the available parallelism.

We had two goals with these features. First, we wanted to achieve good performance by default by automatically fusing and threading applications based on the systems they were deployed to. Second, we wanted to provide you with controls that allow you to use the default as a baseline, and then make small changes to that baseline to further improve performance. While you can pay attention to just fusion or just threading for incremental performance gains, fully optimizing your application will require taking into account both fusion and threading. Streams 4.2 makes this process easier by providing direct controls that allow you to specify the number of PEs in your application, and how threads will execute operators.

Related Links

See the Streams Knowledge Center for more information.

