spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer (For branch-1.1)

2014-11-18 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 aa9ebdaa2 -> 91b5fa824


[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use 
HashedWheelTimer (For branch-1.1)

This patch is intended to fix a subtle memory leak in ConnectionManager's ACK 
timeout TimerTasks: in the old code, each TimerTask held a reference to the 
message being sent and a cancelled TimerTask won't necessarily be 
garbage-collected until it's scheduled to run, so this caused huge buildups of 
messages that weren't garbage collected until their timeouts expired, leading 
to OOMs.

This patch addresses this problem by capturing only the message ID in the 
TimerTask instead of the whole message, and by keeping a WeakReference to the 
promise in the TimerTask. I've also modified this code to use Netty's 
HashedWheelTimer, whose performance characteristics should be better for this 
use-case.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #3321 from sarutak/connection-manager-timeout-bugfix and squashes the 
following commits:

786af91 [Kousuke Saruta] Fixed memory leak issue of ConnectionManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91b5fa82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91b5fa82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91b5fa82

Branch: refs/heads/branch-1.1
Commit: 91b5fa82477e5fd43712fdf067d92a31d4037a83
Parents: aa9ebda
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Tue Nov 18 12:09:18 2014 -0800
Committer: Josh Rosen joshro...@databricks.com
Committed: Tue Nov 18 12:09:18 2014 -0800

--
 .../spark/network/ConnectionManager.scala   | 52 +++-
 1 file changed, 39 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/91b5fa82/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 578d806..6d58129 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.network
 
 import java.io.IOException
+import java.lang.ref.WeakReference
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.net._
-import java.util.{Timer, TimerTask}
 import java.util.concurrent.atomic.AtomicInteger
 
 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -37,6 +37,8 @@ import scala.concurrent.{Await, ExecutionContext, Future, 
Promise}
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
+
 import org.apache.spark._
 import org.apache.spark.util.{SystemClock, Utils}
 
@@ -68,7 +70,8 @@ private[spark] class ConnectionManager(
   }
 
   private val selector = SelectorProvider.provider.openSelector()
-  private val ackTimeoutMonitor = new Timer(AckTimeoutMonitor, true)
+  private val ackTimeoutMonitor =
+new HashedWheelTimer(Utils.namedThreadFactory(AckTimeoutMonitor))
 
   // default to 30 second timeout waiting for authentication
   private val authTimeout = 
conf.getInt(spark.core.connection.auth.wait.timeout, 30)
@@ -105,7 +108,10 @@ private[spark] class ConnectionManager(
 new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, 
Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, 
SendingConnection]
 with SynchronizedMap[ConnectionManagerId, SendingConnection]
-  private val messageStatuses = new HashMap[Int, MessageStatus]
+  // Tracks sent messages for which we are awaiting acknowledgements.  Entries 
are added to this
+  // map when messages are sent and are removed when acknowledgement messages 
are received or when
+  // acknowledgement timeouts expire
+  private val messageStatuses = new HashMap[Int, MessageStatus]  // 
[MessageId, MessageStatus]
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, 
Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
@@ -846,20 +852,41 @@ private[spark] class ConnectionManager(
   : Future[Message] = {
 val promise = Promise[Message]()
 
-val timeoutTask = new TimerTask {
-  override def run(): Unit = {
+// It's important that the TimerTask doesn't capture a reference to 
`message`, which can cause
+// memory leaks since cancelled TimerTasks won't necessarily be garbage 
collected until the time
+// at which they would originally be scheduled to run.  Therefore, extract 
the message id
+// from 

spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer

2014-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 84468b2e2 -> 7850e0c70


[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use 
HashedWheelTimer

This patch is intended to fix a subtle memory leak in ConnectionManager's ACK 
timeout TimerTasks: in the old code, each TimerTask held a reference to the 
message being sent and a cancelled TimerTask won't necessarily be 
garbage-collected until it's scheduled to run, so this caused huge buildups of 
messages that weren't garbage collected until their timeouts expired, leading 
to OOMs.

This patch addresses this problem by capturing only the message ID in the 
TimerTask instead of the whole message, and by keeping a WeakReference to the 
promise in the TimerTask.  I've also modified this code to use Netty's 
HashedWheelTimer, whose performance characteristics should be better for this 
use-case.

Thanks to cristianopris for narrowing down this issue!

Author: Josh Rosen joshro...@databricks.com

Closes #3259 from JoshRosen/connection-manager-timeout-bugfix and squashes the 
following commits:

afcc8d6 [Josh Rosen] Address rxin's review feedback.
2a2e92d [Josh Rosen] Keep only WeakReference to promise in TimerTask;
0f0913b [Josh Rosen] Spelling fix: timout => timeout
3200c33 [Josh Rosen] Use Netty HashedWheelTimer
f847dd4 [Josh Rosen] Don't capture entire message in ACK timeout task.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7850e0c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7850e0c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7850e0c7

Branch: refs/heads/master
Commit: 7850e0c707affd5eafd570fb43716753396cf479
Parents: 84468b2
Author: Josh Rosen joshro...@databricks.com
Authored: Sun Nov 16 00:44:15 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Sun Nov 16 00:44:15 2014 -0800

--
 .../spark/network/nio/ConnectionManager.scala   | 47 +++-
 1 file changed, 35 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7850e0c7/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index f198aa8..df4b085 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.network.nio
 
 import java.io.IOException
+import java.lang.ref.WeakReference
 import java.net._
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
-import java.util.{Timer, TimerTask}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, 
SynchronizedMap, SynchronizedQueue}
 import scala.concurrent.duration._
@@ -32,6 +32,7 @@ import scala.concurrent.{Await, ExecutionContext, Future, 
Promise}
 import scala.language.postfixOps
 
 import com.google.common.base.Charsets.UTF_8
+import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
 
 import org.apache.spark._
 import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
@@ -77,7 +78,8 @@ private[nio] class ConnectionManager(
   }
 
   private val selector = SelectorProvider.provider.openSelector()
-  private val ackTimeoutMonitor = new Timer(AckTimeoutMonitor, true)
+  private val ackTimeoutMonitor =
+new HashedWheelTimer(Utils.namedThreadFactory(AckTimeoutMonitor))
 
   private val ackTimeout = 
conf.getInt(spark.core.connection.ack.wait.timeout, 60)
 
@@ -139,7 +141,10 @@ private[nio] class ConnectionManager(
 new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, 
Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, 
SendingConnection]
 with SynchronizedMap[ConnectionManagerId, SendingConnection]
-  private val messageStatuses = new HashMap[Int, MessageStatus]
+  // Tracks sent messages for which we are awaiting acknowledgements.  Entries 
are added to this
+  // map when messages are sent and are removed when acknowledgement messages 
are received or when
+  // acknowledgement timeouts expire
+  private val messageStatuses = new HashMap[Int, MessageStatus]  // 
[MessageId, MessageStatus]
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, 
Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
@@ -899,22 +904,41 @@ private[nio] class ConnectionManager(
   : Future[Message] = {
 val promise = Promise[Message]()
 
-val 

spark git commit: [SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use HashedWheelTimer

2014-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 24287014f -> 2200de635


[SPARK-4393] Fix memory leak in ConnectionManager ACK timeout TimerTasks; use 
HashedWheelTimer

This patch is intended to fix a subtle memory leak in ConnectionManager's ACK 
timeout TimerTasks: in the old code, each TimerTask held a reference to the 
message being sent and a cancelled TimerTask won't necessarily be 
garbage-collected until it's scheduled to run, so this caused huge buildups of 
messages that weren't garbage collected until their timeouts expired, leading 
to OOMs.

This patch addresses this problem by capturing only the message ID in the 
TimerTask instead of the whole message, and by keeping a WeakReference to the 
promise in the TimerTask.  I've also modified this code to use Netty's 
HashedWheelTimer, whose performance characteristics should be better for this 
use-case.

Thanks to cristianopris for narrowing down this issue!

Author: Josh Rosen joshro...@databricks.com

Closes #3259 from JoshRosen/connection-manager-timeout-bugfix and squashes the 
following commits:

afcc8d6 [Josh Rosen] Address rxin's review feedback.
2a2e92d [Josh Rosen] Keep only WeakReference to promise in TimerTask;
0f0913b [Josh Rosen] Spelling fix: timout => timeout
3200c33 [Josh Rosen] Use Netty HashedWheelTimer
f847dd4 [Josh Rosen] Don't capture entire message in ACK timeout task.

(cherry picked from commit 7850e0c707affd5eafd570fb43716753396cf479)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2200de63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2200de63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2200de63

Branch: refs/heads/branch-1.2
Commit: 2200de6352fdc1000908554003912303edc3d160
Parents: 2428701
Author: Josh Rosen joshro...@databricks.com
Authored: Sun Nov 16 00:44:15 2014 -0800
Committer: Reynold Xin r...@databricks.com
Committed: Sun Nov 16 00:44:31 2014 -0800

--
 .../spark/network/nio/ConnectionManager.scala   | 47 +++-
 1 file changed, 35 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2200de63/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index f198aa8..df4b085 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.network.nio
 
 import java.io.IOException
+import java.lang.ref.WeakReference
 import java.net._
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
-import java.util.{Timer, TimerTask}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, 
SynchronizedMap, SynchronizedQueue}
 import scala.concurrent.duration._
@@ -32,6 +32,7 @@ import scala.concurrent.{Await, ExecutionContext, Future, 
Promise}
 import scala.language.postfixOps
 
 import com.google.common.base.Charsets.UTF_8
+import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
 
 import org.apache.spark._
 import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
@@ -77,7 +78,8 @@ private[nio] class ConnectionManager(
   }
 
   private val selector = SelectorProvider.provider.openSelector()
-  private val ackTimeoutMonitor = new Timer(AckTimeoutMonitor, true)
+  private val ackTimeoutMonitor =
+new HashedWheelTimer(Utils.namedThreadFactory(AckTimeoutMonitor))
 
   private val ackTimeout = 
conf.getInt(spark.core.connection.ack.wait.timeout, 60)
 
@@ -139,7 +141,10 @@ private[nio] class ConnectionManager(
 new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, 
Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, 
SendingConnection]
 with SynchronizedMap[ConnectionManagerId, SendingConnection]
-  private val messageStatuses = new HashMap[Int, MessageStatus]
+  // Tracks sent messages for which we are awaiting acknowledgements.  Entries 
are added to this
+  // map when messages are sent and are removed when acknowledgement messages 
are received or when
+  // acknowledgement timeouts expire
+  private val messageStatuses = new HashMap[Int, MessageStatus]  // 
[MessageId, MessageStatus]
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, 
Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
@@ -899,22 +904,41 @@