Github user advancedxy commented on a diff in the pull request:
https://github.com/apache/spark/pull/23083#discussion_r235299656
--- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
@@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
     context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
     context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
     context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+    val taskListener = new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
+    }
     // Use completion callback to stop sorter if task was finished/cancelled.
-    context.addTaskCompletionListener[Unit](_ => {
-      sorter.stop()
-    })
-    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
+    context.addTaskCompletionListener(taskListener)
+    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
+      sorter.iterator,
+      {
+        sorter.stop()
+        // remove the task completion listener as soon as the sorter stops to avoid
+        // holding its references until the end of the task, which may lead to memory
+        // leaks, e.g. when a single task processes multiple ShuffledRDDPartitions
+        // because a CoalescedRDD follows the ShuffledRDD in the same stage
+        // (as in rdd.repartition(1000).coalesce(10));
+        // note that holding sorter references until the end of the task also keeps
+        // its PartitionedAppendOnlyMap and PartitionedPairBuffer reachable, and
+        // these may consume a significant part of the available memory
+        context.removeTaskCompletionListener(taskListener)
--- End diff ---
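
For context, the job shape the new comment describes could be reproduced with something like the sketch below (illustrative only; `sc` is assumed to be a live `SparkContext` and the partition counts are arbitrary):

```scala
// Coalescing after a repartition makes one task read many shuffled
// partitions, so one sorter per shuffle partition would otherwise stay
// referenced through the TaskContext until the whole task finishes.
val rdd = sc.parallelize(1 to 1000000)
rdd
  .repartition(1000) // ShuffledRDD with 1000 partitions
  .coalesce(10)      // CoalescedRDD: each task now consumes ~100 shuffle partitions
  .count()
```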
Nice catch.
Like I said above, do we have another way to remove the reference to the sorter?
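
One possible alternative, sketched below purely as an illustration (the class name is hypothetical, and this is not necessarily what the PR should do): instead of unregistering the listener, a `CompletionIterator`-style wrapper could drop its own reference to the sub-iterator once it is exhausted, so the sorter's buffers reachable through that iterator become collectable without touching the `TaskContext` listeners at all.

```scala
// Hypothetical sketch: a completion iterator that releases its reference
// to the underlying iterator as soon as it is exhausted.
abstract class ReleasingCompletionIterator[+A](sub: Iterator[A]) extends Iterator[A] {
  private[this] var completed = false
  // Mutable slot so the reference to the sub-iterator can be dropped.
  private[this] var iter: Iterator[A] = sub

  override def next(): A = iter.next()

  override def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      // Drop the reference before running the completion callback, so data
      // referenced only through the sub-iterator becomes collectable.
      iter = Iterator.empty
      completion()
    }
    r
  }

  def completion(): Unit
}
```

The trade-off: the listener registered on the `TaskContext` would still pin the sorter object itself until the task ends, but that should be cheap once `stop()` has cleared the sorter's internal buffers.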
---