Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165066153
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
    
    One advantage of `CompletionIterator` is that the `completionFunction` is called as soon as the wrapped iterator is fully consumed. So for the sorter, it can release memory earlier, rather than only at task completion.
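
    To make the timing concrete, here is a minimal, self-contained sketch of the idea. This is not Spark's actual `org.apache.spark.util.CompletionIterator`, just an illustration of the contract: the callback fires the moment the wrapped iterator is exhausted, which can be well before the task ends.
    ```scala
    // Minimal sketch of the CompletionIterator idea: run a callback exactly
    // once, as soon as the wrapped iterator has been fully consumed.
    class CompletionIterator[A](sub: Iterator[A], completionFunction: () => Unit)
        extends Iterator[A] {
      private var completed = false
      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          completionFunction() // e.g. () => sorter.stop(): releases memory early
        }
        more
      }
      override def next(): A = sub.next()
    }
    ```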
    
    As for job cancellation, it's not just `CompletionIterator` that we should consider. The combiner-and-sorter pattern (or anything similar) is something we should look out for:
    ```scala
    combiner.insertAll(iterator) // or sorter.insertAll(iterator)
    // ... and then return a new iterator over the results
    combiner.iterator // or sorter.iterator
    ```
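
    As a hedged, self-contained sketch of why that pattern is risky (`Sorter` is a toy stand-in for Spark's `ExternalSorter`, reusing the `CompletionIterator` sketch above): if the task is cancelled after `insertAll` but before the returned iterator is drained, the completion callback never fires and `stop()` is never called, which is exactly the leak the proposed task-completion listener guards against.
    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Toy stand-in for ExternalSorter: it holds resources from insertAll
    // until its output iterator is fully consumed (or stop() is called).
    class Sorter[A](implicit ord: Ordering[A]) {
      private val buf = ArrayBuffer.empty[A]
      def insertAll(it: Iterator[A]): Unit = buf ++= it // may buffer in memory or spill
      def iterator: Iterator[A] = buf.sorted(ord).iterator
      def stop(): Unit = buf.clear()                    // release held resources
    }

    def read[A: Ordering](records: Iterator[A]): Iterator[A] = {
      val sorter = new Sorter[A]
      sorter.insertAll(records)
      // stop() runs only if the caller drains the iterator; a cancelled task
      // abandons it mid-stream, so nothing else releases the sorter's resources.
      new CompletionIterator(sorter.iterator, () => sorter.stop())
    }
    ```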


