I also have this problem. The total time for launching receivers seems related to the total number of executors. In my case, when I run 400 executors with 200 receivers, it takes about a minute for all receivers to become active, but with 800 executors, it takes 3 minutes to activate all receivers.
I am running on YARN on EMR 4.7.2, with Spark 1.6.2.

2015-10-21 12:15 GMT-07:00 Budde, Adam <bu...@amazon.com>:

> Hi all,
>
> My team uses Spark Streaming to implement the batch processing component
> of a lambda architecture with 5 min intervals. We process roughly 15 TB/day
> using three discrete Spark clusters and about 250 receivers per cluster.
> We've been having some issues migrating our platform from Spark 1.4.x to
> Spark 1.5.x.
>
> The first issue we've been having relates to receiver scheduling. Under
> Spark 1.4.x, each receiver becomes active almost immediately and the
> application quickly reaches its peak input throughput. Under the new
> receiver scheduling mechanism introduced in Spark 1.5.x (SPARK-8882
> <https://issues.apache.org/jira/browse/SPARK-8882>) we see that it takes
> quite a while for our receivers to become active. I haven't spent too much
> time gathering hard numbers on this, but my estimate would be that it takes
> over half an hour for half the receivers to become active and well over an
> hour for all of them to become active.
>
> I spent some time digging into the code for the *ReceiverTracker*,
> *ReceiverSchedulingPolicy*, and *ReceiverSupervisor* classes and
> recompiling Spark with some added debug logging. As far as I can tell, this
> is what is happening:
>
> - On program start, the ReceiverTracker RPC endpoint receives a
>   *StartAllReceivers* message via its own *launchReceivers()* method
>   (itself invoked by *start()*).
> - The handler for StartAllReceivers invokes
>   *ReceiverSchedulingPolicy.scheduleReceivers()* to generate a desired
>   receiver-to-executor mapping and calls *ReceiverTracker.startReceiver()*
>   for each receiver.
> - *startReceiver()* uses the SparkContext to submit a job that creates
>   an instance of ReceiverSupervisorImpl to run the receiver on a random
>   executor.
> - While bootstrapping the receiver,
>   *ReceiverSupervisorImpl.onReceiverStart()* sends a *RegisterReceiver*
>   message to the ReceiverTracker RPC endpoint.
> - The handler for RegisterReceiver checks whether the randomly selected
>   executor was the one the receiver was assigned to by
>   *ReceiverSchedulingPolicy.scheduleReceivers()* and fails the job if it
>   isn't.
> - ReceiverTracker restarts the failed receiver job, and this process
>   continues until all receivers are assigned to their proper executors.
>
> Assuming this order of operations is correct, I have the following
> questions:
>
> 1. Is there any way to coerce SparkContext.submitJob() into scheduling
>    a job on a specific executor? Put another way, is there a mechanism we
>    can use to ensure that each receiver job is run on the executor it was
>    assigned to on the first call to
>    *ReceiverSchedulingPolicy.scheduleReceivers()*?
> 2. If (1) is not possible, is there anything we can do to speed up the
>    StartReceiver -> RegisterReceiver -> RestartReceiver loop? Right now, it
>    seems to take about 30-40 sec between attempts to invoke RegisterReceiver
>    on a given receiver.
>
> Thanks for the help!
>
> Adam
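For anyone following along, the assignment the quoted walkthrough refers to is essentially a balanced spread of receivers over executors. Here is a minimal plain-Scala sketch of that idea (no Spark dependency; `ReceiverSchedulingSketch` and its round-robin rule are illustrative only, not Spark's actual internals -- the real *ReceiverSchedulingPolicy* also balances per host and honors each receiver's preferredLocation):

```scala
// Illustrative sketch: a simplified round-robin receiver-to-executor
// assignment, in the spirit of ReceiverSchedulingPolicy.scheduleReceivers().
object ReceiverSchedulingSketch {
  def scheduleReceivers(receiverIds: Seq[Int],
                        executors: Seq[String]): Map[Int, String] = {
    require(executors.nonEmpty, "no executors registered yet")
    receiverIds.zipWithIndex.map { case (id, i) =>
      // Cycle through the executor list so receivers spread evenly.
      id -> executors(i % executors.size)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 4 receivers over 2 executors: each executor gets 2 receivers.
    println(scheduleReceivers(0 until 4, Seq("exec-1", "exec-2")))
  }
}
```

On question (1): Spark's scheduler only treats locations as locality *hints*, never hard assignments, which is why the RegisterReceiver handler has to re-check the executor after the fact. `SparkContext.makeRDD(Seq((value, Seq(host))))` is the public way to attach such preferred-location hints to a partition, but a task can still land elsewhere once the locality wait expires.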