[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112313#comment-16112313
 ] 

Robert Schmidtke edited comment on MAPREDUCE-6923 at 8/3/17 7:14 AM:
---------------------------------------------------------------------

Hi Ravi,

my guess is that since {{trans}} is a {{long}}, and 
{{ByteBuffer.allocate(...)}} only takes {{ints}}, a "blind" cast in the 
{{Math.min(...)}} operation might yield a negative value for {{trans > 
Integer.MAX_VALUE}}:

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        ByteBuffer byteBuffer = ByteBuffer
                .allocate(Math.min(shuffleBufferSize, (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

gives

{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at test.Test.main(Test.java:9)
{code}

whereas

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
                trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

correctly outputs {{131072}}.

As for the other 18% issue, I am not yet quite sure. I'm currently 
investigating the I/O of each of Hadoop's components, using TeraSort as my 
working horse. For a TeraSort of 1024 GiB, YARN reads a total of 1169 GiB in my 
setup, with {{transferTo.allowed=true}}. Taking into account that the MapReduce 
framework counters report 1065 GiB of serialized map output (and thus, 1065 GiB 
of shuffled bytes), the overhead is "only" 104 GiB for 1024 GiB input, or 
roughly 10%. So there are additional reads, even when using {{transferTo}}. 
Maybe it has something to do with resource distribution? Note that I have 
disabled speculative execution, so there are no extra executions of additional 
reducers, which might read the same map output multiple times. However, there 
are 140 "Failed Shuffles" -- does that mean that they have been executed again? 
If so, and assuming that for 1024 GiB of input, each reducer needs to fetch 
{{1065 / 2048 = 0.52 GiB}}, there is an additional overhead of {{2048 * 0.52 = 
73 GiB}}. What remains is an unexplained 31 GiB.

When running TeraSort with {{transferTo.allowed=false}} and my patch as 
described above, sorting 256 GiB, the MapReduce framework counters report 266 
GiB of serialized map output (and thus, 266 GiB of shuffled bytes). In this 
run, there were no "Failed Shuffles". Since my analysis reports that YARN reads 
300 GiB, the overhead is actually probably more correctly measured as 34 GiB (= 
13% of 256 GiB) instead of 45 GiB (= 18% of 256 GiB). These 34 GiB are close 
enough to the 31 GiB for 1024 GiB input (see above), so maybe this is constant 
overhead for 2048 mappers and 2048 reducers?

Anyway, since I'll be investigating this behavior in the future, digging into 
per-file statistics, I'll be able to report exactly which file is read how 
often / how much of it is read. I can then tell exactly what is happening on 
disk. Since this is part of unpublished research, however, I'm afraid I can 
only report the results later.


was (Author: rosch):
Hi Ravi,

my guess is that since {{trans}} is a {{long}}, and 
{{ByteBuffer.allocate(...)}} only takes {{ints}}, a "blind" cast in the 
{{Math.min(...)}} operation might yield a negative value for {{trans > 
Integer.MAX_VALUE}}:

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        ByteBuffer byteBuffer = ByteBuffer
                .allocate(Math.min(shuffleBufferSize, (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

gives

{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at test.Test.main(Test.java:12)
{code}

whereas

{code:java}
package test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class Test {
    public static void main(String[] args) throws IOException {
        long trans = Integer.MAX_VALUE + 1L;
        int shuffleBufferSize = 131072;
        ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
                trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
        System.out.println(byteBuffer.capacity());
    }
}
{code}

correctly outputs {{131072}}.

As for the other 18% issue, I am not yet quite sure. I'm currently 
investigating the I/O of each of Hadoop's components, using TeraSort as my 
working horse. For a TeraSort of 1024 GiB, YARN reads a total of 1169 GiB in my 
setup, with {{transferTo.allowed=true}}. Taking into account that the MapReduce 
framework counters report 1065 GiB of serialized map output (and thus, 1065 GiB 
of shuffled bytes), the overhead is "only" 104 GiB for 1024 GiB input, or 
roughly 10%. So there are additional reads, even when using {{transferTo}}. 
Maybe it has something to do with resource distribution? Note that I have 
disabled speculative execution, so there are no extra executions of additional 
reducers, which might read the same map output multiple times. However, there 
are 140 "Failed Shuffles" -- does that mean that they have been executed again? 
If so, and assuming that for 1024 GiB of input, each reducer needs to fetch 
{{1065 / 2048 = 0.52 GiB}}, there is an additional overhead of {{2048 * 0.52 = 
73 GiB}}. What remains is an unexplained 31 GiB.

When running TeraSort with {{transferTo.allowed=false}} and my patch as 
described above, sorting 256 GiB, the MapReduce framework counters report 266 
GiB of serialized map output (and thus, 266 GiB of shuffled bytes). In this 
run, there were no "Failed Shuffles". Since my analysis reports that YARN reads 
300 GiB, the overhead is actually probably more correctly measured as 34 GiB (= 
13% of 256 GiB) instead of 45 GiB (= 18% of 256 GiB). These 34 GiB are close 
enough to the 31 GiB for 1024 GiB input (see above), so maybe this is constant 
overhead for 2048 mappers and 2048 reducers?

Anyway, since I'll be investigating this behavior in the future, digging into 
per-file statistics, I'll be able to report exactly which file is read how 
often / how much of it is read. I can then tell exactly what is happening on 
disk. Since this is part of unpublished research, however, I'm afraid I can 
only report the results later.

> YARN Shuffle I/O for small partitions
> -------------------------------------
>
>                 Key: MAPREDUCE-6923
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6923
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>         Environment: Observed in Hadoop 2.7.3 and above (judging from the 
> source code of future versions), and Ubuntu 16.04.
>            Reporter: Robert Schmidtke
>            Assignee: Robert Schmidtke
>         Attachments: MAPREDUCE-6923.00.patch
>
>
> When a job configuration results in small partitions read by each reducer 
> from each mapper (e.g. 65 kilobytes as in my setup: a 
> [TeraSort|https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java]
>  of 256 gigabytes using 2048 mappers and reducers each), and setting
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transferTo.allowed</name>
>   <value>false</value>
> </property>
> {code}
> then the default setting of
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transfer.buffer.size</name>
>   <value>131072</value>
> </property>
> {code}
> results in almost 100% overhead in reads during shuffle in YARN, because for 
> each 65K needed, 128K are read.
> I propose a fix in 
> [FadvisedFileRegion.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java#L114]
>  as follows:
> {code:java}
> ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.shuffleBufferSize, 
> trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
> {code}
> e.g. 
> [here|https://github.com/apache/hadoop/compare/branch-2.7.3...robert-schmidtke:adaptive-shuffle-buffer].
>  This sets the shuffle buffer size to the minimum value of the shuffle buffer 
> size specified in the configuration (128K by default), and the actual 
> partition size (65K on average in my setup). In my benchmarks this reduced 
> the read overhead in YARN from about 100% (255 additional gigabytes as 
> described above) down to about 18% (an additional 45 gigabytes). The runtime 
> of the job remained the same in my setup.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to