Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r143819863
--- Diff:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
---
@@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
+ private def replicateOneBlock(
+ execId: String,
+ blockId: BlockId,
+ excludeExecutors: Seq[String],
+ context: RpcCallContext): Unit = {
+ logDebug(s"replicating block $blockId")
+ val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+ val response: Option[Future[Boolean]] = for {
+ blockManagerId <- blockManagerIdByExecutor.get(execId)
+ info <- blockManagerInfo.get(blockManagerId)
+ replicaSet <- blockLocations.asScala.get(blockId)
+ replicas = replicaSet.toSeq
+ maxReps = replicaSet.size + 2
+ } yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId,
replicas, excluded, maxReps))
+
+ response.getOrElse(Future.successful(false)).foreach(context.reply)
+ }
+
+ private def getCachedBlocks(executorId: String): collection.Set[BlockId]
= {
--- End diff --
just `Set[BlockId]`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]