[ 
https://issues.apache.org/jira/browse/MAPREDUCE-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12767022#action_12767022
 ] 

Chris Douglas commented on MAPREDUCE-64:
----------------------------------------

bq. one simple way might be to simply add TRACE level log messages at every 
collect() call with the current values of every index plus the spill number 
[...]

That could be an interesting visualization. I'd already made up the diagrams, 
but anything that helps the analysis and validation would be welcome. I'd 
rather not add a trace to the committed code, but data from it sounds great.

bq. I ran a simple test where I was running a sort of 10 byte records, and it 
turned out that the "optimal" io.sort.record.percent caused my job to be 
significantly slower. It was the case then that a small number of large spills 
actually ran slower than a large number of small spills. Did we ever determine 
what that issue was? I think we should try to understand why the theory isn't 
agreeing with observations here.

IIRC those tests used a non-RawComparator, right? Runping reported similar 
results, where hits to concurrent collection were more expensive than small 
spills. The current theory is that keeping the map thread unblocked is usually 
better for performance. Based on this observation, I'm hoping that the 
spill.percent can also be eliminated at some point in the future, though the 
performance we're leaving on the table there is probably not as severe and is 
more difficult to generalize. Microbenchmarks may also not capture the expense 
of merging many small spills in a busy, shared cluster, where HDFS and other 
tasks are competing for disk bandwidth. I'll be very interested in metrics 
from MAPREDUCE-1115, as they would help to flesh out this hypothesis.

----

The documentation (such as it is) in HADOOP-2919 describes the existing code. 
The metadata are tracked using a set of indices marking the start and end of a 
spill ({{kvstart}}, {{kvend}}) and the current position ({{kvindex}}), while the 
serialized record data are described by similar markers ({{bufstart}}, 
{{bufend}}, {{bufindex}}). There are two other indices carried 
over from the existing design. {{bufmark}} is the position in the serialized 
record data of the end of the last fully serialized record. {{bufvoid}} is 
necessary for the RawComparator interface, which requires contiguous ranges for 
key compares; if a serialized key crosses the end of the buffer, it must be 
copied to the front to satisfy the aforementioned API spec. All of these are 
retained; the role of each is largely unchanged.
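
Roughly, that bookkeeping looks like the following (a sketch, not the actual 
collector class; the comments paraphrase the roles described above):

{code:java}
/** Sketch of the indices described above (not the actual collector class). */
class CollectIndicesSketch {
  // accounting/metadata indices
  int kvstart;   // start of the metadata being spilled
  int kvend;     // end of the metadata being spilled
  int kvindex;   // next metadata entry to be written

  // serialized record data indices
  int bufstart;  // start of the serialized data being spilled
  int bufend;    // end of the serialized data being spilled
  int bufindex;  // next byte of serialized data to be written
  int bufmark;   // end of the last fully serialized record
  int bufvoid;   // point past which the buffer is not valid for key compares;
                 // a key that wraps past it is copied to the front so the
                 // RawComparator sees a contiguous range
}
{code}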

The proposed design adds another parameter, the {{equator}} (while {{kvstart}} 
and {{bufstart}} could be replaced with a single variable similar to 
{{equator}}, the effort seemed misspent). Record serialization moves "forward" 
in the buffer, while the metadata are allocated in 16-byte blocks in the 
opposite direction. This is illustrated in the following diagram:

!M64-0i.png|thumbnail!
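
To make the 16-byte blocks concrete: each block is four ints (here taken to be 
the partition, key start, value start, and value length), and the metadata can 
be viewed through an IntBuffer over the same byte[] that holds the serialized 
data. A sketch of appending one block "backward" from {{kvindex}}; names 
roughly follow the patch, but treat it as illustrative:

{code:java}
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

/** Sketch: 16-byte metadata blocks allocated away from the equator. */
class MetaLayoutSketch {
  static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3;
  static final int NMETA = 4;              // ints per record
  static final int METASIZE = NMETA * 4;   // 16 bytes per record

  final byte[] kvbuffer = new byte[1 << 16];                 // shared buffer
  final IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer).asIntBuffer();
  int kvindex = kvmeta.capacity() - NMETA; // first block abuts the array end

  /** Record one key/value's accounting info; metadata grow "backward". */
  void writeMeta(int partition, int keystart, int valstart, int vallen) {
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, vallen);
    // the next block is allocated 16 bytes closer to the equator
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  }
}
{code}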

The role played by {{kvoffsets}} and {{kvindices}} is preserved; logically, 
particularly in the spill, each is interpreted in roughly the same way. In the 
new code the allocation is not static, but instead expands with the serialized 
records. This avoids degenerate cases for combiners and multilevel merges 
(though not necessarily optimal performance).
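
In particular, the spill's sort can read each metadata block much as it read 
the {{kvoffsets}}/{{kvindices}} pair: order first by partition, then by the raw 
key bytes. A sketch (constants as in the sketch above; the {{bufvoid}} wrap 
case is ignored here):

{code:java}
import java.nio.IntBuffer;
import org.apache.hadoop.io.RawComparator;

/** Sketch: the spill still orders entries by partition, then raw key bytes. */
class SpillOrderSketch {
  static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2;  // as above

  /** ki and kj are the int offsets of two metadata blocks within kvmeta. */
  static int compare(IntBuffer kvmeta, byte[] kvbuffer,
                     RawComparator<?> comparator, int ki, int kj) {
    final int pi = kvmeta.get(ki + PARTITION);
    final int pj = kvmeta.get(kj + PARTITION);
    if (pi != pj) {
      return pi - pj;                      // primary order: partition
    }
    return comparator.compare(             // secondary order: raw key bytes
        kvbuffer, kvmeta.get(ki + KEYSTART),
        kvmeta.get(ki + VALSTART) - kvmeta.get(ki + KEYSTART),
        kvbuffer, kvmeta.get(kj + KEYSTART),
        kvmeta.get(kj + VALSTART) - kvmeta.get(kj + KEYSTART));
  }
}
{code}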

Spills are triggered under two conditions: either the soft limit is reached 
(collection proceeds concurrently with the spill) or a record is large enough 
to require a spill before it can be written to the buffer (collection is 
blocked). The former case is illustrated here:

!M64-1i.png|thumbnail!

The {{equator}} is moved to an offset proportional to the average record size 
(caveats 
[above|https://issues.apache.org/jira/browse/MAPREDUCE-64?focusedCommentId=12765984&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12765984]), 
and {{kvindex}} is moved off the equator, aligned with the end of the array 
(int-aligned, and positioned so that no metadata block will span the end of the 
array). Collection then proceeds from the equator, with serialized data and 
metadata growing toward the two ends of the spill region. Should either run out 
of space, collection will block until the spill completes. Note that there is 
no partially written record data when the soft limit is reached; it can only be 
triggered in collect, not in the blocking buffer.
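
A sketch of the re-seating when the soft limit fires: the new position is 
chosen from the average record size (per the comment linked above), and 
{{kvindex}} is then aligned so that no 16-byte block can straddle the end of 
the array. Field and method names here are illustrative:

{code:java}
/** Sketch: re-seating the equator after the soft limit triggers a spill. */
class EquatorSketch {
  static final int METASIZE = 16;        // one metadata block (4 ints)
  final byte[] kvbuffer = new byte[1 << 20];
  int equator;
  int kvindex;                           // in ints, into the IntBuffer view

  /** pos is the new equator, e.g. bufindex plus a slice proportional to the
   *  average record size observed so far. */
  void setEquator(int pos) {
    equator = pos;
    // align so that no metadata block straddles the end of the array
    final int aligned = pos - (pos % METASIZE);
    // the first block sits immediately "behind" the aligned equator; the cast
    // to long avoids overflow for large buffers
    kvindex = (int)
        (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  }
}
{code}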

The other case to consider is when record data are partially written into the 
collection buffer, but the available space is exhausted:

!M64-2i.png|thumbnail!

Here, the equator is moved to the beginning of the partial record and 
collection blocks. When the spill completes, the metadata are written off the 
equator and serialization of the record can continue.
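
The handshake is roughly the following; a simplified sketch in which the lock, 
condition, and method names are mine rather than the patch's:

{code:java}
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of the blocking case: the partial record cannot fit, so the equator
 *  moves to the start of the partial record and collection waits. */
class BlockingSketch {
  final ReentrantLock spillLock = new ReentrantLock();
  final Condition spillDone = spillLock.newCondition();
  boolean spillInProgress;
  int equator, bufmark;

  /** Buffer write path: called when the record being serialized cannot fit. */
  void blockOnSpill() throws InterruptedException {
    spillLock.lock();
    try {
      // bytes already written for this record begin at bufmark; make that the
      // new equator so metadata and the rest of the record resume from there
      equator = bufmark;
      spillInProgress = true;   // hand the full region off to the spill thread
      while (spillInProgress) {
        spillDone.await();      // collection is blocked until the spill ends
      }
    } finally {
      spillLock.unlock();
    }
  }

  /** Spill thread: called after the spill file has been written. */
  void finishSpill() {
    spillLock.lock();
    try {
      spillInProgress = false;
      spillDone.signal();       // let the blocked collector continue
    } finally {
      spillLock.unlock();
    }
  }
}
{code}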

During collection, indices are adjusted only while holding a lock. As in the 
current code, the lock is only obtained in collect when one of the possible 
conditions for spilling may be satisfied. Since collect itself does not block 
(blocking happens in the buffer's write path), every serialized record performs 
a zero-length write into the buffer once both the key and value are written. 
This ensures that all the boundaries are checked and that collection will block 
if necessary.
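
Concretely, something like this; the stand-in OutputStream below is 
illustrative, in the patch the checks live in the blocking buffer's write path:

{code:java}
import java.io.IOException;
import java.io.OutputStream;

/** Sketch: boundary checks live in the buffer's write(), so a zero-length
 *  write after serialization still exercises them. */
class CollectSketch {
  private static final byte[] b0 = new byte[0];

  // stands in for the blocking collection buffer
  private final OutputStream bb = new OutputStream() {
    @Override public void write(int b) throws IOException {
      write(new byte[] { (byte) b }, 0, 1);
    }
    @Override public void write(byte[] b, int off, int len) throws IOException {
      // real code: compare bufindex/kvindex against the spill region here,
      // trigger a spill or block if the write would not fit
    }
  };

  void collect(/* K key, V value, int partition */) throws IOException {
    // ... serialize the key and then the value through bb ...
    // the zero-length write runs the same checks even if the serializers
    // emitted no bytes, so collection blocks here when it must
    bb.write(b0, 0, 0);
  }
}
{code}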

That's about it.

> Map-side sort is hampered by io.sort.record.percent
> ---------------------------------------------------
>
>                 Key: MAPREDUCE-64
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-64
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>            Reporter: Arun C Murthy
>            Assignee: Chris Douglas
>         Attachments: M64-0.patch, M64-0i.png, M64-1.patch, M64-1i.png, 
> M64-2.patch, M64-2i.png, M64-3.patch
>
>
> Currently io.sort.record.percent is a fairly obscure, per-job configurable, 
> expert-level parameter which controls how much accounting space is available 
> for records in the map-side sort buffer (io.sort.mb). Typically values for 
> io.sort.mb (100) and io.sort.record.percent (0.05) imply that we can store 
> ~350,000 records in the buffer before necessitating a sort/combine/spill.
> However for many applications which deal with small records e.g. the 
> world-famous wordcount and it's family this implies we can only use 5-10% of 
> io.sort.mb i.e. (5-10M) before we spill inspite of having _much_ more memory 
> available in the sort-buffer. The word-count for e.g. results in ~12 spills 
> (given hdfs block size of 64M). The presence of a combiner exacerbates the 
> problem by piling serialization/deserialization of records too...
> Sure, jobs can configure io.sort.record.percent, but it's tedious and 
> obscure; we really can do better by getting the framework to automagically 
> pick it by using all available memory (upto io.sort.mb) for either the data 
> or accounting.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
