Repository: spark
Updated Branches:
  refs/heads/branch-1.0 fcb375026 -> 214f90ee7


SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

`var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without any synchronization.

There are two places that call `replicate(blockId, bytesAfterPut, level)`:
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext`.
* https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`, because `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`.

Because these two call sites run in different `Executor`s, this is a race condition: a thread may see `cachedPeers != null` and still read stale or incompletely initialized contents through that reference.
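
The pattern the fix relies on can be sketched as below. This is a hypothetical, simplified stand-in, not Spark's code: `PeerCache`, `fetchPeers` and `PeerCacheDemo` are illustrative names, `fetchPeers` only plays the role of asking the master for peers, and two pool threads stand in for the two executors listed above. It shows a lazily filled `@volatile` cache read from more than one thread.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the peer cache used by replicate(); names are illustrative.
class PeerCache(fetchPeers: () => Seq[String]) {
  // @volatile: a write to this field happens-before every later read of it by another
  // thread, so a reader that sees a non-null Seq also sees a fully built Seq.
  @volatile var cachedPeers: Seq[String] = null

  def peers: Seq[String] = {
    // Read the volatile field once so the null check and the return use the same value.
    var local = cachedPeers
    if (local == null) {
      // Check-then-act is not atomic: two threads may both fetch. In this sketch that is
      // harmless because fetching is idempotent and either result is a valid peer list.
      local = fetchPeers()
      cachedPeers = local
    }
    local
  }
}

object PeerCacheDemo {
  // Calls `peers` from pool threads, standing in for the two executors in the text.
  def run(): (Seq[String], Seq[String], Int) = {
    val fetches = new AtomicInteger(0)
    val cache = new PeerCache(() => { fetches.incrementAndGet(); Seq("peer-1", "peer-2") })
    val pool = Executors.newFixedThreadPool(2)
    try {
      val task = new Callable[Seq[String]] { def call(): Seq[String] = cache.peers }
      val f1 = pool.submit(task)
      val f2 = pool.submit(task)
      (f1.get(), f2.get(), fetches.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val (a, b, n) = run()
    println(s"thread 1 saw $a, thread 2 saw $b, fetched $n time(s)")
  }
}
```

Note that `@volatile` only fixes visibility; it does not make the null check and the assignment one atomic step, so the peer list may be fetched once or twice depending on timing.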

The race condition on `onReceiveCallback` is that it is set in `BlockManagerWorker` but read by a different thread in `ConnectionManager.handleMessageExecutor`, and nothing guarantees that the write is visible to that thread.
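
A minimal sketch of the callback case, under stated assumptions: `MiniConnectionManager`, `registerCallback` and `handleMessage` are hypothetical names, not Spark's API, and messages are plain `String`s instead of `BufferMessage`. One thread registers a callback into a `@volatile` field and executor threads read it when handling a message.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

// Hypothetical, heavily simplified stand-in for ConnectionManager.
class MiniConnectionManager {
  // Plays the role of `handleMessageExecutor`: callbacks run on these pool threads.
  private val handleMessageExecutor: ExecutorService = Executors.newFixedThreadPool(2)

  // Written by whichever thread registers the callback, read by executor threads.
  // @volatile guarantees those readers see the most recently registered function.
  @volatile private var onReceiveCallback: String => Option[String] = null

  def registerCallback(callback: String => Option[String]): Unit = {
    onReceiveCallback = callback
  }

  // Runs the currently registered callback (if any) on an executor thread.
  def handleMessage(msg: String): Future[Option[String]] =
    handleMessageExecutor.submit(new Callable[Option[String]] {
      def call(): Option[String] = {
        val cb = onReceiveCallback // one volatile read, then use the local copy
        if (cb == null) None else cb(msg)
      }
    })

  def stop(): Unit = {
    handleMessageExecutor.shutdown()
    handleMessageExecutor.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object MiniConnectionManagerDemo {
  def main(args: Array[String]): Unit = {
    val cm = new MiniConnectionManager
    cm.registerCallback(msg => Some(msg.toUpperCase))
    println(cm.handleMessage("ping").get()) // prints Some(PING)
    cm.stop()
  }
}
```

In this small demo the call to `submit` already orders the main thread's registration before the task, so `@volatile` is not what makes the demo pass. It matters in the situation described above, where the thread that registers the callback and the thread that hands messages to the executor are different and nothing else synchronizes them.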

Author: zsxwing <zsxw...@gmail.com>

Closes #887 from zsxwing/SPARK-1932 and squashes the following commits:

524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers

(cherry picked from commit 549830b0db2c8b069391224f3a73bb0d7f397f71)
Signed-off-by: Aaron Davidson <aa...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/214f90ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/214f90ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/214f90ee

Branch: refs/heads/branch-1.0
Commit: 214f90ee7910fada1dc58ecc95be0a83a1356a2d
Parents: fcb3750
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon May 26 23:17:39 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon May 26 23:17:50 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/network/ConnectionManager.scala   | 3 ++-
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index dcbbc18..5dd5fd0 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
   implicit val futureExecContext = ExecutionContext.fromExecutor(
     Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
-  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
+  @volatile
+  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
 
   private val authEnabled = securityManager.isAuthenticationEnabled()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6534095..6e45008 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -772,7 +772,7 @@ private[spark] class BlockManager(
   /**
    * Replicate block to another node.
    */
-  var cachedPeers: Seq[BlockManagerId] = null
+  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
