Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6607#discussion_r31837712
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -271,27 +272,64 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
     
         /**
    -     * Get the receivers from the ReceiverInputDStreams, distributes them to the
    -     * worker nodes as a parallel collection, and runs them.
    +     * Get the list of executors excluding driver
          */
    -    private def startReceivers() {
    -      val receivers = receiverInputStreams.map(nis => {
    -        val rcvr = nis.getReceiver()
    -        rcvr.setReceiverId(nis.id)
    -        rcvr
    -      })
    +    private def getExecutors(ssc: StreamingContext): List[String] = {
    +      val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
    +      val driver = ssc.sparkContext.getConf.get("spark.driver.host")
    +      executors.diff(List(driver))
    +    }
     
    -      // Right now, we only honor preferences if all receivers have them
    +    /* Schedule receivers using preferredLocation if specified
    +     * and round-robin otherwise
    +     */
    +    private def scheduleReceivers(receivers: Seq[Receiver[_]]): RDD[Receiver[_]] = {
    +      // Location preferences are honored if all receivers have them
           val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
     
    -      // Create the parallel collection of receivers to distributed them on the worker nodes
    +      // If no location preferences are specified, set host location for each receiver
    +      // so as to distribute them evenly over executors in a round-robin fashion
    +      // If num_executors > num_receivers, distribute executors among receivers
    +      val locations = new Array[ArrayBuffer[String]](receivers.length)
    --- End diff --
    
    Do you really need this to be an `Array[ArrayBuffer]`? Couldn't this just be `Array[Seq]`? You could just wrap the executor in a `Seq` when adding it to the array, i.e. `locations(i) = Seq(executor(index))`, instead of allocating an `ArrayBuffer` first. Is there a case where a receiver could have more than 1 location specified? I don't really see one.


