Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6607#discussion_r31785926
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -308,12 +305,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
             supervisor.start()
             supervisor.awaitTermination()
           }
    +
           // Run the dummy Spark job to ensure that all slaves have registered.
           // This avoids all the receivers to be scheduled on the same node.
           if (!ssc.sparkContext.isLocal) {
              ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
           }
     
    +      // Right now, we only honor preferences if all receivers have them
    +      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
    +
    +      // If no location preferences are specified, set host location for each receiver
    +      // so as to distribute them evenly over executors in a round-robin fashion
    +      var roundRobin = false;
    +      if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
    +        val executors = getExecutors(ssc)
    +        if (!executors.isEmpty) {
    +          var i = 0;
    +          for (i <- 0 to (receivers.length - 1)) {
    +            receivers(i).host = Some(executors(i % executors.length))
    +          }
    +          roundRobin = true
    +        }
    +      }
    +
    +      val tempRDD =
    +        if (hasLocationPreferences) {
    --- End diff --
    
    The right way to allocate hosts to receivers, taking into account the preferred locations that some of them have, is the following.
    1. Let's say there are 10 receivers, of which 2 (#5 and #9) have a preferred location set.
    2. Let's say there are 25 executors.
    3. Maintain an array of type `Array[ArrayBuffer[String]]` of size 10; let's call it `locations`. Each `ArrayBuffer` is empty at first.
    4. First, fill in the locations of the receivers that already have them, i.e. `locations(5) ++= receivers(5).preferredLocation`, and the same for receiver 9.
    5. Then distribute all the executors among all the other receivers. Yes, there can be multiple hosts per receiver. This helps to ensure that if one of the executors fails, the task will use one of the other allocated hosts and won't end up on an executor that is already running a receiver.
    
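    Now you can pass `receivers.zip(locations)` to `makeRDD`, with no further conditions (`SparkContext.makeRDD` has an overload that takes a `Seq[(T, Seq[String])]`, i.e. each item paired with its preferred hosts).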