[
https://issues.apache.org/jira/browse/MAPREDUCE-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12767022#action_12767022
]
Chris Douglas commented on MAPREDUCE-64:
----------------------------------------
bq. one simple way might be to simply add TRACE level log messages at every
collect() call with the current values of every index plus the spill number
[...]
That could be an interesting visualization. I'd already made up the diagrams,
but anything that helps the analysis and validation would be welcome. I'd
rather not add a trace to the committed code, but data from it sounds great.
bq. I ran a simple test where I was running a sort of 10 byte records, and it
turned out that the "optimal" io.sort.record.percent caused my job to be
significantly slower. It was the case then that a small number of large spills
actually ran slower than a large number of small spills. Did we ever determine
what that issue was? I think we should try to understand why the theory isn't
agreeing with observations here.
IIRC those tests used a non-RawComparator, right? Runping reported similar
results, where hits to concurrent collection were more expensive than small
spills. The current theory is that keeping the map thread unblocked is usually
better for performance. Based on this observation, I'm hoping that the
spill.percent can also be eliminated at some point in the future, though the
performance we're leaving on the table there is probably not as severe and is
more difficult to generalize. Microbenchmarks may also not capture the expense
of merging many small spills in a busy, shared cluster, where HDFS and other
tasks are competing for disk bandwidth. I'll be very interested in metrics
from MAPREDUCE-1115, as they would help to flesh out this hypothesis.
----
The documentation (such as it is) in HADOOP-2919 describes the existing code.
The metadata are tracked using a set of indices marking the start and end of a
spill ({{kvstart}}, {{kvend}}) and the current position ({{kvindex}}), while
the serialization data are described by similar markers ({{bufstart}},
{{bufend}}, {{bufindex}}). There are two other indices carried
over from the existing design. {{bufmark}} is the position in the serialized
record data of the end of the last fully serialized record. {{bufvoid}} is
necessary for the RawComparator interface, which requires contiguous ranges for
key compares; if a serialized key crosses the end of the buffer, it must be
copied to the front to satisfy the aforementioned API spec. All of these are
retained; the role of each is largely unchanged.
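To keep the bookkeeping straight, here is a compact sketch of those indices
with their roles as comments (illustrative only; the names mirror the fields
above, the class is not part of any patch):
{code:java}
// Sketch of the collection-buffer indices described above. Names mirror the
// existing fields; the class only illustrates their roles.
class BufferIndices {
  // metadata (accounting) markers
  int kvstart;   // start of the metadata being spilled
  int kvend;     // end of the metadata being spilled
  int kvindex;   // position for the next record's metadata

  // serialized record data markers
  int bufstart;  // start of the serialized data being spilled
  int bufend;    // end of the serialized data being spilled
  int bufindex;  // current write position for serialized data
  int bufmark;   // end of the last fully serialized record
  int bufvoid;   // end of the usable buffer; set when a key that crossed the
                 // end of the buffer is copied to the front so RawComparator
                 // always sees a contiguous key range
}
{code}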
The proposed design adds another parameter, the {{equator}} (while {{kvstart}}
and {{bufstart}} could be replaced with a single variable similar to
{{equator}}, the effort seemed misspent). Record serialization moves
"forward" in the buffer, while the metadata are allocated in 16-byte blocks in
the opposite direction. This is illustrated in the following diagram:
!M64-0i.png|thumbnail!
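A hypothetical sketch of the two growth directions follows. Only the 16-byte
block size comes from the description above; the four-int field order and the
helper are assumptions, not the patch:
{code:java}
import java.nio.IntBuffer;

// Hypothetical layout of one 16-byte metadata block, viewed as four ints over
// the same byte[] that holds the serialized record data.
class MetaLayoutSketch {
  static final int NMETA = 4;              // ints of metadata per record
  static final int METASIZE = NMETA * 4;   // 16 bytes per record
  static final int PARTITION = 0;          // destination partition
  static final int KEYSTART  = 1;          // offset of the key in the data buffer
  static final int VALSTART  = 2;          // offset of the value in the data buffer
  static final int VALLEN    = 3;          // length of the serialized value

  // Write one block at kvindex; metadata move "backward" (toward lower int
  // offsets, wrapping around), while bufindex moves forward from the equator.
  static int putMeta(IntBuffer kvmeta, int kvindex,
                     int partition, int keystart, int valstart, int vallen) {
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, vallen);
    // the next block sits immediately "behind" this one
    return (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  }
}
{code}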
The role played by {{kvoffsets}} and {{kvindices}} is preserved; logically,
particularly in the spill, each is interpreted in roughly the same way. In the
new code, the allocation is not static, but instead expands with the
serialized records. This avoids degenerate cases for combiners and multilevel
merges (though it does not guarantee optimal performance).
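As a back-of-the-envelope on why that matters for small records (numbers taken
from the issue description below, assuming 16 bytes of accounting per record;
illustrative arithmetic only):
{code:java}
// Rough capacity comparison for 10-byte records: a static accounting split
// (io.sort.mb=100, io.sort.record.percent=0.05) versus letting metadata and
// data expand into the same buffer.
public class CapacitySketch {
  public static void main(String[] args) {
    final long sortBytes = 100L << 20;     // io.sort.mb = 100 MB
    final double recordPercent = 0.05;     // io.sort.record.percent
    final int metaPerRecord = 16;          // accounting bytes per record
    final int recordBytes = 10;            // small records, as in the test above

    // Static split: whichever side fills first forces the spill.
    final long accounting = (long) (sortBytes * recordPercent);
    final long oldCapacity = Math.min(accounting / metaPerRecord,
                                      (sortBytes - accounting) / recordBytes);

    // Shared buffer: only the combined per-record footprint matters.
    final long newCapacity = sortBytes / (metaPerRecord + recordBytes);

    System.out.println("static split:  ~" + oldCapacity + " records per spill");
    System.out.println("shared buffer: ~" + newCapacity + " records per spill");
  }
}
{code}
With those numbers, that is roughly 330k records per spill under the static
split versus about 4 million sharing the buffer.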
Spills are triggered under two conditions: either the soft limit is reached
(collection proceeds concurrently with the spill), or a record is large enough
to require a spill before it can be written to the buffer (collection is
blocked); a rough sketch of both triggers follows the diagram below. The
former case is illustrated here:
!M64-1i.png|thumbnail!
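Schematically, the two triggers amount to something like the following; the
names, the accounting, and the reset after a blocking spill are
simplifications, not the patch:
{code:java}
// Sketch of the two spill triggers: soft limit (concurrent spill) and a
// record too large for the remaining space (blocking spill).
class SpillTriggerSketch {
  private final int bufferSize;
  private final int softLimit;   // bufferSize * io.sort.spill.percent
  private int bytesUsed;         // serialized data plus metadata currently held

  SpillTriggerSketch(int bufferSize, float spillPercent) {
    this.bufferSize = bufferSize;
    this.softLimit = (int) (bufferSize * spillPercent);
  }

  // Called per record with its full footprint (serialized bytes plus 16 bytes
  // of metadata); the accounting here is deliberately simplified.
  void onCollect(int recordFootprint) throws InterruptedException {
    if (recordFootprint > bufferSize - bytesUsed) {
      // Case 2: the record cannot fit until a spill frees space, so
      // collection blocks until the spill completes.
      startSpill();
      awaitSpill();
      bytesUsed = 0;                       // spilled space reclaimed
    } else if (bytesUsed + recordFootprint > softLimit) {
      // Case 1: soft limit reached; the spill runs concurrently while
      // collection continues on the other side of the re-positioned equator.
      startSpill();
    }
    bytesUsed += recordFootprint;
  }

  private void startSpill() { /* wake the spill thread (omitted) */ }
  private void awaitSpill() throws InterruptedException { /* block on it (omitted) */ }
}
{code}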
The {{equator}} is moved to an offset proportional to the average record size
(caveats
[above|https://issues.apache.org/jira/browse/MAPREDUCE-64?focusedCommentId=12765984&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12765984]),
and {{kvindex}} is moved off the equator, aligned with the end of the array
(int alignment, also so that no metadata block will span the end of the
array). Collection proceeds again from the equator, with serialized data and
metadata each growing toward the boundaries of the in-progress spill. Should
either run out of space, collection will block until the spill completes. Note
that there are no partially written record data when the soft limit is
reached; it can only be triggered in collect, not in the blocking buffer.
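One plausible way to pick that offset, as a sketch of the proportionality only
(the actual expression, the alignment of {{kvindex}}, and the circular-buffer
corner cases are not shown):
{code:java}
// Sketch of placing the equator so serialized data and 16-byte metadata blocks
// exhaust their shares of the free region at roughly the same time. The free
// region runs from the data front (bufindex) to the metadata front.
class EquatorSketch {
  static int proposeEquator(int bufindex, int freeBytes,
                            long outputBytes, long outputRecords,
                            int metasize, int bufferLength) {
    // average serialized record size so far (at least 1 to avoid divide-by-zero)
    final long avgRec = Math.max(1, outputBytes / Math.max(1, outputRecords));
    // share of the free region reserved for metadata: metasize bytes of
    // accounting per avgRec bytes of record data
    final long metaShare = (long) freeBytes * metasize / (metasize + avgRec);
    // metadata grow backward from the new equator toward bufindex, serialized
    // data forward from it toward the other end of the free region
    return (int) ((bufindex + metaShare) % bufferLength);
  }
}
{code}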
The other case to consider is when record data are partially written into the
collection buffer, but the available space is exhausted:
!M64-2i.png|thumbnail!
Here, the equator is moved to the beginning of the partial record and
collection blocks. When the spill completes, the metadata are written off the
equator and serialization of the record can continue.
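In pseudo-Java, the blocking path amounts to something like this
({{setEquator}}, {{startSpill}}, and {{awaitSpill}} are placeholders for the
corresponding steps, not the patch's methods):
{code:java}
// Sketch of the blocking case: the equator is moved back to the start of the
// partially written record, the completed records behind it are spilled, and
// serialization resumes once the spill frees space.
class LargeRecordSpillSketch {
  void onBufferExhausted(int partialRecordStart) throws InterruptedException {
    setEquator(partialRecordStart); // equator now precedes the partial record
    startSpill();                   // spill the fully serialized records
    awaitSpill();                   // collection blocks; no concurrent progress
    // after the spill, the metadata block for the in-flight record is written
    // off the new equator and serialization of the record continues
  }

  private void setEquator(int pos) { /* reposition the equator (omitted) */ }
  private void startSpill() { /* wake the spill thread (omitted) */ }
  private void awaitSpill() throws InterruptedException { /* block on it (omitted) */ }
}
{code}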
During collection, indices are adjusted only when holding a lock. As in the
current code, the lock is only obtained in collect when one of the possible
conditions for spilling may be satisfied. Since collect does not block, every
serialized record performs a zero-length write into the buffer once both the
key and value are written. This ensures that all the boundaries are checked and
that collection will block if necessary.
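A sketch of that boundary check; the stream type and method names below are
stand-ins for the collector's blocking buffer, not its actual API:
{code:java}
// Zero-length write at the end of each record: no bytes move, but the same
// boundary check as a real write runs, so any spill or block happens at a
// record boundary.
abstract class CollectorStreamSketch {
  private static final byte[] EMPTY = new byte[0];

  // called once both the key and the value have been serialized
  void endOfRecord() {
    write(EMPTY, 0, 0);
  }

  void write(byte[] b, int off, int len) {
    checkBoundsAndMaybeBlock(len);   // runs even when len == 0
    if (len > 0) {
      copyIntoBuffer(b, off, len);
    }
  }

  abstract void checkBoundsAndMaybeBlock(int len);
  abstract void copyIntoBuffer(byte[] b, int off, int len);
}
{code}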
That's about it.
> Map-side sort is hampered by io.sort.record.percent
> ---------------------------------------------------
>
> Key: MAPREDUCE-64
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-64
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Reporter: Arun C Murthy
> Assignee: Chris Douglas
> Attachments: M64-0.patch, M64-0i.png, M64-1.patch, M64-1i.png,
> M64-2.patch, M64-2i.png, M64-3.patch
>
>
> Currently io.sort.record.percent is a fairly obscure, per-job configurable,
> expert-level parameter which controls how much accounting space is available
> for records in the map-side sort buffer (io.sort.mb). Typically values for
> io.sort.mb (100) and io.sort.record.percent (0.05) imply that we can store
> ~350,000 records in the buffer before necessitating a sort/combine/spill.
> However for many applications which deal with small records, e.g. the
> world-famous wordcount and its family, this implies we can only use 5-10% of
> io.sort.mb, i.e. 5-10M, before we spill, in spite of having _much_ more memory
> available in the sort-buffer. Wordcount, for example, results in ~12 spills
> (given an hdfs block size of 64M). The presence of a combiner exacerbates the
> problem by piling on serialization/deserialization of records too...
> Sure, jobs can configure io.sort.record.percent, but it's tedious and
> obscure; we really can do better by getting the framework to automagically
> pick it by using all available memory (up to io.sort.mb) for either the data
> or accounting.