[ https://issues.apache.org/jira/browse/YARN-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497411#comment-14497411 ]

zhihai xu commented on YARN-3491:
---------------------------------

Hi [~sjlee0], that is a good point. I had assumed that queue.submit was the
bottleneck, but queue.submit is only part of the code in
PublicLocalizer#addResource. The bottleneck may instead come from
publicRsrc.getPathForLocalization, since we have added a lot of logic to
LocalResourcesTrackerImpl#getPathForLocalization, such as
{{stateStore.startResourceLocalization(user, appId, ((LocalResourcePBImpl) lr).getProto(), localPath);}}

I should have described it more clearly. Based on the log, the issue is that
PublicLocalizer#addResource is very slow, which blocks the Dispatcher thread.
Looking at the code of PublicLocalizer#addResource below, I thought
queue.submit might take most of the CPU cycles, but based on [~jlowe]'s comment
and yours, the slowness may come from other code, such as
publicRsrc.getPathForLocalization or dirsHandler.getLocalPathForWrite. Either
way, I think moving all of this code in PublicLocalizer#addResource from the
Dispatcher thread to the PublicLocalizer thread would be a good optimization.
We can use a synchronizedList of LocalizerResourceRequestEvent to store all
of the events for public resource localization, similar to what
LocalizerRunner does for private resource localization.
I will do some more profiling to find out what the bottleneck in
PublicLocalizer#addResource is:
{code}
    public void addResource(LocalizerResourceRequestEvent request) {
      // TODO handle failures, cancellation, requests by other containers
      LocalizedResource rsrc = request.getResource();
      LocalResourceRequest key = rsrc.getRequest();
      LOG.info("Downloading public rsrc:" + key);
      /*
       * Here multiple containers may request the same resource. So we need
       * to start downloading only when
       * 1) ResourceState == DOWNLOADING
       * 2) We are able to acquire non blocking semaphore lock.
       * If not we will skip this resource as either it is getting downloaded
       * or it FAILED / LOCALIZED.
       */

      if (rsrc.tryAcquire()) {
        if (rsrc.getState() == ResourceState.DOWNLOADING) {
          LocalResource resource = request.getResource().getRequest();
          try {
            Path publicRootPath =
                dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                    + ContainerLocalizer.FILECACHE,
                  ContainerLocalizer.getEstimatedSize(resource), true);
            Path publicDirDestPath =
                publicRsrc.getPathForLocalization(key, publicRootPath);
            if (!publicDirDestPath.getParent().equals(publicRootPath)) {
              DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
            }

            // In case this is not a newly initialized nm state, ensure
            // initialized local/log dirs similar to LocalizerRunner
            getInitializedLocalDirs();
            getInitializedLogDirs();

            // explicitly synchronize pending here to avoid future task
            // completing and being dequeued before pending updated
            synchronized (pending) {
              pending.put(queue.submit(new FSDownload(lfs, null, conf,
                  publicDirDestPath, resource,
                  request.getContext().getStatCache())),
                  request);
            }
          } catch (IOException e) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), e.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " May be disks failed.", e);
          } catch (IllegalArgumentException ie) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
                .getResource().getRequest(), ie.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " Incorrect path. " + request.getResource().getRequest()
                .getPath(), ie);
          } catch (RejectedExecutionException re) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), re.getMessage()));
            LOG.error("Failed to submit rsrc " + rsrc + " for download."
                + " Either queue is full or threadpool is shutdown.", re);
          }
        } else {
          rsrc.unlock();
        }
      }
    }
{code}
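A minimal sketch of the proposed handoff, assuming a hypothetical `Request` class in place of LocalizerResourceRequestEvent and illustrative names (`PublicRequestHandoff`, `drainRequests`) that are not the NodeManager's actual API. The Dispatcher-side addResource does only a list append, and the PublicLocalizer thread takes the whole batch under the list's own lock before doing the slow path selection, state-store and submission work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: names are illustrative, not YARN's actual classes.
public class PublicRequestHandoff {

  // Hypothetical stand-in for LocalizerResourceRequestEvent.
  static final class Request {
    final String resource;

    Request(String resource) {
      this.resource = resource;
    }
  }

  // Shared between the Dispatcher thread (producer) and the
  // PublicLocalizer thread (consumer).
  private final List<Request> pendingRequests =
      Collections.synchronizedList(new ArrayList<>());

  // Runs on the Dispatcher thread: only an append, with no disk checks,
  // state-store writes or executor submission, so it returns quickly.
  public void addResource(Request request) {
    pendingRequests.add(request);
  }

  // Runs on the PublicLocalizer thread: copy and clear the list as one
  // atomic step. Collections.synchronizedList locks on the list itself,
  // so holding that monitor makes the compound operation safe.
  public List<Request> drainRequests() {
    synchronized (pendingRequests) {
      List<Request> batch = new ArrayList<>(pendingRequests);
      pendingRequests.clear();
      return batch;
    }
  }

  public static void main(String[] args) {
    PublicRequestHandoff handoff = new PublicRequestHandoff();
    handoff.addResource(new Request("hdfs:///apps/a.jar"));
    handoff.addResource(new Request("hdfs:///apps/b.jar"));

    // The localizer thread would now do the slow per-resource work
    // (getLocalPathForWrite, getPathForLocalization, queue.submit).
    List<Request> batch = handoff.drainRequests();
    System.out.println(batch.size() + " " + batch.get(0).resource); // prints "2 hdfs:///apps/a.jar"
    System.out.println(handoff.drainRequests().size());             // prints "0"
  }
}
```

Copying under the lock keeps the critical section short, so the Dispatcher thread can keep appending while the localizer works through the previous batch.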



> Improve the public resource localization to do both FSDownload submission to 
> the thread pool and completed localization handling in one thread 
> (PublicLocalizer).
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-3491
>                 URL: https://issues.apache.org/jira/browse/YARN-3491
>             Project: Hadoop YARN
>          Issue Type: Improvement
>          Components: nodemanager
>    Affects Versions: 2.7.0
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>            Priority: Critical
>
> Improve the public resource localization to do both FSDownload submission to 
> the thread pool and completed localization handling in one thread 
> (PublicLocalizer).
> Currently, FSDownload submission to the thread pool is done in
> PublicLocalizer#addResource, which runs in the Dispatcher thread, and
> completed localization handling is done in PublicLocalizer#run, which runs
> in the PublicLocalizer thread.
> Because the FSDownload submission to the thread pool in the following code is
> time-consuming, the thread pool can't be fully utilized. Instead of running
> in parallel (multithreaded), public resource localization is serialized
> most of the time.
> {code}
>             synchronized (pending) {
>               pending.put(queue.submit(new FSDownload(lfs, null, conf,
>                   publicDirDestPath, resource,
>                   request.getContext().getStatCache())),
>                   request);
>             }
> {code}
> This change also has two more benefits:
> 1. The Dispatcher thread won't be blocked by the above FSDownload submission.
> The Dispatcher thread handles most of the time-critical events in the NodeManager.
> 2. No synchronization is needed on the HashMap (pending), because pending
> will only be accessed in the PublicLocalizer thread.
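The single-thread design in the issue description can be sketched roughly as follows. This assumes plain `String` resource names, a trivial `Callable` in place of FSDownload, and a hypothetical `runOnce` method standing for one iteration of the PublicLocalizer loop; none of these names come from the NodeManager code. Because submission (`queue.submit`) and completion handling (`queue.poll`) both happen on the same thread, `pending` can be an ordinary `HashMap`, and only the list receiving requests from the Dispatcher needs to be thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a single-thread PublicLocalizer loop.
public class SingleThreadPublicLocalizer {
  // Written by the Dispatcher thread, drained by the localizer thread.
  private final List<String> newRequests =
      Collections.synchronizedList(new ArrayList<>());
  private final ExecutorService downloadPool = Executors.newFixedThreadPool(4);
  private final CompletionService<String> queue =
      new ExecutorCompletionService<>(downloadPool);
  // Only touched by the localizer thread, so no synchronization is needed.
  private final Map<Future<String>, String> pending = new HashMap<>();

  // Dispatcher thread: cheap append only.
  public void addResource(String resource) {
    newRequests.add(resource);
  }

  // One iteration of the localizer thread's loop. Returns the resources
  // whose downloads completed successfully during this iteration.
  public List<String> runOnce(long pollMillis) {
    List<String> batch;
    synchronized (newRequests) {
      batch = new ArrayList<>(newRequests);
      newRequests.clear();
    }
    // Submission happens here, off the Dispatcher thread.
    for (String resource : batch) {
      Callable<String> download = () -> resource; // stand-in for FSDownload
      pending.put(queue.submit(download), resource);
    }
    // Handle completions. The timed poll returns null once no download
    // finishes within pollMillis, so the caller can loop back and pick up
    // new requests instead of blocking indefinitely in take().
    List<String> localized = new ArrayList<>();
    try {
      Future<String> completed;
      while ((completed = queue.poll(pollMillis, TimeUnit.MILLISECONDS)) != null) {
        String resource = pending.remove(completed);
        try {
          completed.get();
          localized.add(resource);
        } catch (ExecutionException e) {
          // A real localizer would report a failed localization here.
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return localized;
  }

  public int pendingCount() {
    return pending.size();
  }

  public void shutdown() {
    downloadPool.shutdownNow();
  }

  public static void main(String[] args) {
    SingleThreadPublicLocalizer localizer = new SingleThreadPublicLocalizer();
    localizer.addResource("a.jar");
    localizer.addResource("b.jar");
    localizer.addResource("c.jar");
    List<String> localized = new ArrayList<>();
    while (localized.size() < 3) {
      localized.addAll(localizer.runOnce(100));
    }
    localizer.shutdown();
    Collections.sort(localized);
    System.out.println(localized); // prints "[a.jar, b.jar, c.jar]"
  }
}
```

A real localizer loop would also need clean shutdown and failure reporting back to the resource tracker; those parts are left out of this sketch.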



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
