Passing parameters to an operator in a parallel region

Mon August 24, 2020 04:05 PM

The documentation for User-Defined Parallelism (UDP) presents an example of using a FileSource operator inside a UDP region:

@parallel(width=3)
stream<uint64 i> Output = FileSource() {
    param file: "input" + (rstring)getChannel() + ".csv";
}

If this FileSource invocation is inside a UDP region, each operator replica opens a different file because of the call to getChannel(). The replica on channel 0 opens the file "input0.csv", the replica on channel 1 opens "input1.csv", and so on.
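
The following Python model (not SPL) makes the channel-to-file mapping concrete. It evaluates the same string expression once per channel. In SPL the channel number comes from getChannel(); here it is an ordinary integer, and the helper names are hypothetical.

```python
# Python model (not SPL) of the per-replica file name built by the FileSource example.
# replica_file_name and file_names_for_width are hypothetical helpers for illustration.

def replica_file_name(channel: int) -> str:
    """Mirror the SPL expression "input" + (rstring)getChannel() + ".csv"."""
    return "input" + str(channel) + ".csv"


def file_names_for_width(width: int) -> list[str]:
    """Return the file opened by each replica of a region with the given width."""
    # UDP channels are numbered 0 .. width-1.
    return [replica_file_name(c) for c in range(width)]


if __name__ == "__main__":
    print(file_names_for_width(3))
```

With a width of 3 this yields one distinct file name per channel, matching the description above.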

In the FileSource example, the resource we want to use happens to follow a naming convention that encodes the parallel channel number. This convenient naming scheme will not always hold when we want to access an external resource from inside a UDP region.

Consider the case of using a TCPSource from inside of a UDP region. It’s unlikely that the IP addresses and ports that we want to connect to will follow the kind of naming scheme shown above. So how can we solve this problem in general?

Our solution uses getChannel() to index into operator-local lists declared in the operator's logic state clause:

@parallel(width=3)
stream<uint64 i> Src = TCPSource() {
    logic state: {
        list<rstring> _addresses = ["123.45.6.89", "45.6.445.89", "48.3.495.3"];
        list<uint32> _ports = [240u, 560u, 9001u];
    }

    param role: client;
          address: _addresses[getChannel()];
          port: _ports[getChannel()];
}

When this TCPSource invocation appears inside a UDP region, each replica of the TCPSource operator reads the element of the _addresses and _ports lists at index getChannel(). This solution applies when we can hard-code the parameter values in advance and know the maximum parallel width of the UDP region. (If the UDP region had more than 3 channels, the replica on channel 3 would index past the end of the three-element _addresses and _ports lists and fail with a runtime error.) If those constraints do not hold, we can instead use submission-time values:


@parallel(width=(int32)getSubmissionTimeValue("parWidth"))
stream<uint64 i> Src = TCPSource() {
    logic state: {
        list<rstring> _addresses = (list<rstring>)getSubmissionTimeValue("addrs");
        list<uint32> _ports = (list<uint32>)getSubmissionTimeValue("ports");
    }

    param role: client;
          address: _addresses[getChannel()];
          port: _ports[getChannel()];
}

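The only constraint in the example above is that the addrs and ports lists passed at submission time must each contain at least parWidth elements, where parWidth is also provided at submission time. Channels are numbered 0 through parWidth - 1, so if either list is shorter than parWidth, the replicas on the higher-numbered channels index past the end of it and fail at runtime, just as in the hard-coded version.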
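
The constraint can be checked before submitting the job. The Python sketch below (not SPL) does two things. It models the lookup each replica performs, with channel c reading addrs[c] and ports[c]. It also rejects a parWidth larger than either list. The function names are hypothetical and not part of the Streams tooling.

```python
# Python model (not SPL) of the submission-time version of the TCPSource example.
# check_submission_values and endpoints are hypothetical helpers for illustration.

def check_submission_values(addrs: list[str], ports: list[int], par_width: int) -> None:
    """Raise ValueError if some channel 0..par_width-1 would index past a list."""
    if par_width < 1:
        raise ValueError("parWidth must be at least 1")
    if len(addrs) < par_width or len(ports) < par_width:
        raise ValueError(
            f"need at least {par_width} addresses and ports, "
            f"got {len(addrs)} and {len(ports)}"
        )


def endpoints(addrs: list[str], ports: list[int], par_width: int) -> list[tuple[str, int]]:
    """Return the (address, port) pair used by the replica on each channel."""
    # Mirrors _addresses[getChannel()] and _ports[getChannel()] in the SPL code;
    # an out-of-range channel raises IndexError here.
    return [(addrs[c], ports[c]) for c in range(par_width)]


if __name__ == "__main__":
    addrs = ["123.45.6.89", "45.6.445.89", "48.3.495.3"]
    ports = [240, 560, 9001]
    check_submission_values(addrs, ports, 3)
    print(endpoints(addrs, ports, 3))
    try:
        check_submission_values(addrs, ports, 4)  # one channel more than the lists hold
    except ValueError as err:
        print("rejected:", err)
```

Lists longer than parWidth are accepted; the extra entries are simply never read.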

At submission time, the following command should work, assuming that the compiled application file was named Main.sab:

streamtool submitjob -P addrs='["123.45.6.89", "45.6.445.89", "48.3.495.3"]' -P ports='[240, 560, 9001]' -P parWidth=3 output/Main.sab
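
Typographic (curly) quotes are easy to introduce when copying a command like this one, and a shell does not treat them as quoting characters. One way to avoid that is to generate the command. The Python sketch below builds the same argument list, with straight ASCII quotes only when it is joined into a shell string. It assumes that JSON list syntax matches the list literals shown above, which holds for these simple string and integer values. submitjob_args is a hypothetical helper, and the sketch only builds the command; it does not run streamtool.

```python
# Builds (but does not run) the streamtool submitjob command from Python values.
# Assumption: json.dumps output matches the list literal form used in the command
# above for plain ASCII strings and integers.
import json
import shlex


def submitjob_args(sab_path: str, addrs: list[str], ports: list[int], par_width: int) -> list[str]:
    """Return argv for: streamtool submitjob -P addrs=... -P ports=... -P parWidth=... <sab>."""
    return [
        "streamtool", "submitjob",
        "-P", "addrs=" + json.dumps(addrs),  # e.g. addrs=["123.45.6.89", ...]
        "-P", "ports=" + json.dumps(ports),  # e.g. ports=[240, 560, 9001]
        "-P", f"parWidth={par_width}",
        sab_path,
    ]


if __name__ == "__main__":
    argv = submitjob_args(
        "output/Main.sab",
        ["123.45.6.89", "45.6.445.89", "48.3.495.3"],
        [240, 560, 9001],
        3,
    )
    # shlex.join single-quotes each argument that needs it for a POSIX shell.
    print(shlex.join(argv))
```

Passing argv directly to subprocess.run (without shell=True) avoids shell quoting altogether.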