Github user brad-kaiser commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19041#discussion_r156458851
  
    --- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
    @@ -240,6 +248,30 @@ class BlockManagerMasterEndpoint(
         blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
       }
     
    +  private def replicateLatestRDDBlock(
    +      execId: String,
    +      excludeExecutors: Seq[String],
    +      context: RpcCallContext): Unit = {
    +    logDebug(s"Replicating first cached block on $execId")
    +    val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
    +    val response: Option[Future[Option[RDDBlockId]]] = for {
    +      blockManagerId <- blockManagerIdByExecutor.get(execId)
    +      info <- blockManagerInfo.get(blockManagerId)
    +      blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
    +      // we assume blocks from the latest rdd are most relevant
    +      firstBlock <- if (blocks.isEmpty) None else 
Some(blocks.max[RDDBlockId](Ordering.by(_.rddId)))
    +      replicaSet <- blockLocations.asScala.get(firstBlock)
    +      replicas = replicaSet.toSeq
    +      // Add 2 below because you need the number of replicas, plus one for 
the original, plus one
    +      // for the new replica.
    +      maxReps = replicaSet.size + 2
    +    } yield info.slaveEndpoint
    +      .ask[Boolean](ReplicateBlock(firstBlock, replicas, excluded, 
maxReps))
    +      .map( success => if (success) Some(firstBlock) else None)
    --- End diff --
    
    good point, I've merged replicateLatestRDDBlock and removeBlockFromExecutor


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to