Github user brad-kaiser commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r176159874
--- Diff:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
+ private def recoverLatestRDDBlock(
+ execId: String,
+ excludeExecutors: Seq[String],
+ context: RpcCallContext): Unit = {
+ logDebug(s"Replicating first cached block on $execId")
+ val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+ val response: Option[Future[Boolean]] = for {
+ blockManagerId <- blockManagerIdByExecutor.get(execId)
+ info <- blockManagerInfo.get(blockManagerId)
+ blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+ // we assume blocks from the latest rdd are most relevant
+ firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
+ replicaSet <- blockLocations.asScala.get(firstBlock)
+ // Add 2 below because you need the number of replicas, plus one for the original, plus one
+ // for the new replica.
+ maxReps = replicaSet.size + 2
+ } yield info.slaveEndpoint
+ .ask[Boolean](ReplicateBlock(firstBlock, replicaSet.toSeq, excluded, maxReps))
+ .flatMap { success =>
+ if (success) {
+ logTrace(s"Replicated block $firstBlock on executor $execId")
+ replicaSet -= blockManagerId
+ info.slaveEndpoint.ask[Boolean](RemoveBlock(firstBlock))
--- End diff --
Thanks for catching this. I updated this function so that we remove the block
from BlockManagerMasterEndpoint's records before we ask the slave to delete the
block. If the executor fails to remove the block for some reason, it shouldn't
matter, because that executor will get shut down soon anyway.
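
To illustrate the ordering described above, here is a minimal, self-contained Scala sketch (the names `RecoverBlockSketch`, `askExecutorToRemove`, and `removeReplica` are illustrative, not Spark's actual API): the master updates its own records first, and only then issues the remove request to the executor, so a failed executor-side delete cannot leave the master believing a replica still exists there.

```scala
import scala.collection.mutable

// Hypothetical sketch of the order-of-operations fix; not Spark's real code.
object RecoverBlockSketch {
  case class BlockId(rddId: Int, split: Int)

  // Master-side view: block -> set of executor ids holding a replica
  val blockLocations = mutable.Map[BlockId, mutable.Set[String]]()

  // Stand-in for the RPC to the executor; returns whether the remove succeeded.
  // Here we pretend one executor always fails its RPC.
  def askExecutorToRemove(execId: String, block: BlockId): Boolean =
    execId != "failing-exec"

  def removeReplica(execId: String, block: BlockId): Unit = {
    // 1. Update the master's records first, so its view is consistent
    //    regardless of what happens on the executor side.
    blockLocations.get(block).foreach(_ -= execId)
    // 2. Only then ask the executor to delete its copy; a failure here is
    //    harmless because the executor is about to be shut down anyway.
    val removed = askExecutorToRemove(execId, block)
    if (!removed) println(s"executor $execId failed to remove $block (ignored)")
  }

  def main(args: Array[String]): Unit = {
    val b = BlockId(7, 0)
    blockLocations(b) = mutable.Set("exec-1", "failing-exec")
    removeReplica("failing-exec", b)
    // Master state no longer lists the decommissioned executor, even though
    // the executor-side delete failed.
    println(blockLocations(b))
  }
}
```

The key point is that the master's bookkeeping never depends on the executor's acknowledgement, which matches the rationale in the comment above.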
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]