Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/13932#discussion_r107833257
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
@@ -88,26 +129,96 @@ class RandomBlockReplicationPolicy
logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
prioritizedPeers
}
+}
+
+@DeveloperApi
+class BasicBlockReplicationPolicy
+  extends BlockReplicationPolicy
+  with Logging {
-  // scalastyle:off line.size.limit
   /**
-   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
-   * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
-   * here</a>.
+   * Method to prioritize a bunch of candidate peers of a block manager. This implementation
+   * replicates the behavior of block replication in HDFS. For a given number of replicas
+   * needed, we choose a peer within the rack, one outside the rack, and the remaining block
+   * managers at random, in that order, until we meet the number of replicas needed.
+   * This works best with a total replication factor of 3, like HDFS.
    *
-   * @param n total number of indices
-   * @param m number of samples needed
-   * @param r random number generator
-   * @return list of m random unique indices
+   * @param blockManagerId Id of the current BlockManager for self identification
+   * @param peers A list of peers of a BlockManager
+   * @param peersReplicatedTo Set of peers already replicated to
+   * @param blockId BlockId of the block being replicated. This can be used as a source of
+   *                randomness if needed.
+   * @param numReplicas Number of peers we need to replicate to
+   * @return A prioritized list of peers. The lower the index of a peer, the higher its priority.
    */
-  // scalastyle:on line.size.limit
-  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
-    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
-      val t = r.nextInt(i) + 1
-      if (set.contains(t)) set + i else set + t
+  override def prioritize(
+      blockManagerId: BlockManagerId,
+      peers: Seq[BlockManagerId],
+      peersReplicatedTo: mutable.HashSet[BlockManagerId],
+      blockId: BlockId,
+      numReplicas: Int): List[BlockManagerId] = {
+
+    logDebug(s"Input peers : $peers")
+    logDebug(s"BlockManagerId : $blockManagerId")
+
+    val random = new Random(blockId.hashCode)
+
+    // If the block manager has no topology info, we can't do much, so we fall back to a
+    // random sample. If it does, we see what's needed from peersReplicatedTo and, based
+    // on numReplicas, choose what's needed.
+    if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
+      // No topology info for this block manager; the best we can do is choose peers at random.
+      BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
+    } else {
+      // We have topology information; see what is left to be done from peersReplicatedTo.
+      val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
+      val doneOutsideRack = peersReplicatedTo.exists { p =>
--- End diff ---
Actually, we can also use `peersReplicatedTo.partition(_.topologyInfo == blockManagerId.topologyInfo)` to get `doneWithinRack` and `doneOutsideRack` in a single pass.
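A minimal sketch of what that could look like, keeping the names from the diff above; `inRack` and `outsideRack` are names invented here, and whether a peer with no topology info should count as "outside the rack" is an assumption this snippet makes rather than something the diff settles:

```scala
// Hypothetical replacement for the two `exists` checks, using one partition.
val (inRack, outsideRack) =
  peersReplicatedTo.partition(_.topologyInfo == blockManagerId.topologyInfo)
val doneWithinRack = inRack.nonEmpty
// Assumption: only peers that actually carry topology info should count as
// replicas outside the rack, hence the extra isDefined check.
val doneOutsideRack = outsideRack.exists(_.topologyInfo.isDefined)
```

This trades the short-circuiting of `exists` for a single traversal of `peersReplicatedTo`, which is typically a small set, so the cost difference should be negligible either way.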