GitHub user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/5868#issuecomment-98888713
Based on some additional discussions, I think that we should use a
specialized sorter implementation that is specific to sort-based shuffle and
design a separate sorter for more general-purpose record sorting. By using a
specialized sorter, we can benefit from several performance optimizations that
would be difficult to implement in a more general-purpose sorter:
- We may not need a full 32 bits to record the partition id: since shuffles
are unlikely to have more than, say, 10 million partitions, we can encode the
partition id with fewer bits and pack it into a single 64-bit word whose
remaining bits store the record pointer (exploiting the fact that we can
choose a reasonable upper bound on our addressable memory, say 1 terabyte,
and that word-alignment lets us drop the low address bits). This should give
us excellent cache-locality benefits, since the sort array will only require 8
bytes of space per record (see the first sketch after this list).
- Because sort-shuffle's sort keys are partition ids, we can expect to
encounter long runs of records with the same sort key in the sorted file; this
may not be the case in a more general-purpose sort, where all of the sort keys
might be distinct. This allows the external sort to use a specialized merge
procedure that operates on runs of records with the same key rather than on
individual records.
- Ignoring the spilling/merging case for a moment, the sort shuffle writer
only needs to know individual records' lengths during the in-memory sort: we
need to know a record's length in order to copy it to the sorted output file,
but once a partition's records are adjacent in the output file we no longer
need to know their individual lengths. This is a consequence of the fact that
the sorted data will be consumed by a serializer that knows how to identify
record boundaries, building on our assumption that the serializer supports
reordering of serialized records in its serialization stream.
- If we go further and assume that our IO compression codec supports
concatenation of compressed data (which [Snappy seems to
support](https://snappy.googlecode.com/svn/trunk/framing_format.txt)), then we
can implement an IO-efficient merge procedure for external sort. If we store
an index file for each spill file, identifying the offsets of each partition
within the sorted file, then the merge procedure can simply traverse these
indices and concatenate the partitions' serialized data without interpreting
it. This would let us use methods like `transferTo` to implement copying
without requiring data to be buffered in the JVM, sidestepping the complexity
of managing IO buffers during the merge (see the second sketch after this
list).
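To make the first point concrete, here is a minimal sketch of the
pointer-packing idea. The bit widths (24 bits of partition id, 40 bits of
pointer, i.e. 1 terabyte of addressable memory before exploiting
word-alignment, which would let us shift the pointer right by 3 and address
8x more) and all names here are illustrative assumptions, not a final layout:

```java
public final class PackedPointerSketch {

  private static final int PARTITION_BITS = 24;                 // up to ~16.7M partitions
  private static final int POINTER_BITS = 64 - PARTITION_BITS;  // 40 bits ~ 1 TB of addresses
  private static final long POINTER_MASK = (1L << POINTER_BITS) - 1;

  /** Packs the partition id into the high 24 bits and the pointer into the low 40 bits. */
  static long pack(int partitionId, long recordPointer) {
    assert partitionId >= 0 && partitionId < (1 << PARTITION_BITS);
    assert (recordPointer & ~POINTER_MASK) == 0;
    return ((long) partitionId << POINTER_BITS) | recordPointer;
  }

  static int partitionId(long packed) {
    return (int) (packed >>> POINTER_BITS);
  }

  static long recordPointer(long packed) {
    return packed & POINTER_MASK;
  }

  public static void main(String[] args) {
    long word = pack(1_000_000, 0xABCDEF0123L);
    System.out.println(partitionId(word));                      // 1000000
    System.out.println(Long.toHexString(recordPointer(word)));  // abcdef0123
  }
}
```

Because the partition id occupies the high bits and stays far below the sign
bit, a plain `Arrays.sort` over the packed `long[]` orders records by
partition without any custom comparator or extra indirection.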
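And a minimal sketch of the index-driven merge from the last point, assuming
each spill file stores every partition's data as an independently framed,
concatenation-safe compressed block, and using an offsets layout invented
here for illustration: `offsets[i][p]` and `offsets[i][p + 1]` bracket
partition `p`'s bytes within spill `i`:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public final class TransferToMergeSketch {

  /** Concatenates each partition's bytes from every spill into the final output file. */
  static void merge(File[] spills, long[][] offsets, int numPartitions, File out)
      throws IOException {
    try (FileChannel dst = new FileOutputStream(out).getChannel()) {
      FileChannel[] srcs = new FileChannel[spills.length];
      try {
        for (int i = 0; i < spills.length; i++) {
          srcs[i] = FileChannel.open(spills[i].toPath(), StandardOpenOption.READ);
        }
        for (int p = 0; p < numPartitions; p++) {
          for (int i = 0; i < spills.length; i++) {
            long start = offsets[i][p];
            long count = offsets[i][p + 1] - start;
            // transferTo may copy fewer bytes than requested, so loop until done;
            // the data never passes through JVM heap buffers.
            while (count > 0) {
              long written = srcs[i].transferTo(start, count, dst);
              start += written;
              count -= written;
            }
          }
        }
      } finally {
        for (FileChannel src : srcs) {
          if (src != null) src.close();
        }
      }
    }
  }
}
```

Since this merge only ever copies whole partition-sized byte ranges, it is
also an instance of the run-based merging described in the second point: no
per-record comparisons or copies happen at merge time.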
In light of this, I'm going to work on refactoring my external sorting
branch to perform these optimizations and will update this pull request with
those changes. I'm going to remove the SPARK-7081 and SPARK-7078 JIRA links,
since those JIRAs seem to be concerned with a more general-purpose record
sorter for use in SQL joins.