A Simple User-defined Parallelism and Mixed-mode Example

Mon September 28, 2020 06:52 PM

Written by Mike Oberholtzer.

What is User-defined Parallelism?

An important feature in Streams is user-defined parallelism (UDP), which allows users to specify parallel regions in their SPL applications. Parallel regions are sub-graphs of an SPL application that can be replicated to take advantage of data parallelism. InfoSphere Streams will do the work of replicating the operators and creating all of the necessary connections. Consult the Streams Knowledge Center for more information on UDP.

This document provides a quick overview with examples for replicating operators and distributing tuple data in parallel using two techniques: mixed-mode and UDP.

What is Mixed-mode?

You can also use mixed-mode SPL source to replicate operators. Mixed-mode is a combination of Perl and SPL in which Perl code generates SPL code at compile time.
More information on mixed-mode is available in the Streams Knowledge Center.

When would you want to replicate operators with UDP or mixed-mode?

When your application can take advantage of data parallelism (e.g., applying copies of an operator to different subsets of the data in parallel), replicating operators can be used to distribute tuples across multiple operators. However, this is appropriate only when the order of the tuples does not matter to the operators that consume the result.
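The ordering caveat can be illustrated with a small simulation. The sketch below is plain Python rather than SPL, and everything in it is a hypothetical model and not Streams behavior: the function name `merged_order`, the assumption that all tuples are available at time 0, the round-robin routing, and the per-channel processing times. It only shows why a downstream operator may see tuples out of order once channels run at different speeds.

```python
def merged_order(num_tuples, channel_costs):
    """Return tuple counters in the order a downstream operator would see them.

    Hypothetical model: tuples are routed round-robin to channels, and each
    channel processes its tuples one at a time, taking channel_costs[c]
    time units per tuple.
    """
    width = len(channel_costs)
    busy_until = [0] * width          # time at which each channel becomes free
    finished = []                     # (finish_time, counter) pairs
    for counter in range(num_tuples):
        channel = counter % width
        busy_until[channel] += channel_costs[channel]
        finished.append((busy_until[channel], counter))
    finished.sort()                   # downstream sees tuples by finish time
    return [counter for _, counter in finished]


print(merged_order(6, [1, 1]))   # equal speeds: [0, 1, 2, 3, 4, 5]
print(merged_order(6, [3, 1]))   # channel 0 is slower: [1, 3, 0, 5, 2, 4]
```

With equal channel speeds the counters arrive in their original order. As soon as one channel is slower, later tuples overtake earlier ones, which is why the consuming operators must not depend on tuple order.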

Simple Examples

In this section, a simple example of a composite operator that sends tuples to a fan-out of 10 operators is provided using both mixed-mode SPL source and the new UDP functionality. The same processing is performed on subsets of the tuples, which then fan in to a final operator. The fan-out does not send every tuple to each of the 10 operators. Instead, each tuple is sent to only one of the 10 replicated operators.

Example 1: Mixed Mode

The first example shows a mixed-mode source file (with extension .splmm). This application consists of:

  • Beacon operator that produces tuples
  • Compile-time parameter to specify the width of the fan-out
  • ThreadedSplit operator to route the tuples to the replicated operators
  • Functor operators, one per ThreadedSplit output, that process the tuples
  • A FileSink operator to consume the tuples from the replicated operators and write them to a file

Note that this example uses Perl to generate a number of operators at compile time based on the compile-time width parameter.

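<% my $AppWidth = $ARGV[0]; %>

// AppWidth (the Perl variable set above) is the fan-out width, read from the first compile-time argument.
composite Example1 {
  type AppTuple = tuple<int32 counter, rstring data>;

  graph
    // Source: generate 500000 tuples with an increasing counter.
    stream<AppTuple> beaconOutput as O = Beacon() {
      logic state: mutable int32 i = 0;
      param iterations: 500000;
      output O: counter = i++;
    }

    // Fan-out: Perl emits one ThreadedSplit output port per channel.
    (stream<AppTuple> outputStream1
    <% for (my $k=2; $k<=$AppWidth; $k++) { %>
      ; stream<AppTuple> outputStream<%=$k%>
    <% } %>)
    = ThreadedSplit(beaconOutput) {
      param bufferSize: 1000u;
    }

    // Replicated processing: one Functor per channel, tagging each tuple with its channel number.
    <% for (my $k=1; $k<=$AppWidth; $k++) { %>
      stream<AppTuple> outStream<%=$k%> = Functor(outputStream<%=$k%>) {
        output outStream<%=$k%> : data = (rstring)<%=$k%>;
      }
    <% } %>

    // Fan-in: all channel outputs feed the single input port of the FileSink.
    () as Sink = FileSink(outStream1
      <% for (my $k=2; $k<=$AppWidth; $k++) { %>
        , outStream<%=$k%>
      <% } %>) {
      param file   : "{localtime:%m.%d.%H.%M.%S}_TestFile.txt";
            format : txt;
    }
}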
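To make the effect of the three Perl loops easier to picture, the following Python sketch builds roughly the same SPL fragments for a given width. It is an illustration only: `expand_example1` is a hypothetical helper, it is not part of Streams or of the mixed-mode toolchain, and the whitespace of the real generated SPL will differ.

```python
def expand_example1(width):
    """Build SPL fragments similar to what the three Perl loops would emit."""
    channels = range(1, width + 1)
    # Output ports of ThreadedSplit: outputStream1; outputStream2; ...
    split_ports = "; ".join(
        f"stream<AppTuple> outputStream{k}" for k in channels
    )
    # One Functor invocation per channel.
    functors = [
        f"stream<AppTuple> outStream{k} = Functor(outputStream{k}) {{ "
        f"output outStream{k} : data = (rstring){k}; }}"
        for k in channels
    ]
    # Input streams of the FileSink: outStream1, outStream2, ...
    sink_inputs = ", ".join(f"outStream{k}" for k in channels)
    return split_ports, functors, sink_inputs


ports, functors, sink_inputs = expand_example1(3)
print(f"({ports}) = ThreadedSplit(beaconOutput) {{ ... }}")
for line in functors:
    print(line)
print(f"() as Sink = FileSink({sink_inputs}) {{ ... }}")
```

For a width of 3 this prints one ThreadedSplit with three output ports, three Functor invocations, and one FileSink that reads from all three Functor outputs. Changing the width requires regenerating and recompiling the SPL.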

Figure: Streams Studio instance graph of example 1, with a compile-time parameter of 10.

Example 2: UDP

In example 2, the functionality from example 1 is implemented using the new UDP functionality. No mixed-mode SPL code is used, and the source file consists only of SPL code (with extension .spl).
This example consists of:
  • Beacon operator that produces tuples
  • @parallel annotation, with a submission-time width, applied to an invocation of the Functor operator (this creates copies of the Functor operator, with the number of copies determined by the submission-time value "WIDTH")
  • A FileSink operator to consume the tuples from the replicated operators and write them to a file
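
composite Example2 {
  type AppTuple = tuple<int32 counter, rstring data>;

  graph
    // Source: generate 500000 tuples with an increasing counter.
    stream<AppTuple> beaconOutput as O = Beacon() {
      logic state: mutable int32 i = 0;
      param iterations: 500000;
      output O: counter = i++;
    }

    // Parallel region: the Functor is replicated WIDTH times at job submission.
    // getChannel() returns the index of the channel that processes the tuple.
    @parallel(width = (int32) getSubmissionTimeValue("WIDTH"))
    stream<AppTuple> outStream1 = Functor(beaconOutput) {
      output outStream1 : data = (rstring)getChannel();
    }

    // Fan-in: the outputs of all channels are merged into outStream1.
    () as Sink = FileSink(outStream1) {
      param file   : "{localtime:%m.%d.%H.%M.%S}_TestFileUDP.txt";
            format : txt;
    }
}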
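As a rough picture of what example 2 writes to the file, the Python sketch below pairs each counter with the channel index that `getChannel()` would supply. It assumes round-robin routing that starts at channel 0, which is an assumption of this sketch, and `tag_tuples` is a hypothetical helper, not a Streams function.

```python
from collections import Counter


def tag_tuples(num_tuples, width):
    """Pair each counter with the channel index that would be cast into 'data'.

    Assumes round-robin routing starting at channel 0.
    """
    return [(counter, str(counter % width)) for counter in range(num_tuples)]


tagged = tag_tuples(500000, 10)
per_channel = Counter(data for _, data in tagged)
print(tagged[:3])                            # first three (counter, data) pairs
print(per_channel["0"], per_channel["9"])    # tuples handled by the first and last channel
```

Under these assumptions each of the 10 channels handles 50000 of the 500000 tuples. Note that channel indexes are zero-based, so the data attribute holds values from 0 to WIDTH-1, whereas example 1 labels its channels from 1 to the compile-time width.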

Figure: Streams Studio instance graph of example 2, with a submission-time parameter of 10.

Benefits and Drawbacks of each solution

The mixed-mode code (example 1) allows you to specify the width of the fan-out only at compile time. Its ThreadedSplit operator distributes tuples across the replicated operators by sending the next tuple to an idle replicated operator. The UDP example, on the other hand, allows you to specify the width at job submission time, so you can change the parallel width without recompiling the application. UDP, as demonstrated in example 2, uses a round-robin approach to distribute the tuples to the replicated operators.
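The two distribution strategies can be compared with a toy model. The Python sketch below is not a measurement of Streams. The function names, the per-tuple cost lists, and the simplification that a channel's load is just the sum of the costs it receives are all assumptions made for illustration.

```python
def round_robin(costs, width):
    """Assign tuple i to channel i % width; return the total work per channel."""
    load = [0] * width
    for i, cost in enumerate(costs):
        load[i % width] += cost
    return load


def idle_first(costs, width):
    """Assign each tuple to the channel that becomes free earliest."""
    load = [0] * width
    for cost in costs:
        channel = load.index(min(load))   # ties go to the lowest channel
        load[channel] += cost
    return load


uniform = [1] * 8                    # every tuple costs the same, as in the examples
skewed = [5, 1, 1, 1, 5, 1, 1, 1]    # hypothetical: some tuples are more expensive

print(round_robin(uniform, 2), idle_first(uniform, 2))   # [4, 4] [4, 4]
print(round_robin(skewed, 2), idle_first(skewed, 2))     # [12, 4] [8, 8]
```

When every tuple costs the same, as with the simple Functor used in both examples, the two strategies balance the channels equally. In this model they diverge only when tuple costs are uneven, in which case sending work to an idle channel evens out the load.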

The two examples should perform similarly when run on the same hardware.

See the Streams Knowledge Center for more information on mixed-mode and UDP.

