[jira] [Commented] (MAPREDUCE-6923) Optimize MapReduce Shuffle I/O for small partitions

2017-08-10 Thread Ravi Prakash (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121932#comment-16121932
 ] 

Ravi Prakash commented on MAPREDUCE-6923:
-

bq. I'd say that for readSize == trans, we're in the else block, 
Thanks for pointing that out, Robert! :-) Yup, I agree.

bq. I'll be linking to the results once they're properly published.
Looking forward to it :-)

> Optimize MapReduce Shuffle I/O for small partitions
> ---
>
> Key: MAPREDUCE-6923
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6923
> Project: Hadoop Map/Reduce
>  Issue Type: Improvement
> Environment: Observed in Hadoop 2.7.3 and above (judging from the 
> source code of future versions), and Ubuntu 16.04.
>Reporter: Robert Schmidtke
>Assignee: Robert Schmidtke
> Fix For: 2.9.0, 3.0.0-beta1
>
> Attachments: MAPREDUCE-6923.00.patch, MAPREDUCE-6923.01.patch
>
>
> When a job configuration results in small partitions read by each reducer 
> from each mapper (e.g. 65 kilobytes as in my setup: a 
> [TeraSort|https://github.com/apache/hadoop/blob/branch-2.7.3/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraSort.java]
>  of 256 gigabytes using 2048 mappers and reducers each), and setting
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transferTo.allowed</name>
>   <value>false</value>
> </property>
> {code}
> then the default setting of
> {code:xml}
> <property>
>   <name>mapreduce.shuffle.transfer.buffer.size</name>
>   <value>131072</value>
> </property>
> {code}
> results in almost 100% overhead in reads during shuffle in YARN, because for 
> each 65K needed, 128K are read.
> I propose a fix in 
> [FadvisedFileRegion.java|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java#L114]
>  as follows:
> {code:java}
> ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.shuffleBufferSize, 
> trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
> {code}
> e.g. 
> [here|https://github.com/apache/hadoop/compare/branch-2.7.3...robert-schmidtke:adaptive-shuffle-buffer].
>  This sets the shuffle buffer size to the minimum of the shuffle buffer size 
> specified in the configuration (128K by default) and the actual partition 
> size (65K on average in my setup). In my benchmarks this reduced 
> the read overhead in YARN from about 100% (255 additional gigabytes as 
> described above) down to about 18% (an additional 45 gigabytes). The runtime 
> of the job remained the same in my setup.
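To make the arithmetic behind the proposed change concrete, here is a minimal, self-contained sketch (not the actual FadvisedFileRegion code; the sizes are just the example values from this issue) comparing the old fixed-size allocation with the adaptive one:

{code:java}
// Minimal sketch of the adaptive buffer sizing proposed above -- illustrative
// only, not the real FadvisedFileRegion implementation.
public class ShuffleBufferSizing {
  public static void main(String[] args) {
    long trans = 65L * 1024;          // bytes left to transfer: one ~65K partition
    int shuffleBufferSize = 131072;   // mapreduce.shuffle.transfer.buffer.size default

    // Old behavior: always allocate (and read into) the full configured buffer.
    int oldSize = shuffleBufferSize;

    // Patched behavior: never allocate more than what remains to be transferred.
    int newSize = Math.min(shuffleBufferSize,
        trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans);

    System.out.println("old buffer: " + oldSize + " B, new buffer: " + newSize + " B");
    // With 65K partitions, reading 128K per partition fetches roughly twice the
    // data actually needed (~100% read overhead over the 256 GB job); capping
    // the buffer at the partition size removes that over-read.
  }
}
{code}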






[jira] [Commented] (MAPREDUCE-6923) Optimize MapReduce Shuffle I/O for small partitions

2017-08-10 Thread Robert Schmidtke (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121117#comment-16121117
 ] 

Robert Schmidtke commented on MAPREDUCE-6923:
-

Thanks for coming back to my comments.

When I said Yarn I indeed meant the NodeManager, sorry for the confusion. 
You're right about the shuffle service; it is, however, something I only 
discovered recently, having built my configuration a long time ago without 
exactly knowing what I was doing. I set these keys as you described.
I'm seeing jar files being loaded in the MapTask and ReduceTask JVMs all right, 
but there does not seem to be any disk I/O overhead from that.

In any case, I greatly appreciate all of your effort, and now that things are 
working as expected for me, I can focus on analyzing the numbers and making 
some sense of them. I'll be linking to the results once they're properly 
published.

Cheers
Robert




[jira] [Commented] (MAPREDUCE-6923) Optimize MapReduce Shuffle I/O for small partitions

2017-08-10 Thread Robert Schmidtke (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121109#comment-16121109
 ] 

Robert Schmidtke commented on MAPREDUCE-6923:
-

Hi Ravi,

{quote}
When {{shuffleBufferSize <= trans}}, then behavior is exactly the same as old 
code.
{quote}
Yes.

{quote}
if {{readSize == trans}} (i.e. the {{fileChannel.read()}} returned as many 
bytes as I wanted to transfer), {{trans}} is decremented correctly, {{position}} 
is increased correctly and the {{byteBuffer}} is flipped as usual. 
{{byteBuffer}}'s contents are written to {{target}} as usual, {{byteBuffer}} 
is cleared and then hopefully GCed, never to be seen again.
{quote}
I'd say that for {{readSize == trans}}, we're in the [else 
block|https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java#L127],
and thus {{byteBuffer}} is {{limit()}}-ed to {{trans}} (which is the size it 
already has, because we're in the case where {{trans < shuffleBufferSize}}). It's 
correctly positioned to {{0}} as we're done reading, and {{trans}} is correctly 
set to {{0}}. Afterwards the loop breaks (there can only be one iteration here, 
because otherwise {{trans}} would have been larger than {{shuffleBufferSize}}), 
{{byteBuffer}} is written to {{target}} and then cleared.

{quote}
if {{readSize < trans}} (almost the same thing as above happens, but in a while 
loop). The only change this patch makes is that the {{byteBuffer}} may be 
smaller than before this patch, but it doesn't matter because it's big enough 
for the number of bytes we need to transfer.
{quote}
Now we have the situation you described for the previous case, and I agree with 
your reasoning here.
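For readers without the source at hand, the loop under discussion looks roughly like the following simplified paraphrase (not the exact {{FadvisedFileRegion.customShuffleTransfer()}} code; the EOF guard is added here only to keep the sketch safe):

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Simplified paraphrase of the copy loop discussed above. 'trans' is the number
// of bytes still to send, 'offset' the current position in the map output file.
class ShuffleCopySketch {
  static void copyPartition(FileChannel fileChannel, WritableByteChannel target,
      long offset, long trans, int shuffleBufferSize) throws IOException {
    ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
        trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
    while (trans > 0L && target.isOpen() && fileChannel.isOpen()) {
      long readSize = fileChannel.read(byteBuffer, offset);
      if (readSize <= 0) {
        break; // EOF guard, added for this sketch only
      }
      if (readSize < trans) {
        // More data still to come: account for what was read, flip for writing.
        trans -= readSize;
        offset += readSize;
        byteBuffer.flip();
      } else {
        // Last chunk (readSize >= trans): only 'trans' bytes are payload, so cap
        // the limit, rewind, and set trans to 0 so the outer loop terminates.
        byteBuffer.limit((int) trans);
        byteBuffer.position(0);
        offset += trans;
        trans = 0;
      }
      while (byteBuffer.hasRemaining() && target.isOpen()) {
        target.write(byteBuffer); // drain the buffer to the shuffle consumer
      }
      byteBuffer.clear();
    }
  }
}
{code}

With a small partition ({{trans < shuffleBufferSize}}), the buffer now has exactly {{trans}} bytes of capacity, so the first read lands in the else branch and the loop runs for a single iteration, as described above.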




[jira] [Commented] (MAPREDUCE-6923) Optimize MapReduce Shuffle I/O for small partitions

2017-08-09 Thread Ravi Prakash (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120829#comment-16120829
 ] 

Ravi Prakash commented on MAPREDUCE-6923:
-

Oh and sorry about neglecting your questions earlier. Apologies also if this is 
too deep in the details. Maybe a better understanding could help.

The Hadoop project has tried to make a clear distinction between YARN (the 
resource management layer) and the frameworks that can run on top of YARN (e.g. 
MapReduce, Tez, Slider, etc.). Even so, some dependencies have stuck around.

bq. I see that some 1.5 GiB is spent on reading the mapreduce jar files (in 
Yarn), and another 1.2 GiB is spent reading jar files in /usr/lib/jvm.
I'm not entirely sure what you mean when you say Yarn here; I'm guessing you 
mean the NodeManager. _Technically_ the NodeManager shouldn't really even be 
loading the MapReduce jars (because separate projects, blah blah). However, 
there is a MapReduce auxiliary shuffle service: if you look at your 
yarn-site.xml, {{yarn.nodemanager.aux-services}} probably has 
{{org.apache.hadoop.mapred.ShuffleHandler}} configured, and I'm sure that pulls 
all sorts of MapReduce code into the NodeManager JVM. This happens only when you 
start the cluster (the auxiliary ShuffleService is a long-running service in the 
NodeManager).
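For context, the corresponding yarn-site.xml entries typically look like the following (illustrative only; key names can differ slightly between Hadoop versions, so check the documentation for your release):

{code:xml}
<!-- Typical auxiliary shuffle service wiring in yarn-site.xml (illustration). -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
{code}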

{quote}
I have instrumented reading zip and jar files separately, and over the course 
of all map tasks (TeraGen + TeraSort), my instrumentation gives a total of 638 
GiB / (2048 + 2048) = 159.5 MiB per mapper, and 337 GiB / 2048 = 168.5 MiB per 
reducer. However I wouldn't rely too much on these numbers, because if I added 
them to the regular I/O induced by reading/writing the input/output, shuffle 
and spill, then my numbers wouldn't agree any longer with the XFS counters.
{quote}
Hmm.. without knowing exactly what your instrumentation does, I will choose to 
share your skepticism of these numbers :-)

bq. Do you mean that Yarn should exhibit this I/O, or would I see this in the 
map and reduce JVMs (as explained above)?
Again, I'm guessing by "Yarn" over here you mean the NodeManager. To launch any 
YARN container (MapTask, ReduceTask, TezChild, etc.) the NodeManager does a 
[lot of 
things|https://github.com/apache/hadoop/blob/ac7d0604bc73c0925eff240ad9837e14719d57b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java#L161].
One of those things is to localize the resources. For this, a separate process 
called a Localizer is usually run. This process may download things from HDFS 
to the local machine under certain circumstances (though it may be skipped if 
the job jars are already in the DistributedCache). However, I was referring to 
the MapTask and ReduceTask JVMs loading the jar files.


[jira] [Commented] (MAPREDUCE-6923) Optimize MapReduce Shuffle I/O for small partitions

2017-08-09 Thread Hudson (JIRA)

[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120802#comment-16120802
 ] 

Hudson commented on MAPREDUCE-6923:
---

SUCCESS: Integrated in Jenkins build Hadoop-trunk-Commit #12157 (See 
[https://builds.apache.org/job/Hadoop-trunk-Commit/12157/])
MAPREDUCE-6923. Optimize MapReduce Shuffle I/O for small partitions. (raviprak: 
rev ac7d0604bc73c0925eff240ad9837e14719d57b7)
* (edit) 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

