Github user szhem commented on a diff in the pull request:
https://github.com/apache/spark/pull/23083#discussion_r235769204
--- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
@@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+ val taskListener = new TaskCompletionListener {
+ override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
+ }
// Use completion callback to stop sorter if task was finished/cancelled.
- context.addTaskCompletionListener[Unit](_ => {
- sorter.stop()
- })
- CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
+ context.addTaskCompletionListener(taskListener)
+ CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
+ sorter.iterator,
+ {
+ sorter.stop()
+ // remove task completion listener as soon as the sorter stops to prevent holding
+ // its references till the end of the task which may lead to memory leaks, for
+ // example, in case of processing multiple ShuffledRDDPartitions by a single task
+ // like in case of CoalescedRDD occurred after the ShuffledRDD in the same stage
+ // (e.g. rdd.repartition(1000).coalesce(10));
+ // note that holding sorter references till the end of the task also holds
+ // references to PartitionedAppendOnlyMap and PartitionedPairBuffer too and these
+ // ones may consume a significant part of the available memory
+ context.removeTaskCompletionListener(taskListener)
--- End diff --
Great question! Honestly speaking, I don't have a really good solution
right now.
The TaskCompletionListener stops the sorter in case of task failures,
cancellations, etc., i.e. in case of abnormal termination. In the
"happy path" case the task completion listener is not needed.
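
For reference, a minimal sketch of the scenario the diff comment describes
(the object name, partition counts, and data sizes below are illustrative
assumptions, not taken from the PR): coalesce without a shuffle builds a
narrow dependency on the ShuffledRDD produced by repartition, so a single
task iterates many shuffle partitions within one stage and creates one
sorter per partition; if each sorter is released only by a task completion
listener, its PartitionedAppendOnlyMap/PartitionedPairBuffer stay reachable
until the task ends.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical repro sketch (names/sizes are assumptions for illustration).
    object RepartitionCoalesceSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("repartition-coalesce-sketch"))
        try {
          val rdd = sc.parallelize(1 to 1000000, 10)
            .repartition(1000) // shuffle: each output partition is read by a shuffle reader
            .coalesce(10)      // narrow dependency: one task reads ~100 shuffle partitions
          // Each of the 10 tasks computes ~100 ShuffledRDD partitions in the same stage,
          // so without eager cleanup every per-partition sorter stays referenced until
          // the task completes.
          println(rdd.count())
        } finally {
          sc.stop()
        }
      }
    }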
---