GitHub user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165066153

    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
           context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
           context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
           context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +      // Use completion callback to stop sorter if task was cancelled.
    +      context.addTaskCompletionListener(tc => {
    +        // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +        // CompletionIterator. Another way would be making sorter.stop idempotent.
    +        if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --

One advantage of `CompletionIterator` is that the `completionFunction` is called as soon as the wrapped iterator is fully consumed. So for the sorter, it releases its memory earlier rather than at task completion.

As for job cancelling, it's not just `CompletionIterator` that we should consider. The combiner-and-sorter pattern (or anything similar) is something we should look for:

```scala
combiner.insertAll(iterator) // or sorter.insertAll(iterator)
// then returns a new iterator
combiner.iterator // or sorter.iterator
```
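To make the "cleanup runs as soon as the iterator is consumed" point concrete, here is a minimal standalone sketch of the idea behind `CompletionIterator` (names and structure are illustrative only, not Spark's actual implementation): a wrapper that fires a cleanup callback the moment the underlying iterator is exhausted, rather than waiting for task completion.

```scala
// Illustrative sketch, not Spark internals: wrap an iterator so a cleanup
// function runs as soon as the wrapped iterator is fully consumed.
object CompletionIteratorSketch {
  def wrap[A](it: Iterator[A])(onCompletion: => Unit): Iterator[A] = new Iterator[A] {
    private var completed = false
    def hasNext: Boolean = {
      val more = it.hasNext
      // Fire the callback exactly once, at the moment of exhaustion.
      if (!more && !completed) { completed = true; onCompletion }
      more
    }
    def next(): A = it.next()
  }

  def main(args: Array[String]): Unit = {
    var released = false
    val wrapped = wrap(Iterator(1, 2, 3)) { released = true }
    wrapped.foreach(_ => ()) // consuming the iterator triggers the cleanup
    println(released)        // prints: true
  }
}
```

This is why wrapping the sorter's output in a `CompletionIterator` releases memory eagerly, while a task-completion listener alone would hold it until the task ends; the listener in the diff above is only a safety net for the cancelled/interrupted case, where the iterator is never fully consumed.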