Robert Schmidtke commented on MAPREDUCE-6923:

Hi Ravi,

When {{shuffleBufferSize <= trans}}, the behavior is exactly the same as the old one, because {{Math.min}} then picks {{shuffleBufferSize}}.

If {{readSize == trans}} (i.e. the {{fileChannel.read()}} returned as many bytes as I wanted to transfer), {{trans}} is decremented correctly, {{position}} is increased correctly, and the {{byteBuffer}} is flipped as usual. The {{byteBuffer}}'s contents are written to {{target}} as usual, the {{byteBuffer}} is cleared, and then hopefully GCed, never to be seen again.
I'd say that for {{readSize == trans}}, we're in the else branch, and thus {{byteBuffer}} is {{limit()}}-ed to {{trans}} (which is the size it already has, because we're in the case where {{trans < shuffleBufferSize}}). It's correctly positioned to {{0}} as we're done reading, and {{trans}} is correctly set to {{0}}. Afterwards, the loop breaks (it can only be one iteration here, because otherwise {{trans}} would have been larger than {{shuffleBufferSize}}), {{byteBuffer}} is written to {{target}} and then cleared.
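For reference, here is a condensed sketch of the loop we're discussing, with the patched allocation applied. This is an illustration, not the actual {{FadvisedFileRegion}} code: the signature is simplified to a static method that takes the channels and counters explicitly, with {{position}}, {{trans}} and {{shuffleBufferSize}} named as above.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public class ShuffleCopySketch {

  // Condensed, illustrative version of the customShuffleTransfer loop;
  // not the real FadvisedFileRegion method, just its control flow.
  static long transfer(FileChannel fileChannel, WritableByteChannel target,
      long position, long count, int shuffleBufferSize) throws IOException {
    long trans = count;
    // Patched allocation: the buffer is never larger than the bytes to move.
    ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
        trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));

    int readSize;
    while (trans > 0L
        && (readSize = fileChannel.read(byteBuffer, position)) > 0) {
      if (readSize < trans) {
        // Partial progress: account for the bytes read and flip as usual.
        trans -= readSize;
        position += readSize;
        byteBuffer.flip();
      } else {
        // Last read: we may have read more than we need, so cap the
        // buffer at trans manually instead of flipping it.
        byteBuffer.limit((int) trans);
        byteBuffer.position(0);
        position += trans;
        trans = 0;
      }
      // Drain the buffer into the target, then reuse it.
      while (byteBuffer.hasRemaining()) {
        target.write(byteBuffer);
      }
      byteBuffer.clear();
    }
    return count - trans;
  }
}
```

In this sketch the {{readSize == trans}} case lands in the else branch, exactly as argued above: the buffer gets {{limit((int) trans)}} rather than a {{flip()}}.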

If {{readSize < trans}}, almost the same thing as above happens, but in a while loop. The only change this patch makes is that the {{byteBuffer}} may be smaller than before, but that doesn't matter, because it's big enough for the number of bytes we need to transfer.
Now we have the situation you described for the previous case, and I agree with 
your reasoning here.
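For completeness, the effect of the patched allocation on the numbers from this issue can be checked in isolation. The helper below just extracts the {{Math.min}} expression from the patch for illustration; it is not part of the patch itself.

```java
public class BufferSizeDemo {

  // The allocation expression proposed in the patch, pulled out as a helper.
  static int bufferSize(int shuffleBufferSize, long trans) {
    return Math.min(shuffleBufferSize,
        trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans);
  }

  public static void main(String[] args) {
    // 65K partition with the default 128K buffer: allocate only 65K,
    // so nothing beyond the partition is read into the buffer.
    System.out.println(bufferSize(131072, 65536));    // 65536
    // Huge transfer: the configured buffer size still wins.
    System.out.println(bufferSize(131072, 1L << 33)); // 131072
  }
}
```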

> Optimize MapReduce Shuffle I/O for small partitions
> ---------------------------------------------------
>                 Key: MAPREDUCE-6923
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6923
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>         Environment: Observed in Hadoop 2.7.3 and above (judging from the 
> source code of future versions), and Ubuntu 16.04.
>            Reporter: Robert Schmidtke
>            Assignee: Robert Schmidtke
>             Fix For: 2.9.0, 3.0.0-beta1
>         Attachments: MAPREDUCE-6923.00.patch, MAPREDUCE-6923.01.patch
> When a job configuration results in small partitions read by each reducer 
> from each mapper (e.g. 65 kilobytes as in my setup: a 
> [TeraSort|https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java]
>  of 256 gigabytes using 2048 mappers and reducers each), and setting
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transferTo.allowed</name>
>   <value>false</value>
> </property>
> {code}
> then the default setting of
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transfer.buffer.size</name>
>   <value>131072</value>
> </property>
> {code}
> results in almost 100% overhead in reads during shuffle in YARN, because for 
> each 65K needed, 128K are read.
> I propose a fix in 
> [FadvisedFileRegion.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java#L114]
>  as follows:
> {code:java}
> ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.shuffleBufferSize, 
> trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
> {code}
> e.g. 
> [here|https://github.com/apache/hadoop/compare/branch-2.7.3...robert-schmidtke:adaptive-shuffle-buffer].
>  This sets the shuffle buffer size to the minimum value of the shuffle buffer 
> size specified in the configuration (128K by default), and the actual 
> partition size (65K on average in my setup). In my benchmarks this reduced 
> the read overhead in YARN from about 100% (255 additional gigabytes as 
> described above) down to about 18% (an additional 45 gigabytes). The runtime 
> of the job remained the same in my setup.

This message was sent by Atlassian JIRA

To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org
