Repository: spark
Updated Branches:
  refs/heads/master 1fd59c129 -> 3864480e1


[SPARK-25266][CORE] Fix memory leak in Barrier Execution Mode

## What changes were proposed in this pull request?

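`BarrierCoordinator` uses `Timer` and `TimerTask`. `TimerTask#cancel()` is
invoked in `ContextBarrierState#cancelTimerTask`, but `Timer#purge()` is never
invoked.

Once a `TimerTask` is scheduled, the `Timer` holds a reference to it.
`TimerTask#cancel()` alone does not release that reference, so cancelled tasks
stay reachable until `Timer#purge()` is invoked. This patch calls
`timer.purge()` right after `timerTask.cancel()` in both `BarrierCoordinator`
and `BarrierTaskContext`.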
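
The retention described above can be reproduced outside Spark. The following
is a minimal sketch, not code from this patch: `TimerPurgeSketch`,
`noOpTask`, and `purgedAfterCancel` are hypothetical names, and the one-hour
keep-alive task is an assumption added so that the timer thread stays parked
on a live head task instead of racing with the cancellations. It relies on
`Timer#purge()` returning the number of cancelled tasks it removed from the
queue.

```scala
import java.util.{Timer, TimerTask}

// Hypothetical demo object; not part of Spark.
object TimerPurgeSketch {
  private def noOpTask(): TimerTask = new TimerTask {
    override def run(): Unit = ()
  }

  // Schedules `count` far-future tasks, cancels each one immediately, and
  // returns how many cancelled tasks purge() then removed from the queue.
  def purgedAfterCancel(count: Int): Int = {
    val timer = new Timer("purge-sketch", true) // daemon timer thread
    try {
      // Assumption: a live task with the earliest deadline stays at the head
      // of the queue, so the timer thread never inspects the cancelled ones.
      timer.schedule(noOpTask(), 60L * 60 * 1000)
      (1 to count).foreach { _ =>
        val task = noOpTask()
        timer.schedule(task, 24L * 60 * 60 * 1000)
        task.cancel() // marks the task cancelled; the queue still holds it
      }
      timer.purge() // drops the cancelled tasks and reports how many
    } finally {
      timer.cancel() // stop the timer thread
    }
  }

  def main(args: Array[String]): Unit = {
    val removed = purgedAfterCancel(1000)
    println(s"purge() removed $removed cancelled tasks")
  }
}
```

On an OpenJDK-style `java.util.Timer`, the reported count should equal the
number of cancelled tasks, which indicates that `cancel()` alone left every
one of them in the queue.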

## How was this patch tested?

I used jmap to check the number of TimerTask-related instances.

Closes #22258 from sarutak/fix-barrierexec-oom.

Authored-by: sarutak <saru...@oss.nttdata.co.jp>
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3864480e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3864480e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3864480e

Branch: refs/heads/master
Commit: 3864480e14a4961720cc1be43635c7c7dec08c09
Parents: 1fd59c1
Author: sarutak <saru...@oss.nttdata.co.jp>
Authored: Wed Aug 29 07:13:13 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Wed Aug 29 07:13:13 2018 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/BarrierCoordinator.scala | 1 +
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3864480e/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index 5e546c6..6439ca5 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -123,6 +123,7 @@ private[spark] class BarrierCoordinator(
     private def cancelTimerTask(): Unit = {
       if (timerTask != null) {
         timerTask.cancel()
+        timer.purge()
         timerTask = null
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3864480e/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index de82798..3901f96 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -140,6 +140,7 @@ class BarrierTaskContext(
         throw e
     } finally {
       timerTask.cancel()
+      timer.purge()
     }
   }
 


