Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19041#discussion_r155383283
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
@@ -240,6 +248,30 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def replicateLatestRDDBlock(
+      execId: String,
+      excludeExecutors: Seq[String],
+      context: RpcCallContext): Unit = {
+    logDebug(s"Replicating first cached block on $execId")
+    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+    val response: Option[Future[Option[RDDBlockId]]] = for {
+      blockManagerId <- blockManagerIdByExecutor.get(execId)
+      info <- blockManagerInfo.get(blockManagerId)
+      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+      // we assume blocks from the latest rdd are most relevant
+      firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
+      replicaSet <- blockLocations.asScala.get(firstBlock)
+      replicas = replicaSet.toSeq
+      // Add 2 below because you need the number of replicas, plus one for the original,
+      // plus one for the new replica.
+      maxReps = replicaSet.size + 2
+    } yield info.slaveEndpoint
+      .ask[Boolean](ReplicateBlock(firstBlock, replicas, excluded, maxReps))
+      .map( success => if (success) Some(firstBlock) else None)
--- End diff ---
`.map { success =>`
Instead of having the driver make an explicit call to
`removeBlockFromExecutor`, couldn't you chain that action here, and have a
single RPC from the driver do replication + drop the block?
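
A minimal sketch of what that chaining could look like, replacing the `yield` body above. This assumes the slave endpoint already handles a `RemoveBlock(blockId)` message and that an implicit `ExecutionContext` is in scope for the `Future` combinators; the names `replicated`/`removed` and the exact follow-up message are assumptions for illustration, not the PR's final API:

```scala
// Sketch: after a successful replication, ask the same slave endpoint to
// drop the block in the same Future chain, so the driver needs only one
// round trip into this code path instead of a separate
// removeBlockFromExecutor RPC.
info.slaveEndpoint
  .ask[Boolean](ReplicateBlock(firstBlock, replicas, excluded, maxReps))
  .flatMap { replicated =>
    if (replicated) {
      // Hypothetical follow-up; the real change may use a different
      // message to drop the now-replicated block from the executor.
      info.slaveEndpoint
        .ask[Boolean](RemoveBlock(firstBlock))
        .map(removed => if (removed) Some(firstBlock) else None)
    } else {
      Future.successful(None)
    }
  }
```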
---