[ https://issues.apache.org/jira/browse/YARN-3491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497411#comment-14497411 ]
zhihai xu commented on YARN-3491:
---------------------------------
Hi [~sjlee0], that is a good point. I had assumed queue.submit was the
bottleneck, but queue.submit is only part of the code in
PublicLocalizer#addResource. The bottleneck may instead come from
publicRsrc.getPathForLocalization, because
LocalResourcesTrackerImpl#getPathForLocalization now does a lot of additional
work, such as
{{stateStore.startResourceLocalization(user, appId,
((LocalResourcePBImpl) lr).getProto(), localPath);}}
I should have described it more clearly. Based on the log, the issue is that
PublicLocalizer#addResource is very slow, which blocks the Dispatcher thread. I
looked at the code of PublicLocalizer#addResource below and assumed
queue.submit took most of the CPU cycles, but based on [~jlowe]'s and your
comments, the slowness may come from other code, such as
publicRsrc.getPathForLocalization or dirsHandler.getLocalPathForWrite. Either
way, I think moving all of this code in PublicLocalizer#addResource from the
Dispatcher thread to the PublicLocalizer thread would be a good optimization.
We can use a synchronizedList of LocalizerResourceRequestEvent to store all
the events for public resource localization, similar to what LocalizerRunner
does for private resource localization.
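The hand-off described above can be sketched as follows. This is a minimal, hypothetical sketch, not the actual NodeManager code: `PendingRequestHandoff`, `addResource`, and `drain` are illustrative names, and a plain `String` stands in for `LocalizerResourceRequestEvent`. The dispatcher side only appends to a `Collections.synchronizedList`; the localizer side copies and clears the list while holding the list's own lock, so the compound operation is atomic and no request added in between is lost.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: String stands in for LocalizerResourceRequestEvent.
public class PendingRequestHandoff {
  // Shared by the dispatcher (producer) and localizer (consumer) threads.
  private final List<String> pendingRequests =
      Collections.synchronizedList(new ArrayList<String>());

  // Dispatcher thread: a cheap append, no disk or state-store I/O.
  public void addResource(String request) {
    pendingRequests.add(request);
  }

  // Localizer thread: copy and clear atomically under the list's own lock,
  // so a request added between the copy and the clear cannot be lost.
  public List<String> drain() {
    synchronized (pendingRequests) {
      List<String> batch = new ArrayList<>(pendingRequests);
      pendingRequests.clear();
      return batch;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PendingRequestHandoff handoff = new PendingRequestHandoff();
    Thread dispatcher = new Thread(() -> {
      for (int i = 0; i < 5; i++) {
        handoff.addResource("rsrc-" + i);
      }
    });
    dispatcher.start();
    dispatcher.join();
    // The expensive per-resource work would run here, on the localizer thread.
    for (String request : handoff.drain()) {
      System.out.println("localizing " + request);
    }
  }
}
```

With this split, the Dispatcher thread's cost per request is one list append, and everything that touches disks or the state store moves to the consumer.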
I will do some more profiling to find the bottleneck in
PublicLocalizer#addResource:
{code}
public void addResource(LocalizerResourceRequestEvent request) {
  // TODO handle failures, cancellation, requests by other containers
  LocalizedResource rsrc = request.getResource();
  LocalResourceRequest key = rsrc.getRequest();
  LOG.info("Downloading public rsrc:" + key);
  /*
   * Here multiple containers may request the same resource. So we need
   * to start downloading only when
   * 1) ResourceState == DOWNLOADING
   * 2) We are able to acquire non blocking semaphore lock.
   * If not we will skip this resource as either it is getting downloaded
   * or it FAILED / LOCALIZED.
   */
  if (rsrc.tryAcquire()) {
    if (rsrc.getState() == ResourceState.DOWNLOADING) {
      LocalResource resource = request.getResource().getRequest();
      try {
        Path publicRootPath =
            dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                + ContainerLocalizer.FILECACHE,
                ContainerLocalizer.getEstimatedSize(resource), true);
        Path publicDirDestPath =
            publicRsrc.getPathForLocalization(key, publicRootPath);
        if (!publicDirDestPath.getParent().equals(publicRootPath)) {
          DiskChecker.checkDir(new
              File(publicDirDestPath.toUri().getPath()));
        }
        // In case this is not a newly initialized nm state, ensure
        // initialized local/log dirs similar to LocalizerRunner
        getInitializedLocalDirs();
        getInitializedLogDirs();
        // explicitly synchronize pending here to avoid future task
        // completing and being dequeued before pending updated
        synchronized (pending) {
          pending.put(queue.submit(new FSDownload(lfs, null, conf,
              publicDirDestPath, resource,
              request.getContext().getStatCache())),
              request);
        }
      } catch (IOException e) {
        rsrc.unlock();
        publicRsrc.handle(new ResourceFailedLocalizationEvent(request
            .getResource().getRequest(), e.getMessage()));
        LOG.error("Local path for public localization is not found. "
            + " May be disks failed.", e);
      } catch (IllegalArgumentException ie) {
        rsrc.unlock();
        publicRsrc.handle(new ResourceFailedLocalizationEvent(request
            .getResource().getRequest(), ie.getMessage()));
        LOG.error("Local path for public localization is not found. "
            + " Incorrect path. " + request.getResource().getRequest()
            .getPath(), ie);
      } catch (RejectedExecutionException re) {
        rsrc.unlock();
        publicRsrc.handle(new ResourceFailedLocalizationEvent(request
            .getResource().getRequest(), re.getMessage()));
        LOG.error("Failed to submit rsrc " + rsrc + " for download."
            + " Either queue is full or threadpool is shutdown.", re);
      }
    } else {
      rsrc.unlock();
    }
  }
}
{code}
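One low-tech way to do that profiling is to wrap each step of `addResource` in a timer and compare the accumulated totals. The sketch below is hypothetical: `StepTimer`, `time`, and `sleepMillis` are illustrative names, and the sleeps only stand in for the real calls (`dirsHandler.getLocalPathForWrite`, `publicRsrc.getPathForLocalization`, `queue.submit`). A sampling profiler would give a more complete picture; this just attributes wall-clock time per step.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: accumulates wall-clock time per named step.
public class StepTimer {
  // Insertion-ordered so the report lists steps in the order they first ran.
  private final Map<String, Long> totalsNanos = new LinkedHashMap<>();

  // Runs body, adds its elapsed time to the step's total, and returns its result.
  public <T> T time(String step, Supplier<T> body) {
    long start = System.nanoTime();
    try {
      return body.get();
    } finally {
      totalsNanos.merge(step, System.nanoTime() - start, Long::sum);
    }
  }

  public Map<String, Long> totalsNanos() {
    return totalsNanos;
  }

  // Stand-in for a slow call; returns null so it fits Supplier<Void>.
  private static Void sleepMillis(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return null;
  }

  public static void main(String[] args) {
    StepTimer timer = new StepTimer();
    // In addResource these would wrap the real calls instead of sleeps.
    timer.time("getLocalPathForWrite", () -> sleepMillis(5));
    timer.time("getPathForLocalization", () -> sleepMillis(20));
    timer.time("queue.submit", () -> sleepMillis(1));
    timer.totalsNanos().forEach((step, nanos) ->
        System.out.printf("%-24s %6.1f ms%n", step, nanos / 1e6));
  }
}
```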
> Improve the public resource localization to do both FSDownload submission to
> the thread pool and completed localization handling in one thread
> (PublicLocalizer).
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: YARN-3491
> URL: https://issues.apache.org/jira/browse/YARN-3491
> Project: Hadoop YARN
> Issue Type: Improvement
> Components: nodemanager
> Affects Versions: 2.7.0
> Reporter: zhihai xu
> Assignee: zhihai xu
> Priority: Critical
>
> Improve the public resource localization to do both FSDownload submission to
> the thread pool and completed localization handling in one thread
> (PublicLocalizer).
> Currently, FSDownload submission to the thread pool is done in
> PublicLocalizer#addResource, which runs in the Dispatcher thread, while
> completed localization handling is done in PublicLocalizer#run, which runs
> in the PublicLocalizer thread.
> Because the FSDownload submission to the thread pool in the following code is
> time-consuming, the thread pool can't be fully utilized. Instead of running in
> parallel (multithreaded), public resource localization is serialized most of
> the time.
> {code}
> synchronized (pending) {
> pending.put(queue.submit(new FSDownload(lfs, null, conf,
> publicDirDestPath, resource,
> request.getContext().getStatCache())),
> request);
> }
> {code}
> This change has two further benefits:
> 1. The Dispatcher thread won't be blocked by the FSDownload submission above.
> The Dispatcher thread handles most of the time-critical events in the NodeManager.
> 2. There is no need to synchronize on the HashMap (pending),
> because pending will only be accessed in the PublicLocalizer thread.
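The proposed design can be sketched as one loop that owns both submission and completion handling. This is a minimal sketch under stated assumptions, not the actual patch: it assumes `queue` is a `java.util.concurrent.CompletionService` (as the `queue.submit` call and the separate completion handling suggest), uses `String` in place of the real resource and event types, and `SingleThreadLocalizer`, `submitIncoming`, and `runUntilDone` are hypothetical names. Because the same thread calls `pending.put` and later `queue.poll`, a task that finishes before `put` returns cannot be dequeued before its entry exists, so `pending` can be a plain `HashMap`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: Strings stand in for resources and request events.
public class SingleThreadLocalizer {
  // Filled by the dispatcher thread, drained by the localizer thread.
  private final List<String> incoming =
      Collections.synchronizedList(new ArrayList<String>());
  private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
  private final CompletionService<String> queue =
      new ExecutorCompletionService<>(threadPool);
  // Touched only by the localizer thread, so a plain HashMap suffices.
  private final Map<Future<String>, String> pending = new HashMap<>();

  // Dispatcher thread: cheap enqueue only.
  public void addResource(String resource) {
    incoming.add(resource);
  }

  // Localizer thread: submit everything queued so far.
  private void submitIncoming() {
    List<String> batch;
    synchronized (incoming) {
      batch = new ArrayList<>(incoming);
      incoming.clear();
    }
    for (String resource : batch) {
      // Path allocation and state-store updates would happen here,
      // off the dispatcher thread. The "download" just returns a path.
      Future<String> future = queue.submit(() -> "/filecache/" + resource);
      // Safe without a lock: this same thread is the only one that polls.
      pending.put(future, resource);
    }
  }

  // Localizer thread loop; returns once `expected` downloads have completed.
  public List<String> runUntilDone(int expected)
      throws InterruptedException, ExecutionException {
    List<String> localized = new ArrayList<>();
    while (localized.size() < expected) {
      submitIncoming();
      Future<String> done = queue.poll(10, TimeUnit.MILLISECONDS);
      if (done != null) {
        String resource = pending.remove(done);
        localized.add(resource + " -> " + done.get());
      }
    }
    return localized;
  }

  public void shutdown() {
    threadPool.shutdownNow();
  }

  public static void main(String[] args) throws Exception {
    SingleThreadLocalizer localizer = new SingleThreadLocalizer();
    for (int i = 0; i < 3; i++) {
      localizer.addResource("rsrc-" + i);
    }
    try {
      List<String> result = localizer.runUntilDone(3);
      Collections.sort(result);
      result.forEach(System.out::println);
    } finally {
      localizer.shutdown();
    }
  }
}
```

Polling with a short timeout lets one thread interleave new submissions with completions; a real implementation would loop until shutdown rather than until a fixed count.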
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)