GitHub user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/6397#discussion_r31006798
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -84,35 +86,40 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
* each other for equality to merge values.
*
* - Users are expected to call stop() at the end to delete all the intermediate files.
- *
- * As a special case, if no Ordering and no Aggregator is given, and the number of partitions is
- * less than spark.shuffle.sort.bypassMergeThreshold, we bypass the merge-sort and just write to
- * separate files for each partition each time we spill, similar to the HashShuffleWriter. We can
- * then concatenate these files to produce a single sorted file, without having to serialize and
- * de-serialize each item twice (as is needed during the merge). This speeds up the map side of
- * groupBy, sort, etc operations since they do no partial aggregation.
*/
private[spark] class ExternalSorter[K, V, C](
aggregator: Option[Aggregator[K, V, C]] = None,
partitioner: Option[Partitioner] = None,
ordering: Option[Ordering[K]] = None,
serializer: Option[Serializer] = None)
- extends Logging with Spillable[WritablePartitionedPairCollection[K, C]] {
+ extends Logging
+ with Spillable[WritablePartitionedPairCollection[K, C]]
+ with SortShuffleFileWriter[K, V] {
+
+ private val conf = SparkEnv.get.conf
private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
private val shouldPartition = numPartitions > 1
+ private def getPartition(key: K): Int = {
+ if (shouldPartition) partitioner.get.getPartition(key) else 0
+ }
+
+ // Since SPARK-7855, bypassMergeSort optimization is no longer performed as part of this class.
+ // As a sanity check, make sure that we're not handling a shuffle which should use that path.
+ if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, aggregator, ordering)) {
--- End diff --
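The removed doc comment describes an eligibility rule, and the new code checks the same rule through `SortShuffleWriter.shouldBypassMergeSort`. Here is a minimal sketch of that rule as a standalone predicate, under some assumptions. The names `BypassCheck`, `bypassEligible` and `FakeAggregator` are hypothetical. So is the plain `threshold: Int` parameter, which replaces the `SparkConf` lookup. This is not Spark's actual implementation. It uses a strict "less than", following the removed comment's wording, and the real method may compare differently.

```scala
// Hypothetical sketch of the bypass-eligibility rule; NOT Spark's actual
// SortShuffleWriter.shouldBypassMergeSort (which reads its threshold from SparkConf).
object BypassCheck {
  // Stand-in for org.apache.spark.Aggregator: only its presence or absence matters here.
  final case class FakeAggregator(name: String)

  /** Returns true when a shuffle could skip the merge-sort and write one file per
    * partition: no map-side aggregation, no key ordering, and fewer partitions than
    * `threshold` (a stand-in for spark.shuffle.sort.bypassMergeThreshold). The strict
    * "<" follows the removed doc comment's wording. */
  def bypassEligible(
      numPartitions: Int,
      aggregator: Option[FakeAggregator],
      ordering: Option[Ordering[_]],
      threshold: Int): Boolean =
    aggregator.isEmpty && ordering.isEmpty && numPartitions < threshold
}
```

With a threshold of 200, `bypassEligible(10, None, None, 200)` is `true`. Supplying `Some(Ordering.Int)` as the ordering, or any aggregator, makes it `false`. That is why callers that pass an ordering never take the bypass path.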
This check runs once per map task, so there's no obvious performance benefit
to letting users disable assertions in order to skip it.
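In Scala 2, `Predef.assert` is annotated `@elidable`, so it can be stripped at compile time (for example with scalac's `-Xdisable-assertions`). By contrast, `require` or an explicit `throw` always executes. Because this check runs only once per map task, there is little reason to make it elidable. Below is a minimal sketch of the always-on variant. `SanityCheck`, `checkNotBypassEligible` and the error message are hypothetical, not the code from this PR.

```scala
// Hypothetical sketch: a sanity check that cannot be compiled away.
object SanityCheck {
  /** Fails fast when a shuffle that should have taken the bypassMergeSort path
    * reaches the sorter anyway. The eligibility result is passed in as a plain
    * Boolean to keep this sketch self-contained. */
  def checkNotBypassEligible(bypassEligible: Boolean): Unit = {
    // In Scala 2, Predef.assert is @elidable and disappears under
    // -Xdisable-assertions; require is not elidable, so this check always runs.
    require(!bypassEligible,
      "this shuffle should have used the bypassMergeSort path, not ExternalSorter")
  }
}
```

Calling `checkNotBypassEligible(true)` throws an `IllegalArgumentException` regardless of compiler flags.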
My intent was for this to guard against misuse of `ExternalSorter` in cases
that should have been handled by the bypassMergeSort path. The idea is that
non-sort-shuffle uses of this sorter will pass an ordering and thus will never
be eligible for the bypass. So any sort-shuffle case with no aggregation, no key
ordering, and few partitions should never have reached this code path. One
corner case is the situation where we haven't specified an ordering,
partitioner, or aggregator. In that case, I think `ExternalSorter` will
effectively do nothing, so we should treat that configuration as an error. I'll
look into adding some error checking to detect this case.
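The error check proposed for the corner case could take the form of a constructor-time `require`. Here is a minimal sketch, assuming the check lives in the constructor. `SorterArgs` and its simplified fields are hypothetical stand-ins for `ExternalSorter`'s parameters: a `String` stands in for the aggregator and an `Int` partition count for the partitioner. This is not the check that Spark eventually adopted.

```scala
// Hypothetical stand-in for ExternalSorter's optional constructor arguments;
// not the error check that was eventually added to Spark.
final case class SorterArgs(
    aggregator: Option[String] = None,    // stands in for Option[Aggregator[K, V, C]]
    numPartitions: Option[Int] = None,    // stands in for Option[Partitioner]
    ordering: Option[Ordering[_]] = None) {
  // With no aggregator, partitioner, or ordering the sorter would effectively do
  // nothing, so reject that configuration up front.
  require(aggregator.isDefined || numPartitions.isDefined || ordering.isDefined,
    "ExternalSorter needs an aggregator, a partitioner, or a key ordering")
}
```

`SorterArgs(ordering = Some(Ordering.Int))` constructs normally, while `SorterArgs()` throws an `IllegalArgumentException`.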