Github user brad-kaiser commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r155045840
--- Diff:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
---
@@ -237,6 +246,43 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
+ private def replicateOneBlock(
+ execId: String,
+ blockId: BlockId,
+ excludeExecutors: Seq[String],
+ context: RpcCallContext): Unit = {
+ logDebug(s"replicating block $blockId")
+ val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+ val response: Option[Future[Boolean]] = for {
+ blockManagerId <- blockManagerIdByExecutor.get(execId)
+ info <- blockManagerInfo.get(blockManagerId)
+ replicaSet <- blockLocations.asScala.get(blockId)
+ replicas = replicaSet.toSeq
+ maxReps = replicaSet.size + 2
+ } yield info.slaveEndpoint.ask[Boolean](ReplicateBlock(blockId, replicas, excluded, maxReps))
+
+ response.getOrElse(Future.successful(false)).foreach(context.reply)
+ }
+
+ private def getCachedBlocks(executorId: String): collection.Set[BlockId] = {
+ val cachedBlocks = for {
+ blockManagerId <- blockManagerIdByExecutor.get(executorId)
+ info <- blockManagerInfo.get(blockManagerId)
+ } yield info.cachedBlocks
+
+ cachedBlocks.getOrElse(Set.empty)
+ }
+
+ private def getSizeOfBlocks(blockMap: Map[String, Set[RDDBlockId]]): Map[String, Long] = for {
--- End diff --
removed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]