
Workflow of a Hadoop MapReduce job with HDFS Transparency & IBM Spectrum Scale

By CHINMAYA MISHRA posted Mon November 23, 2020 11:50 AM

  

The MapReduce framework provides the de facto compute engine in a Hadoop-based analytics infrastructure. Popular Hadoop workloads such as Hive, HBase and others leverage the MapReduce framework to analyze petabytes of data.

The MapReduce framework consists of Mappers and Reducers, which are essentially JVM-based containers that run the actual application code. The Mappers and Reducers interact with storage via HDFS to retrieve and analyse data. When a Hadoop cluster is integrated with Spectrum Scale, these Mappers and Reducers interact with the Spectrum Scale HDFS Transparency components instead.

In this blog, we take a closer look at the subtle differences in these interactions compared with a native HDFS based system. The Map, Shuffle and Reduce phases are discussed, and in each phase the differences in workflow between native HDFS and Spectrum Scale HDFS Transparency are highlighted.

This writing is intended for Hadoop solution architects/designers who may be considering moving from a native HDFS based environment to an IBM Spectrum Scale based Big Data solution.
HDFS Transparency with IBM Spectrum Scale supports a variety of shared storage, including the IBM Elastic Storage Server (ESS) and other IBM storage platforms. This is explained in detail in the IBM documentation: https://www.ibm.com/support/knowledgecenter/STXKQY_BDA_SHR/bl1bda_hadoopscale_kclanding.htm?cp=STXKQY_5.0.4 This blog focuses on shared storage, with IBM ESS cited as the example.

Abbreviations used in this blog:

  • NN    means NameNode
  • DN    means DataNode
  • RM    means ResourceManager
  • NM    means NodeManager
  • Scale  means “IBM Spectrum Scale”
  • ESS means IBM Elastic Storage Server

With native HDFS, the RM tends to schedule mappers on worker nodes that hold a replica of the data block, i.e., one of the 3 DNs holding the replicas (assuming an HDFS replication factor of 3). However, with Spectrum Scale and shared storage, there are no physical local replicas on the worker nodes. In the following teragen/terasort example, we will see how such scheduling is taken care of. The teragen step creates 100 million records, equaling 10GB of data. A 3-node Hadoop cluster using a filesystem remote-mounted from an ESS was used in this example.

[root@dn03-dat ~]# hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar teragen -Dmapreduce.job.maps=20 100000000    /teragen-10gb
-Dmapreduce.job.maps=20 => 20 mappers specified. If not specified, just 1 or 2 mappers are created.
100000000               => Number of records.
/teragen-10gb           => The directory under the Hadoop root where the record files are created.
 
With teragen, each mapper creates one file. Each of these files contains unsorted records and serves as input data for the terasort job. Once the teragen job completes, we can see those files created inside the Scale directory /teragen-10gb through POSIX. Each file is 500MB in size; 500MB x 20 files totals 10GB.

[root@dn03-dat teragen-10gb]# pwd
/ess-gpfs0/chmdata1/teragen-10gb                                => /ess-gpfs0/chmdata1 is our Scale Hadoop root directory.
[root@dn03-dat teragen-10gb]# ls -l
total 9768960
-rw-r--r-- 1 root root 500000000 Apr 14 05:35 part-m-00000
-rw-r--r-- 1 root root 500000000 Apr 14 05:36 part-m-00001
…. (lines omitted)
-rw-r--r-- 1 root root 500000000 Apr 14 05:35 part-m-00019
-rw-r--r-- 1 root root         0 Apr 14 05:36 _SUCCESS
[root@dn03-dat teragen-10gb]#
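The sizes above are consistent with teragen's fixed 100-byte record format (a 10-byte key plus a 90-byte value). A quick back-of-the-envelope check in Python:

```python
# teragen writes fixed-length 100-byte records (10-byte key + 90-byte value).
RECORD_BYTES = 100
total_records = 100_000_000   # the value passed on the teragen command line
mappers = 20                  # -Dmapreduce.job.maps=20

total_bytes = total_records * RECORD_BYTES           # 10,000,000,000 = 10 GB
records_per_mapper = total_records // mappers        # 5,000,000 records per file
bytes_per_file = records_per_mapper * RECORD_BYTES   # 500,000,000 = 500 MB

print(f"{total_bytes:,} bytes total, {bytes_per_file:,} bytes per file")
```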
 

Running Terasort:

Input considerations for terasort:

  • -Dmapreduce.job.reduces=40   => Number of reducers. If not specified, just 1 or 2 reducers are launched by default.
  • -Dio.file.buffer.size=1048576   => IO file buffer, 1MB recommended
  • /teragen-10gb     => The directory under Scale Hadoop root containing input records
  • /terasort-10gb     => The output directory under Scale Hadoop root containing sorted records
[root@dn03-dat ~]# hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar terasort \
-Dmapreduce.job.reduces=40 \
-Dio.file.buffer.size=1048576   \
/teragen-10gb     /terasort-10gb

20/04/22 05:25:02 INFO terasort.TeraSort: starting
20/04/22 05:25:10 INFO input.FileInputFormat: Total input files to process : 20
Spent 1482ms computing base-splits.
Spent 61ms computing TeraScheduler splits.
Computing input splits took 1548ms
Sampling 10 splits of 80
Making 40 from 100000 sampled records
Computing parititions took 2976ms
Spent 4542ms computing partitions.
20/04/22 05:25:16 INFO client.RMProxy: Connecting to ResourceManager at dn03-dat.gpfs.net/30.1.1.17:8050
20/04/22 05:25:18 INFO client.AHSProxy: Connecting to Application History server at dn04-dat.gpfs.net/30.1.1.18:10200
20/04/22 05:25:19 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/root/.staging/job_1587011342744_0003
20/04/22 05:25:19 INFO mapreduce.JobSubmitter: number of splits:80
20/04/22 05:25:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1587011342744_0003
20/04/22 05:25:21 INFO mapreduce.JobSubmitter: Executing with tokens: []
20/04/22 05:25:22 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-311/0/resource-types.xml
20/04/22 05:25:23 INFO impl.YarnClientImpl: Submitted application application_1587011342744_0003
20/04/22 05:25:23 INFO mapreduce.Job: The url to track the job: http://dn03-dat.gpfs.net:8088/proxy/application_1587011342744_0003/
20/04/22 05:25:23 INFO mapreduce.Job: Running job: job_1587011342744_0003
20/04/22 05:25:30 INFO mapreduce.Job: Job job_1587011342744_0003 running in uber mode : false
20/04/22 05:25:30 INFO mapreduce.Job: map 0% reduce 0%
20/04/22 05:25:46 INFO mapreduce.Job: map 1% reduce 0%
20/04/22 05:25:47 INFO mapreduce.Job: map 3% reduce 0%
20/04/22 05:25:48 INFO mapreduce.Job: map 4% reduce 0%
………………..<lines omitted>……………………………………
20/04/22 05:26:13 INFO mapreduce.Job: map 84% reduce 0%
………………..<lines omitted>……………………………………
20/04/22 05:26:41 INFO mapreduce.Job: map 98% reduce 30%
………………..<lines omitted>……………………………………
20/04/22 05:27:23 INFO mapreduce.Job: map 100% reduce 85%
………………..<lines omitted>……………………………………
20/04/22 05:28:20 INFO mapreduce.Job: map 100% reduce 100%
20/04/22 05:28:33 INFO mapreduce.Job: Job job_1587011342744_0003 completed successfully
20/04/22 05:28:33 INFO mapreduce.Job: Counters: 54
File System Counters
FILE: Number of bytes read=10400034560
FILE: Number of bytes written=20828210580
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=10000009600
HDFS: Number of bytes written=10000000000
HDFS: Number of read operations=440
HDFS: Number of large read operations=0
HDFS: Number of write operations=80
Job Counters
Launched map tasks=80
Launched reduce tasks=40
Data-local map tasks=27
Rack-local map tasks=53
………………..<lines omitted>……………………………………
Map-Reduce Framework
Map input records=100000000
Map output records=100000000
Map output bytes=10200000000
Map output materialized bytes=10400019200
Input split bytes=9600
Combine input records=0
Combine output records=0
Reduce input groups=100000000
Reduce shuffle bytes=10400019200
Reduce input records=100000000
Reduce output records=100000000
Spilled Records=200000000
Shuffled Maps =3200
Failed Shuffles=0
Merged Map outputs=3200
GC time elapsed (ms)=624721
CPU time spent (ms)=14432410
Physical memory (bytes) snapshot=121880182784
Virtual memory (bytes) snapshot=597271707648
Total committed heap usage (bytes)=124417212416
Peak Map Physical memory (bytes)=1602551808
Peak Map Virtual memory (bytes)=4419944448
Peak Reduce Physical memory (bytes)=826998784
Peak Reduce Virtual memory (bytes)=6260457472
………………..<lines omitted>……………………………………
File Input Format Counters
Bytes Read=10000000000
File Output Format Counters
Bytes Written=10000000000
20/04/22 05:28:33 INFO terasort.TeraSort: done
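The "Sampling 10 splits of 80" and "Making 40 from 100000 sampled records" lines above come from TeraSort's total-order (range) partitioner: it samples keys from the input, derives 39 boundary keys for the 40 reducers (persisted in _partition.lst), and routes each record to the reducer whose key range contains it, so the concatenated reducer outputs are globally sorted. A simplified sketch of that idea (illustrative only, not the actual TeraSort code):

```python
# Sketch of total-order (range) partitioning: sample keys, pick
# num_reducers - 1 boundary keys, route each record to the reducer
# whose range contains its key.
import bisect
import random

def make_boundaries(sample, num_reducers):
    """Pick evenly spaced boundary keys from a sorted sample."""
    s = sorted(sample)
    return [s[len(s) * i // num_reducers] for i in range(1, num_reducers)]

def partition(key, boundaries):
    """Return the reducer index (0 .. num_reducers - 1) for a key."""
    return bisect.bisect_right(boundaries, key)

random.seed(0)
sample = [random.randrange(1_000_000) for _ in range(100_000)]
boundaries = make_boundaries(sample, 40)   # 39 boundaries for 40 reducers
```

Because the reducer index is monotonic in the key, sorting within each reducer yields a globally sorted result across the 40 output files.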

Now let’s examine the operations in each phase of terasort:

[Figure: Workflow of a MapReduce Job]

Workflow during MAP phase:

  • dfs client submits a job to YARN.
  • A list of input splits is computed. RM creates a map task for each split. Usually a split unit is a dfs block.
  • So, for each block of a file being read, one mapper is scheduled to be created. In our test, we used the default HDFS blocksize, which is 128MB. Since each input file is 500MB, this corresponds to 4 dfs blocks per file (500/128 ≈ 3.9, rounded up to 4). There are 20 input files, so 20 x 4 = 80 mappers were created.
  • RM adds the mapper tasks to a designated scheduler Queue.
  • The scheduler decides which tasks from which Queue can be processed on which worker nodes. Each host running a NM is a worker node.
  • Mappers are scheduled based on the following considerations:
    1. Resource availability (typically CPU and memory usage) on the worker nodes. NMs provide this information to the RM periodically.
    2. In native HDFS, mapper tasks have affinity for data locality. The RM communicates with the NN to find out which hosts contain a replica of the block for a given split. Based on this, the RM tries to make an informed decision about which tasks should be scheduled given the currently available resources on the worker nodes.
    3. With Spectrum Scale and shared storage, however, this data-locality consideration is not relevant. So the RM considers point (1) above, plus the load on DNs as obtained from the NN, to make this scheduling decision. This is controlled by a configuration setting (yarn.scheduler.capacity.node-locality-delay=-1).
  • Each mapper has an embedded dfs client, and they start looking up the NN for metadata.
  • NameNode calculates block locations for each mapper. With Spectrum Scale, a block location is the actual POSIX file offset for the Scale file in consideration. The NN sends to the mappers the Scale file path and the offset of the file (0, 128MB, 256MB, etc.) that they should respectively read from.
  • Mappers open a connection to the local DN RPC port for reading:
    1. The 1st mapper asks the local DN to read a block, e.g. a Scale file from offset 0.
    2. The 2nd mapper asks the local DN to read another block, e.g. the same Scale file from offset 128M.
    3. The 3rd mapper asks the local DN to read another block, e.g. another Scale file from offset 256M, and so on.
  • For every mapper, the DN engages a data transfer thread to read a data block from the underlying storage. There are dfs.datanode.max.transfer.threads=N such threads. This is true for native HDFS too. However, with Spectrum Scale, the transfer thread leverages the co-located Spectrum Scale client to open the Scale file with POSIX-style access, seek to the offset, and read the data block into a buffer.
  • The transfer thread then writes the buffer back to the dfs client, which in this case is the mapper.
  • Mappers process the data, sort it, and write this interim result to shuffle space (controlled by the parameter yarn.nodemanager.local-dirs) on the local host. The intermediate key-value pairs generated by each mapper are sorted by key.

NOTE: It is possible that the worker nodes don't have a local DN. This can happen if a user prefers to have separate Hadoop and HDFS Transparency clusters. In such cases, the scheduling logic is based purely on the currently available resources on the worker nodes.
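The mapper count discussed above follows directly from the split arithmetic; a quick check (assuming one split per 128 MB dfs block, as in this test):

```python
# Input-split arithmetic for this terasort run:
# 20 files of 500 MB each, default 128 MB dfs blocksize.
import math

BLOCK_SIZE = 128 * 1024 * 1024   # default dfs.blocksize
FILE_SIZE = 500_000_000          # each teragen output file
NUM_FILES = 20

splits_per_file = math.ceil(FILE_SIZE / BLOCK_SIZE)  # partial last block counts
total_mappers = splits_per_file * NUM_FILES          # matches "number of splits:80"

print(splits_per_file, total_mappers)
```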

 

One may view the cluster-wide distribution of active containers using the following command:

[root@dn03-dat ~]# yarn node -list
20/04/22 05:26:12 INFO client.RMProxy: Connecting to ResourceManager at dn03-dat.gpfs.net/30.1.1.17:8050
Total Nodes:3
         Node-Id             Node-State Node-Http-Address       Number-of-Running-Containers
mn02-dat.gpfs.net:45454         RUNNING mn02-dat.gpfs.net:8042                            14
dn03-dat.gpfs.net:45454         RUNNING dn03-dat.gpfs.net:8042                            42
dn04-dat.gpfs.net:45454         RUNNING dn04-dat.gpfs.net:8042                            16
[root@dn03-dat ~]#


One may view the resource utilization by containers on worker nodes using the following command:

[root@dn03-dat ~]# yarn node -list -showDetails
                    ………………..<lines omitted>……………………………………
Total Nodes:3
         Node-Id             Node-State Node-Http-Address       Number-of-Running-Containers
mn02-dat.gpfs.net:45454         RUNNING mn02-dat.gpfs.net:8042                            14
Detailed Node Information :
        Configured Resources : <memory:190464, vCores:153>
        Allocated Resources : <memory:52224, vCores:14>
        Resource Utilization by Node : PMem:85856 MB, VMem:85856 MB, VCores:7.521652
        Resource Utilization by Containers : PMem:5840 MB, VMem:76914 MB, VCores:6.177
        Node-Labels :
dn03-dat.gpfs.net:45454         RUNNING dn03-dat.gpfs.net:8042                            35
          ………………..<lines omitted>……………………………………

 

Workflow during Shuffle phase:

  • As map tasks finish, the RM starts scheduling reducers across the worker nodes.
  • For reducers, data locality is not considered because reducers receive their input from mappers from multiple worker nodes over the network. This is true even for native HDFS.
  • So, RM tries to make an informed scheduling decision about which reducers should be scheduled based on currently available resources on the worker nodes.
  • In the shuffle phase, the interim map output from mappers is transferred to reducers. Every reducer fetches the interim results for all values associated with the same key from multiple nodes. This is a network-intensive operation within the Hadoop cluster nodes.
  • For optimum performance, the shuffle data should be on local disks. However, some users may not have enough local disks on their Hadoop nodes. In such cases a Scale directory may be used, while ensuring that multiple nodes don't write into the same directory and corrupt the interim results. A workable strategy is to configure yarn.nodemanager.local-dirs and yarn.nodemanager.log-dirs as follows:
yarn.nodemanager.local-dirs=/shuffle-dir
where /shuffle-dir is a symbolic link to a unique Scale directory on a given host, e.g. /ess-gpfs0/chmdata1/hadoop/yarn/<local hostname>

The shuffle data is intermediate/temporary data and hence doesn’t need to be replicated.

Workflow during Reduce phase:

  • During the shuffle phase, data from the mappers, grouped by key, is split among reducers. As the shuffle phase progresses, reducers enter the Reduce phase, where merging and sorting of map outputs happen.
  • As merging and sorting of map records complete, it's now time to write the sorted records to Transparency. Each reducer has an embedded dfs client, which starts looking up the NN for metadata.
  • NameNode calculates block locations for each reducer. With Spectrum Scale, a block location is the actual POSIX file offset for the Scale file in consideration. The NN sends to the reducers the Scale file path and the offset of the file (0, 128MB, 256MB, etc.) that they should respectively write to.
  • Reducers open a connection to the local DN RPC port for writing:
    • The 1st reducer asks the local DN to write a block, e.g. a Scale file from offset 0.
    • The 2nd reducer asks the local DN to write another block, e.g. another Scale file from offset 128M, and so on.
  • For every reducer, the DN engages a data transfer thread to write a data block to the underlying storage. There are dfs.datanode.max.transfer.threads=N such threads. This is true for native HDFS too. However, with Spectrum Scale, the transfer thread leverages the co-located Spectrum Scale client to open the Scale file with POSIX-style access, seek to the offset, and write the data block.
  • Every reducer follows the same logic as mentioned in the file write (hdfs -put) section. Each output file is written to by exactly one reducer. In our case we had 40 reducers, so 40 output files were created, each ~250MB; 250MB x 40 = 10GB of sorted output.
[root@dn03-dat terasort-10gb]# pwd
/ess-gpfs0/chmdata1/terasort-10gb
[root@dn03-dat terasort-10gb]# ls -l
total 9770753
-rw-r--r-- 1 root root       429 Apr 22 05:25 _partition.lst
-rw-r--r-- 1 root root 252866900 Apr 22 05:28 part-r-00000
-rw-r--r-- 1 root root 241313900 Apr 22 05:27 part-r-00001
          ………………..<lines omitted>……………………………………
-rw-r--r-- 1 root root 257091600 Apr 22 05:27 part-r-00039
-rw-r--r-- 1 root root         0 Apr 22 05:28 _SUCCESS
[root@dn03-dat terasort-10gb]#

 

Thank you for reading this blog post. Comments are welcome.

Acknowledgements
Thanks to Xin TW Wang from IBM China Systems & Technology Lab for reviewing this content.
