Repository: spark Updated Branches: refs/heads/branch-1.0 fcb375026 -> 214f90ee7
SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers `var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection. There are two place will call `replicate(blockId, bytesAfterPut, level)` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`. As they run in different `Executor`s, this is a race condition which may cause the memory pointed by `cachedPeers` is not correct even if `cachedPeers != null`. The race condition of `onReceiveCallback` is that it's set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`. Author: zsxwing <zsxw...@gmail.com> Closes #887 from zsxwing/SPARK-1932 and squashes the following commits: 524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers (cherry picked from commit 549830b0db2c8b069391224f3a73bb0d7f397f71) Signed-off-by: Aaron Davidson <aa...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/214f90ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/214f90ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/214f90ee Branch: refs/heads/branch-1.0 Commit: 214f90ee7910fada1dc58ecc95be0a83a1356a2d Parents: fcb3750 Author: zsxwing <zsxw...@gmail.com> Authored: Mon May 26 23:17:39 2014 -0700 Committer: Aaron Davidson <aa...@databricks.com> Committed: Mon May 26 23:17:50 2014 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/network/ConnectionManager.scala | 3 ++- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index dcbbc18..5dd5fd0 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, implicit val futureExecContext = ExecutionContext.fromExecutor( Utils.newDaemonCachedThreadPool("Connection manager future execution context")) - private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null + @volatile + private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null private val authEnabled = securityManager.isAuthenticationEnabled() http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6534095..6e45008 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -772,7 +772,7 @@ private[spark] class BlockManager( /** * Replicate block to another node. */ - var cachedPeers: Seq[BlockManagerId] = null + @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)