Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r156458851 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -240,6 +248,30 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def replicateLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { + logDebug(s"Replicating first cached block on $execId") + val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) + val response: Option[Future[Option[RDDBlockId]]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant + firstBlock <- if (blocks.isEmpty) None else Some(blocks.max[RDDBlockId](Ordering.by(_.rddId))) + replicaSet <- blockLocations.asScala.get(firstBlock) + replicas = replicaSet.toSeq + // Add 2 below because you need the number of replicas, plus one for the original, plus one + // for the new replica. + maxReps = replicaSet.size + 2 + } yield info.slaveEndpoint + .ask[Boolean](ReplicateBlock(firstBlock, replicas, excluded, maxReps)) + .map( success => if (success) Some(firstBlock) else None) --- End diff -- good point, I've merged replicateLatestRDDBlock and removeBlockFromExecutor
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org