Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6397#discussion_r31005551
  
    --- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -84,35 +86,40 @@ import org.apache.spark.storage.{BlockObjectWriter, 
BlockId}
      *   each other for equality to merge values.
      *
      * - Users are expected to call stop() at the end to delete all the 
intermediate files.
    - *
    - * As a special case, if no Ordering and no Aggregator is given, and the 
number of partitions is
    - * less than spark.shuffle.sort.bypassMergeThreshold, we bypass the 
merge-sort and just write to
    - * separate files for each partition each time we spill, similar to the 
HashShuffleWriter. We can
    - * then concatenate these files to produce a single sorted file, without 
having to serialize and
    - * de-serialize each item twice (as is needed during the merge). This 
speeds up the map side of
    - * groupBy, sort, etc operations since they do no partial aggregation.
      */
     private[spark] class ExternalSorter[K, V, C](
         aggregator: Option[Aggregator[K, V, C]] = None,
         partitioner: Option[Partitioner] = None,
         ordering: Option[Ordering[K]] = None,
         serializer: Option[Serializer] = None)
    -  extends Logging with Spillable[WritablePartitionedPairCollection[K, C]] {
    +  extends Logging
    +  with Spillable[WritablePartitionedPairCollection[K, C]]
    +  with SortShuffleFileWriter[K, V] {
    +
    +  private val conf = SparkEnv.get.conf
     
       private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
       private val shouldPartition = numPartitions > 1
    +  private def getPartition(key: K): Int = {
    +    if (shouldPartition) partitioner.get.getPartition(key) else 0
    +  }
    +
    +  // Since SPARK-7855, bypassMergeSort optimization is no longer performed 
as part of this class.
    +  // As a sanity check, make sure that we're not handling a shuffle which 
should use that path.
    +  if (SortShuffleWriter.shouldBypassMergeSort(conf, numPartitions, 
aggregator, ordering)) {
    --- End diff --
    
    maybe turn this into an assert with an error msg?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to