[ 
https://issues.apache.org/jira/browse/YARN-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497085#comment-14497085
 ] 

zhihai xu commented on YARN-3491:
---------------------------------

Hi [~jlowe], thanks for the comment. Queueing is faster, but adding an 
FSDownload to a worker thread takes longer.
If all threads in the thread pool are busy, adding an entry to the queue with 
LinkedBlockingQueue#offer is very fast.
In the following code from ThreadPoolExecutor#execute, corePoolSize is the 
thread pool size, which is 4 in this case.
workQueue.offer(command) is fast but addWorker is slow. The task is only 
queued when all threads in the thread pool are running.
{code}
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
{code}
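
The two paths through execute can be observed with a small standalone program. This is only a sketch, not NodeManager code: the class name ExecutePathDemo and its helper methods are made up for illustration, and it assumes a fixed-size pool (corePoolSize == maximumPoolSize) backed by an unbounded LinkedBlockingQueue. Every task blocks on a latch, so the first corePoolSize submissions each go through addWorker and the remaining ones go through workQueue.offer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Hadoop.
class ExecutePathDemo {

    /** Blocks until the latch is released, restoring the interrupt flag if interrupted. */
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits `tasks` blocking tasks to a fixed-size pool and returns
     * {number of worker threads, number of queued tasks} as seen right
     * after the last submission.
     */
    static int[] poolAndQueueSizes(int corePoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // While workerCount < corePoolSize this takes the addWorker path;
                // afterwards it takes the workQueue.offer path.
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();     // queued tasks still run before the pool exits
        }
    }

    public static void main(String[] args) {
        int[] r = poolAndQueueSizes(4, 6);
        System.out.println("workers=" + r[0] + " queued=" + r[1]);
    }
}
```

With corePoolSize 4 and 6 submissions, four tasks each get a new worker thread and two are queued.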

The issue is:
If the time to run one FSDownload (resource localization) is close to the time 
to run the submit (adding the FSDownload to a worker thread), oscillation 
will happen and only one worker thread will be running at a time. 
The Dispatcher thread will then be blocked for a longer time.
The above logs support this. LocalizerRunner#addResource, used by the 
private localizer, takes less than one millisecond to process one 
REQUEST_RESOURCE_LOCALIZATION event, but PublicLocalizer#addResource, used by 
the public localizer, takes 124 milliseconds to process one 
REQUEST_RESOURCE_LOCALIZATION event.


> Improve the public resource localization to do both FSDownload submission to 
> the thread pool and completed localization handling in one thread 
> (PublicLocalizer).
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-3491
>                 URL: https://issues.apache.org/jira/browse/YARN-3491
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: nodemanager
>    Affects Versions: 2.7.0
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>            Priority: Critical
>
> Improve the public resource localization to do both FSDownload submission to 
> the thread pool and completed localization handling in one thread 
> (PublicLocalizer).
> Currently, FSDownload submission to the thread pool is done in 
> PublicLocalizer#addResource, which runs in the Dispatcher thread, and 
> completed localization handling is done in PublicLocalizer#run, which 
> runs in the PublicLocalizer thread.
> Because FSDownload submission to the thread pool in the following code is 
> time consuming, the thread pool can't be fully utilized. Instead of doing 
> public resource localization in parallel (multithreading), public resource 
> localization is serialized most of the time.
> {code}
>             synchronized (pending) {
>               pending.put(queue.submit(new FSDownload(lfs, null, conf,
>                   publicDirDestPath, resource,
>                   request.getContext().getStatCache())),
>                   request);
>             }
> {code}
> This change also has two more benefits:
> 1. The Dispatcher thread won't be blocked by the above FSDownload submission. 
> The Dispatcher thread handles most of the time-critical events in the 
> NodeManager.
> 2. No synchronization is needed on the HashMap (pending), 
> because pending will only be accessed in the PublicLocalizer thread.
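
The hand-off described in the issue can be sketched roughly as follows. This is not the actual patch. The class SingleThreadSubmitSketch, the incoming queue and the String request/result types are assumptions made for illustration; only the names pending and queue follow the snippet quoted above. The dispatcher side does nothing but enqueue, while a single thread both submits to the pool and handles completions, so pending can be a plain HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch class, not part of Hadoop.
class SingleThreadSubmitSketch {
    // Hypothetical hand-off queue: the dispatcher thread only enqueues here.
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    /** Called on the dispatcher thread: a cheap enqueue, no thread pool interaction. */
    void addResource(String request) {
        incoming.add(request);
    }

    /**
     * Runs on one thread (standing in for the PublicLocalizer thread): submits
     * every waiting request to the pool, then handles each completion.
     */
    List<String> drainAndLocalize(int poolSize) {
        ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
        CompletionService<String> queue = new ExecutorCompletionService<>(threadPool);
        // Plain HashMap with no synchronized block: only this thread touches it.
        Map<Future<String>, String> pending = new HashMap<>();
        List<String> completed = new ArrayList<>();
        try {
            String request;
            while ((request = incoming.poll()) != null) {
                final String r = request;
                // The lambda is a stand-in for the real download task.
                pending.put(queue.submit(() -> "localized:" + r), r);
            }
            while (!pending.isEmpty()) {
                Future<String> done = queue.take(); // blocks until one task finishes
                String original = pending.remove(done);
                completed.add(original + " -> " + done.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            threadPool.shutdown();
        }
        return completed;
    }
}
```

A real localizer thread would loop indefinitely and interleave submission with completion handling; the sketch drains once to stay short. Completion order depends on scheduling, so callers should not rely on the order of the returned list.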



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
