Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13932#discussion_r107316905
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -88,26 +131,94 @@ class RandomBlockReplicationPolicy
         logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
         prioritizedPeers
       }
    +}
    +
    +@DeveloperApi
    +class BasicBlockReplicationPolicy
    +  extends BlockReplicationPolicy
    +    with Logging {
     
    -  // scalastyle:off line.size.limit
       /**
     -   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
     -   * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
     -   * here</a>.
     +   * Method to prioritize the candidate peers of a block manager. This implementation
     +   * mirrors the behavior of block replication in HDFS: one peer is chosen within the
     +   * rack, one outside, and that's it. This works best with a total replication factor of 3.
        *
    -   * @param n total number of indices
    -   * @param m number of samples needed
    -   * @param r random number generator
    -   * @return list of m random unique indices
     +   * @param blockManagerId    Id of the current BlockManager for self identification
     +   * @param peers             A list of peers of a BlockManager
     +   * @param peersReplicatedTo Set of peers already replicated to
     +   * @param blockId           BlockId of the block being replicated. This can be used as a
     +   *                          source of randomness if needed.
     +   * @param numReplicas       Number of peers we need to replicate to
     +   * @return A prioritized list of peers. The lower the index of a peer, the higher its priority.
        */
    -  // scalastyle:on line.size.limit
    -  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
     -    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) { case (set, i) =>
    -      val t = r.nextInt(i) + 1
    -      if (set.contains(t)) set + i else set + t
    +  override def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numReplicas: Int): List[BlockManagerId] = {
    +
    +    logDebug(s"Input peers : $peers")
    +    logDebug(s"BlockManagerId : $blockManagerId")
    +
    +    val random = new Random(blockId.hashCode)
    +
     +    // if the block doesn't have topology info, we can't do much, so we randomly shuffle.
     +    // if there is, we see what's needed from peersReplicatedTo and, based on numReplicas,
     +    // we choose what's needed
    +    if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
     +      // no topology info for the block. The best we can do is randomly choose peers
    +      BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
    +    } else {
     +      // we have topology information; see what is left to be done from peersReplicatedTo
     +      val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
    +      val doneOutsideRack = peersReplicatedTo.exists { p =>
     +        p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
    +      }
    +
    +      if (doneOutsideRack && doneWithinRack) {
    +        // we are done, we just return a random sample
    --- End diff --
    
    what? I think this branch is where we should do smart replication
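    For context, the `getSampleIds` helper this diff removes is Robert Floyd's sampling algorithm. A standalone sketch of the quoted code (a free-standing form for illustration, keeping the diff's 1-based indices) might look like:
    
    ```scala
    import scala.util.Random
    
    // Robert Floyd's sampling: draws m distinct indices from 1..n using only
    // m iterations and O(m) extra space, as in the helper removed by this diff.
    def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
      val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) { case (set, i) =>
        // pick t uniformly from 1..i; if t was already chosen, take i itself,
        // which keeps every size-m subset of 1..n equally likely
        val t = r.nextInt(i) + 1
        if (set.contains(t)) set + i else set + t
      }
      indices.toList
    }
    ```
    
    Each of the m iterations adds exactly one new element, so the result always has m distinct values in 1..n without ever shuffling the full range.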

