pgandhi999 opened a new pull request #23677: [SPARK-26755]: Optimize Spark Scheduler to dequeue speculative tasks more efficiently
URL: https://github.com/apache/spark/pull/23677
   
   Currently, the Spark scheduler takes a long time to dequeue speculative tasks for large task sets within a stage (100,000 tasks or more) when speculation is turned on. On further analysis, we found that the "task-result-getter" threads remain blocked on one of the dispatcher-event-loop threads holding the lock on the TaskSchedulerImpl object:
   `def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {`
   While holding this lock, the thread spends a long time executing the method "dequeueSpeculativeTask" in TaskSetManager.scala, slowing down the overall running time of the Spark job. We monitored the utilization of this lock for the whole duration of the job and it was close to 50%, i.e. the code within the synchronized block ran for almost half the duration of the entire Spark job. Screenshots of the thread dumps are attached below for reference.
   
   <img width="1419" alt="screen shot 2019-01-28 at 11 21 05 am" src="https://user-images.githubusercontent.com/22228190/51872726-17fea200-2320-11e9-99d6-de0db664a463.png">
   
   <img width="1426" alt="screen shot 2019-01-28 at 11 21 25 am" src="https://user-images.githubusercontent.com/22228190/51872731-1d5bec80-2320-11e9-8b09-240be2ad09cd.png">
   
   <img width="1429" alt="screen shot 2019-01-28 at 11 22 42 am" src="https://user-images.githubusercontent.com/22228190/51872736-2220a080-2320-11e9-9b5d-6b4131894bdd.png">
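   The contention pattern in the thread dumps can be sketched as follows. This is illustrative only, not Spark code: both the offer path and the result-handling path synchronize on the same scheduler object, so a slow `resourceOffers` stalls every "task-result-getter" thread.

   ```scala
   // Minimal sketch of the contention described above (hypothetical class,
   // standing in for TaskSchedulerImpl's coarse-grained lock).
   class SchedulerLike {
     def resourceOffers(): Unit = synchronized {
       // Stands in for scanning every speculatable task on each offer.
       Thread.sleep(100)
     }
     def statusUpdate(): Unit = synchronized {
       // Blocked until resourceOffers releases the intrinsic lock.
     }
   }

   object ContentionDemo {
     // Returns how long statusUpdate had to wait, in milliseconds.
     def run(): Long = {
       val s = new SchedulerLike
       val offerThread = new Thread(new Runnable {
         def run(): Unit = s.resourceOffers()
       })
       offerThread.start()
       Thread.sleep(20) // let the offer thread grab the lock first
       val start = System.nanoTime()
       s.statusUpdate()
       val waitedMs = (System.nanoTime() - start) / 1000000
       offerThread.join()
       waitedMs
     }
   }
   ```

   With a 100,000-task stage, the time spent inside the synchronized block grows with the number of speculatable tasks scanned per offer, which is what the ~50% lock utilization reflects.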
   
   
   ## What changes were proposed in this pull request?
   
   Split the main queue "speculatableTasks" into five separate queues based on locality preference, similar to how normal pending tasks are enqueued. The "dequeueSpeculativeTask" method can then avoid performing locality checks for each task at runtime and simply return the preferred task to execute.
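   The idea can be sketched roughly as below. All names here are illustrative (the actual patch lives in TaskSetManager.scala); the point is that tasks are bucketed by locality level at enqueue time, so dequeue probes buckets in locality order instead of re-checking every speculatable task per offer.

   ```scala
   import scala.collection.mutable

   // Hypothetical sketch of locality-bucketed speculative task queues,
   // mirroring how TaskSetManager buckets normal pending tasks
   // (per-executor, per-host, no-preference, per-rack, all).
   object SpeculativeQueues {
     val forExecutor = mutable.HashMap[String, mutable.ArrayBuffer[Int]]()
     val forHost     = mutable.HashMap[String, mutable.ArrayBuffer[Int]]()
     val noPrefs     = mutable.ArrayBuffer[Int]()
     val forRack     = mutable.HashMap[String, mutable.ArrayBuffer[Int]]()
     val all         = mutable.ArrayBuffer[Int]()

     // Locality checks happen once here, at enqueue time.
     def enqueue(index: Int, execs: Seq[String], hosts: Seq[String],
                 racks: Seq[String]): Unit = {
       execs.foreach(e => forExecutor.getOrElseUpdate(e, mutable.ArrayBuffer()) += index)
       hosts.foreach(h => forHost.getOrElseUpdate(h, mutable.ArrayBuffer()) += index)
       racks.foreach(r => forRack.getOrElseUpdate(r, mutable.ArrayBuffer()) += index)
       if (execs.isEmpty && hosts.isEmpty && racks.isEmpty) noPrefs += index
       all += index
     }

     // Dequeue probes buckets in locality order; no per-task scan.
     def dequeue(exec: String, host: String, rack: Option[String]): Option[Int] = {
       def pop(buf: mutable.ArrayBuffer[Int]): Option[Int] =
         buf.headOption.map { i => removeEverywhere(i); i }
       pop(forExecutor.getOrElse(exec, mutable.ArrayBuffer()))
         .orElse(pop(forHost.getOrElse(host, mutable.ArrayBuffer())))
         .orElse(pop(noPrefs))
         .orElse(rack.flatMap(r => pop(forRack.getOrElse(r, mutable.ArrayBuffer()))))
         .orElse(pop(all))
     }

     // A dequeued task must leave every bucket it was enqueued into.
     private def removeEverywhere(i: Int): Unit = {
       forExecutor.values.foreach(_ -= i)
       forHost.values.foreach(_ -= i)
       forRack.values.foreach(_ -= i)
       noPrefs -= i
       all -= i
     }
   }
   ```

   The trade-off is a little extra bookkeeping at enqueue time in exchange for a much cheaper dequeue, which matters because dequeue runs under the TaskSchedulerImpl lock on every resource offer.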
   
   ## How was this patch tested?
   We ran a Spark job performing a join on a 10 TB dataset to test the code change.
   Original Code:
   <img width="1433" alt="screen shot 2019-01-28 at 5 07 22 pm" src="https://user-images.githubusercontent.com/22228190/51872483-3b751d00-231f-11e9-8430-050f8970f2ab.png">
   
   Optimized Code:
   <img width="1435" alt="screen shot 2019-01-28 at 5 08 19 pm" src="https://user-images.githubusercontent.com/22228190/51872510-55aefb00-231f-11e9-99b8-f3e70330d8fe.png">
   
   As shown above, the run time of the ShuffleMapStage came down from approximately 40 minutes to 6 minutes, significantly reducing the overall running time of the Spark job.
   
   Another example for the same job:
   
   Original Code:
   <img width="1440" alt="screen shot 2019-01-28 at 5 11 30 pm" src="https://user-images.githubusercontent.com/22228190/51872620-cbb36200-231f-11e9-9652-0c4a3e36daeb.png">
   
   Optimized Code:
   <img width="1440" alt="screen shot 2019-01-28 at 5 12 16 pm" src="https://user-images.githubusercontent.com/22228190/51872664-e4237c80-231f-11e9-87b8-a84b62f36bcc.png">
   
