Thank you all for the responses and feedback. I just checked the code, and as Reynold already mentioned, it looks like changing the data structures below to avoid concurrency issues should be good enough:

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
val jobIds = new HashSet[Int]
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

I would like to work on this, so I will open a JIRA and follow up with a PR very soon. We can continue the discussion on the JIRA.

Regards
Ajith

From: Reynold Xin [r...@databricks.com]
Sent: Wednesday, March 07, 2018 2:47 AM
To: Shivaram Venkataraman
Cc: Ryan Blue; Ajith shetty; dev@spark.apache.org
Subject: Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

It's mostly just hash maps from some ids to some state, and those can be replaced just with concurrent hash maps?

(I haven't actually looked at code and am just guessing based on recollection.)

On Tue, Mar 6, 2018 at 10:42 AM, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

The problem with doing work in the callsite thread is that there are a number of data structures that are updated during job submission, and these data structures are guarded by the event loop, ensuring only one thread accesses them. I don't think there is a very easy fix for this given the structure of the DAGScheduler.

Thanks
Shivaram

On Tue, Mar 6, 2018 at 8:53 AM, Ryan Blue <rb...@netflix.com.invalid> wrote:
> I agree with Reynold. We don't need to use a separate pool, which would have
> the problem you raised about FIFO. We just need to do the planning outside
> of the scheduler loop. The call site thread sounds like a reasonable place
> to me.
>
> On Mon, Mar 5, 2018 at 12:56 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>> Rather than using a separate thread pool, perhaps we can just move the
>> prep code to the call site thread?
>>
>> On Sun, Mar 4, 2018 at 11:15 PM, Ajith shetty <ajith.she...@huawei.com> wrote:
>>>
>>> DAGScheduler becomes a bottleneck in a cluster when multiple JobSubmitted
>>> events have to be processed, because DAGSchedulerEventProcessLoop is single
>>> threaded and it will block other events in the queue, like TaskCompletion.
>>>
>>> The JobSubmitted event can be time consuming depending on the nature of the
>>> job (for example: calculating parent stage dependencies, shuffle
>>> dependencies, partitions), and thus it blocks all other events from being
>>> processed.
>>>
>>> I see multiple JIRAs referring to this behavior:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-2647
>>>
>>> https://issues.apache.org/jira/browse/SPARK-4961
>>>
>>> Similarly, in my cluster the partition calculation for some jobs is time
>>> consuming (similar to the stack at SPARK-2647), so it slows down the Spark
>>> DAGSchedulerEventProcessLoop. This causes user jobs to slow down even if
>>> their tasks finish within seconds, because TaskCompletion events are
>>> processed at a slower rate due to the blockage.
>>>
>>> I think we can split a JobSubmitted event into 2 events:
>>>
>>> Step 1. JobSubmittedPreperation - Runs in a separate thread on
>>> job submission; this will involve
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>> Step 2. JobSubmittedExecution - If Step 1 succeeds, fire an event to
>>> DAGSchedulerEventProcessLoop and let it process the output of
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>> One effect of doing this may be that job submissions are no longer FIFO,
>>> depending on how much time Step 1 above takes.
>>>
>>> Does the above solution suffice for the problem described? And is there
>>> any other side effect of this solution?
>>>
>>> Regards
>>>
>>> Ajith
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
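
For illustration, the change suggested at the top of this thread (swapping the plain hash maps and sets for concurrent ones) might look roughly like the sketch below. This is not the actual DAGScheduler code: `Stage` here is a hypothetical stand-in with only an id, and `SchedulerState` and `registerStage` are made-up names used only to show the idea. It assumes Scala 2.12 or later.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for org.apache.spark.scheduler.Stage;
// the real class carries much more state.
final class Stage(val id: Int) {
  // Concurrent replacement for `val jobIds = new HashSet[Int]`.
  val jobIds: java.util.Set[Int] = ConcurrentHashMap.newKeySet[Int]()
}

// Hypothetical holder for the id-to-state maps named in the thread.
object SchedulerState {
  // Concurrent replacement for `new HashMap[Int, Stage]`.
  val stageIdToStage = TrieMap.empty[Int, Stage]

  // Concurrent replacement for `new HashMap[Int, HashSet[Int]]`.
  val jobIdToStageIds = TrieMap.empty[Int, java.util.Set[Int]]

  // Illustrative helper (an assumption, not a Spark method): records that
  // `jobId` uses `stage`, and may be called from several threads at once.
  def registerStage(jobId: Int, stage: Stage): Unit = {
    // Only the first Stage registered for a given id is kept.
    stageIdToStage.putIfAbsent(stage.id, stage)
    val registered = stageIdToStage(stage.id)
    registered.jobIds.add(jobId)
    // The default set may be built more than once under contention,
    // but only one instance is ever stored in the map.
    jobIdToStageIds
      .getOrElseUpdate(jobId, ConcurrentHashMap.newKeySet[Int]())
      .add(stage.id)
  }
}
```

Note that concurrent collections only make each individual operation thread safe. An update that has to touch several maps together (as `registerStage` does) is still not atomic as a whole, so any invariant that spans more than one map would need extra coordination.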