Github user szhem commented on a diff in the pull request: https://github.com/apache/spark/pull/23083#discussion_r235769204

--- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---

```diff
@@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
     context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
     context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
     context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+    val taskListener = new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
+    }
     // Use completion callback to stop sorter if task was finished/cancelled.
-    context.addTaskCompletionListener[Unit](_ => {
-      sorter.stop()
-    })
-    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
+    context.addTaskCompletionListener(taskListener)
+    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
+      sorter.iterator,
+      {
+        sorter.stop()
+        // Remove the task completion listener as soon as the sorter stops, to avoid
+        // holding its references until the end of the task, which may lead to memory
+        // leaks, for example, when a single task processes multiple
+        // ShuffledRDDPartitions, as happens when a CoalescedRDD follows the
+        // ShuffledRDD in the same stage (e.g. rdd.repartition(1000).coalesce(10)).
+        // Note that holding sorter references until the end of the task also holds
+        // references to its PartitionedAppendOnlyMap and PartitionedPairBuffer, and
+        // these may consume a significant part of the available memory.
+        context.removeTaskCompletionListener(taskListener)
```

--- End diff ---

Great question! Honestly speaking, I don't have a good solution right now. The TaskCompletionListener stops the sorter in case of task failures, cancellations, etc., i.e. in case of abnormal termination. On the "happy path" the task completion listener is not needed.
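To make the lifecycle discussed above concrete, here is a minimal, self-contained Scala sketch of the pattern: register a completion listener to cover abnormal termination, then deregister it on the happy path once the sorter is stopped, so the listener no longer pins the sorter (and its buffers) until the end of the task. The `TaskContext`, `TaskCompletionListener`, and `Sorter` below are hypothetical stand-ins for illustration, not Spark's real classes.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Spark's TaskCompletionListener.
trait TaskCompletionListener {
  def onTaskCompletion(): Unit
}

// Hypothetical stand-in for Spark's TaskContext: a simple listener registry.
class TaskContext {
  private val listeners = ListBuffer.empty[TaskCompletionListener]
  def addTaskCompletionListener(l: TaskCompletionListener): Unit = listeners += l
  def removeTaskCompletionListener(l: TaskCompletionListener): Unit = listeners -= l
  def listenerCount: Int = listeners.size
  // Simulates the end of the task: fire whatever listeners are still registered.
  def markTaskCompleted(): Unit = listeners.foreach(_.onTaskCompletion())
}

// Hypothetical stand-in for the sorter holding large in-memory buffers.
class Sorter {
  var stopped = false
  def stop(): Unit = stopped = true
}

object ListenerLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val context = new TaskContext
    val sorter  = new Sorter

    // Abnormal-termination safety net: stop the sorter when the task completes.
    val listener = new TaskCompletionListener {
      override def onTaskCompletion(): Unit = sorter.stop()
    }
    context.addTaskCompletionListener(listener)

    // Happy path: the shuffle iterator was fully consumed, so stop the sorter now
    // and deregister the listener so the context no longer retains the sorter.
    sorter.stop()
    context.removeTaskCompletionListener(listener)

    println(s"sorter stopped: ${sorter.stopped}")
    println(s"listeners still registered: ${context.listenerCount}")
  }
}
```

Without the `removeTaskCompletionListener` call, each sorter created by the task would stay reachable through the context's listener list until the task ends, which is exactly the leak described for the `repartition(1000).coalesce(10)` case.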