Thank you all for the responses and feedback. I just checked the code, and it
looks like, as Reynold already mentioned, if we change the data structures below

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
val jobIds = new HashSet[Int]
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

to avoid concurrency issues, it should be good enough.
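
A minimal sketch of what such a change might look like, under stated
assumptions: Stage here is a hypothetical stand-in for the real scheduler
class, and SchedulerState, registerStage and Demo are illustrative names that
do not exist in Spark. It only shows the three structures swapped for
concurrent collections, not the actual DAGScheduler change.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for org.apache.spark.scheduler.Stage.
final case class Stage(id: Int)

// Illustrative holder for the three structures (not Spark code).
object SchedulerState {
  // Concurrent replacement for HashMap[Int, Stage].
  val stageIdToStage = TrieMap.empty[Int, Stage]

  // Concurrent replacement for HashSet[Int], backed by ConcurrentHashMap.
  val jobIds: java.util.Set[Int] = ConcurrentHashMap.newKeySet[Int]()

  // The inner sets must be thread-safe as well, not only the outer map.
  val jobIdToStageIds = TrieMap.empty[Int, java.util.Set[Int]]

  // Hypothetical helper: records a stage for a job from any thread.
  def registerStage(jobId: Int, stage: Stage): Unit = {
    stageIdToStage.putIfAbsent(stage.id, stage)
    jobIds.add(jobId)
    jobIdToStageIds
      .getOrElseUpdate(jobId, ConcurrentHashMap.newKeySet[Int]())
      .add(stage.id)
  }
}

object Demo {
  // Eight threads each register 100 distinct stages under their own job id.
  def run(): (Int, Int) = {
    val threads = (0 until 8).map { t =>
      new Thread(() =>
        (0 until 100).foreach { i =>
          SchedulerState.registerStage(t, Stage(t * 100 + i))
        })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (SchedulerState.stageIdToStage.size, SchedulerState.jobIds.size)
  }
}
```

Note that concurrent collections only make each individual operation safe. An
update that spans all three structures, as registerStage does, is still not
atomic as a whole, so whether this is sufficient depends on which invariants
the scheduler relies on between them.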

I would like to work on this, so I will open a JIRA and follow up with a PR very
soon. We can continue the discussion on the JIRA.

Regards
Ajith

From: Reynold Xin [r...@databricks.com]
Sent: Wednesday, March 07, 2018 2:47 AM
To: Shivaram Venkataraman
Cc: Ryan Blue; Ajith shetty; dev@spark.apache.org
Subject: Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance 
hindered on JobSubmitted Event

It's mostly just hash maps from some ids to some state, and those can be 
replaced just with concurrent hash maps?

(I haven't actually looked at code and am just guessing based on recollection.)

On Tue, Mar 6, 2018 at 10:42 AM, Shivaram Venkataraman 
<shiva...@eecs.berkeley.edu> wrote:
The problem with doing work in the callsite thread is that there are a
number of data structures that are updated during job submission and
these data structures are guarded by the event loop ensuring only one
thread accesses them.  I don't think there is a very easy fix for this
given the structure of the DAGScheduler.

Thanks
Shivaram

On Tue, Mar 6, 2018 at 8:53 AM, Ryan Blue <rb...@netflix.com.invalid> wrote:
> I agree with Reynold. We don't need to use a separate pool, which would have
> the problem you raised about FIFO. We just need to do the planning outside
> of the scheduler loop. The call site thread sounds like a reasonable place
> to me.
>
> On Mon, Mar 5, 2018 at 12:56 PM, Reynold Xin 
> <r...@databricks.com> wrote:
>>
>> Rather than using a separate thread pool, perhaps we can just move the
>> prep code to the call site thread?
>>
>>
>> On Sun, Mar 4, 2018 at 11:15 PM, Ajith shetty 
>> <ajith.she...@huawei.com>
>> wrote:
>>>
>>> DAGScheduler becomes a bottleneck in the cluster when multiple JobSubmitted
>>> events have to be processed, as DAGSchedulerEventProcessLoop is single
>>> threaded and it will block other events in the queue, like TaskCompletion.
>>>
>>> Processing a JobSubmitted event can be time consuming depending on the nature
>>> of the job (for example: calculating parent stage dependencies, shuffle
>>> dependencies, partitions), and thus it blocks all other events from being
>>> processed.
>>>
>>>
>>>
>>> I see multiple JIRAs referring to this behavior:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-2647
>>>
>>> https://issues.apache.org/jira/browse/SPARK-4961
>>>
>>>
>>>
>>> Similarly, in my cluster some jobs' partition calculation is time consuming
>>> (similar to the stack at SPARK-2647), hence it slows down the Spark
>>> DAGSchedulerEventProcessLoop, which causes user jobs to slow down even if
>>> their tasks finish within seconds, as TaskCompletion events are
>>> processed at a slower rate due to the blockage.
>>>
>>>
>>>
>>> I think we can split a JobSubmitted event into 2 events:
>>>
>>> Step 1. JobSubmittedPreparation - Runs in a separate thread on
>>> job submission; this will involve the steps in
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>> Step 2. JobSubmittedExecution - If Step 1 succeeds, fire an event to
>>> DAGSchedulerEventProcessLoop and let it process the output of
>>> org.apache.spark.scheduler.DAGScheduler#createResultStage
>>>
>>>
>>>
>>> One effect I can see is that job submissions may no longer be FIFO,
>>> depending on how much time Step 1 above consumes.
>>>
>>>
>>>
>>> Does the above solution suffice for the problem described? And are there any
>>> other side effects of this solution?
>>>
>>>
>>>
>>> Regards
>>>
>>> Ajith
>>
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
