Github user szhem commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23083#discussion_r235769204
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        val taskListener = new TaskCompletionListener {
    +          override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
    +        }
             // Use completion callback to stop sorter if task was finished/cancelled.
    -        context.addTaskCompletionListener[Unit](_ => {
    -          sorter.stop()
    -        })
    -        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    +        context.addTaskCompletionListener(taskListener)
    +        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
    +          sorter.iterator,
    +          {
    +            sorter.stop()
    +            // remove the task completion listener as soon as the sorter stops to prevent
    +            // holding its references till the end of the task, which may lead to memory
    +            // leaks, for example, when a single task processes multiple
    +            // ShuffledRDDPartitions, as happens when a CoalescedRDD follows the ShuffledRDD
    +            // in the same stage (e.g. rdd.repartition(1000).coalesce(10));
    +            // note that holding sorter references till the end of the task also holds
    +            // references to PartitionedAppendOnlyMap and PartitionedPairBuffer, which may
    +            // consume a significant part of the available memory
    +            context.removeTaskCompletionListener(taskListener)
    --- End diff --
    
    Great question! Honestly speaking, I don't have a good solution right now.
    The TaskCompletionListener stops the sorter in case of task failures,
    cancellations, etc., i.e. in case of abnormal termination. In the "happy path"
    case the task completion listener is not needed, because the CompletionIterator's
    completion callback already stops the sorter once the iterator is fully consumed.
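    To make the intent a bit more concrete, here is a minimal, self-contained sketch
    of the pattern (plain Scala, not Spark internals): the completion callback handles
    the happy path, the listener handles abnormal termination, and the listener is
    deregistered once the callback has run so it no longer pins the sorter (and,
    transitively, its PartitionedAppendOnlyMap/PartitionedPairBuffer).
    `FakeTaskContext`, `Listener` and `StoppableSorter` are illustrative stand-ins,
    not Spark APIs.
    
    ```scala
    import scala.collection.mutable
    
    object ListenerDemo extends App {
    
      // Illustrative stand-ins -- these are NOT Spark classes.
      trait Listener { def onTaskCompletion(): Unit }
    
      class FakeTaskContext {
        private val listeners = mutable.Set.empty[Listener]
        def addTaskCompletionListener(l: Listener): Unit = listeners += l
        def removeTaskCompletionListener(l: Listener): Unit = listeners -= l
        // Invoked by the "scheduler" when the task ends (normally or abnormally).
        def markTaskCompleted(): Unit = listeners.foreach(_.onTaskCompletion())
      }
    
      class StoppableSorter(data: Seq[Int]) {
        private var stopped = false
        def iterator: Iterator[Int] = data.iterator
        def stop(): Unit = if (!stopped) { stopped = true; println("sorter stopped") }
      }
    
      // CompletionIterator-like wrapper: runs `completion` exactly once,
      // as soon as the underlying iterator is exhausted.
      def completing[A](it: Iterator[A])(completion: => Unit): Iterator[A] = new Iterator[A] {
        private var done = false
        def hasNext: Boolean = {
          val more = it.hasNext
          if (!more && !done) { done = true; completion }
          more
        }
        def next(): A = it.next()
      }
    
      val context = new FakeTaskContext
      val sorter  = new StoppableSorter(Seq(3, 1, 2))
    
      // Listener covers abnormal termination (failure/cancellation before the
      // iterator is fully consumed).
      val taskListener = new Listener {
        override def onTaskCompletion(): Unit = sorter.stop()
      }
      context.addTaskCompletionListener(taskListener)
    
      // Happy path: stop the sorter and drop the listener as soon as the
      // iterator is exhausted, so nothing keeps referencing the sorter until
      // the end of the task.
      val result = completing(sorter.iterator) {
        sorter.stop()
        context.removeTaskCompletionListener(taskListener)
      }.toList
    
      println(result)             // the stand-in does not actually sort: List(3, 1, 2)
      context.markTaskCompleted() // no-op: listener already removed, stop() is idempotent
    }
    ```
    
    With this shape, whichever path fires second is a no-op, and the listener set no
    longer keeps the sorter reachable once the happy path has finished.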

