[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037381#comment-15037381 ]
NING DING commented on YARN-4398:
---------------------------------

[~jianhe], thank you.

> Yarn recover functionality causes the cluster running slowly and the cluster usage rate is far below 100
> --------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-4398
>                 URL: https://issues.apache.org/jira/browse/YARN-4398
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: resourcemanager
>    Affects Versions: 2.7.1
>            Reporter: NING DING
>            Assignee: NING DING
>             Fix For: 2.7.3
>
>         Attachments: YARN-4398.2.patch, YARN-4398.3.patch, YARN-4398.4.patch
>
>
> In my Hadoop cluster, the ResourceManager recovery functionality is enabled with FileSystemRMStateStore.
> I found that this causes the YARN cluster to run slowly: the cluster usage rate is only 50% even when there are many pending apps.
> The scenario is as follows.
> In thread A, RMAppImpl$RMAppNewlySavingTransition is calling the storeNewApplication method defined in RMStateStore. This storeNewApplication method is synchronized.
> {code:title=RMAppImpl.java|borderStyle=solid}
>   private static final class RMAppNewlySavingTransition extends RMAppTransition {
>     @Override
>     public void transition(RMAppImpl app, RMAppEvent event) {
>       // If recovery is enabled then store the application information in a
>       // non-blocking call so make sure that RM has stored the information
>       // needed to restart the AM after RM restart without further client
>       // communication
>       LOG.info("Storing application with id " + app.applicationId);
>       app.rmContext.getStateStore().storeNewApplication(app);
>     }
>   }
> {code}
> {code:title=RMStateStore.java|borderStyle=solid}
>   public synchronized void storeNewApplication(RMApp app) {
>     ApplicationSubmissionContext context = app
>         .getApplicationSubmissionContext();
>     assert context instanceof ApplicationSubmissionContextPBImpl;
>     ApplicationStateData appState =
>         ApplicationStateData.newInstance(
>             app.getSubmitTime(), app.getStartTime(), context, app.getUser());
>     dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
>   }
> {code}
> In thread B, FileSystemRMStateStore is calling the storeApplicationStateInternal method, which is also synchronized.
> This method saves an ApplicationStateData to HDFS and normally takes 90~300 milliseconds in my Hadoop cluster.
> {code:title=FileSystemRMStateStore.java|borderStyle=solid}
>   public synchronized void storeApplicationStateInternal(ApplicationId appId,
>       ApplicationStateData appStateDataPB) throws Exception {
>     Path appDirPath = getAppDir(rmAppRoot, appId);
>     mkdirsWithRetries(appDirPath);
>     Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
>     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
>     byte[] appStateData = appStateDataPB.getProto().toByteArray();
>     try {
>       // currently throw all exceptions.
>       // May need to respond differently for HA
>       // based on whether we have lost the right to write to FS
>       writeFileWithRetries(nodeCreatePath, appStateData, true);
>     } catch (Exception e) {
>       LOG.info("Error storing info for app: " + appId, e);
>       throw e;
>     }
>   }
> {code}
> Suppose thread B enters FileSystemRMStateStore.storeApplicationStateInternal first. Thread A is then blocked for a while because both methods synchronize on the same object: the ResourceManager has only one RMStateStore instance, and in my cluster it is of type FileSystemRMStateStore.
> When debugging the RMAppNewlySavingTransition.transition method, the thread stack shows that it is called from the AsyncDispatcher.dispatch method. The code of this method is below.
> {code:title=AsyncDispatcher.java|borderStyle=solid}
>   protected void dispatch(Event event) {
>     //all events go thru this loop
>     if (LOG.isDebugEnabled()) {
>       LOG.debug("Dispatching the event " + event.getClass().getName() + "."
>           + event.toString());
>     }
>     Class<? extends Enum> type = event.getType().getDeclaringClass();
>     try{
>       EventHandler handler = eventDispatchers.get(type);
>       if(handler != null) {
>         handler.handle(event);
>       } else {
>         throw new Exception("No handler for registered for " + type);
>       }
>     } catch (Throwable t) {
>       //TODO Maybe log the state of the queue
>       LOG.fatal("Error in dispatcher thread", t);
>       // If serviceStop is called, we should exit this thread gracefully.
>       if (exitOnDispatchException
>           && (ShutdownHookManager.get().isShutdownInProgress()) == false
>           && stopped == false) {
>         Thread shutDownThread = new Thread(createShutDownThread());
>         shutDownThread.setName("AsyncDispatcher ShutDown handler");
>         shutDownThread.start();
>       }
>     }
>   }
> {code}
> The code above shows that the AsyncDispatcher.dispatch method processes events of different types.
> In fact, this AsyncDispatcher instance is ResourceManager.rmDispatcher, created in the ResourceManager.serviceInit method.
> Many event types and handlers are registered in ResourceManager.rmDispatcher.
> In the scenario above, thread B blocks thread A, so the processing of many subsequent events is blocked as well.
> In my test cluster there is only one queue, and when the client submits 1000 applications concurrently the YARN cluster usage rate is 50%. Many apps are pending. If I disable the ResourceManager recovery functionality, the cluster usage can reach 100%.
> To solve this issue, I removed the synchronized modifier from some methods defined in RMStateStore.
> Instead, these methods use a dedicated lock object when calling dispatcher.getEventHandler().handle.
> With this change, the YARN cluster usage rate can reach 100% and the whole cluster runs well.
> Please see my attached patch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
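
The locking change described in the quoted issue can be illustrated with a minimal, self-contained Java sketch. This is not the attached patch: `Store`, `slowWrite`, `enqueueSynchronized`, `enqueueWithDedicatedLock` and `dispatchLock` are hypothetical names, and a latch stands in for the slow HDFS write. It only shows why a method that shares the object's monitor with a slow synchronized method is blocked, while one that takes a separate lock object is not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StateStoreLockSketch {

    /** Hypothetical stand-in for a state store; not the real RMStateStore. */
    static class Store {
        // Dedicated lock guarding only the event hand-off (assumed name).
        private final Object dispatchLock = new Object();
        final CountDownLatch writerEntered = new CountDownLatch(1);
        final CountDownLatch releaseWriter = new CountDownLatch(1);

        /** Holds the store's monitor until released; stands in for the slow HDFS write. */
        synchronized void slowWrite() throws InterruptedException {
            writerEntered.countDown();
            releaseWriter.await();
        }

        /** Before the change: shares the store's monitor with slowWrite. */
        synchronized void enqueueSynchronized(Runnable handOff) {
            handOff.run();
        }

        /** After the change: only the dedicated lock is taken. */
        void enqueueWithDedicatedLock(Runnable handOff) {
            synchronized (dispatchLock) {
                handOff.run();
            }
        }
    }

    /** Returns true if the enqueue call finished while slowWrite still held the monitor. */
    static boolean completesDuringWrite(boolean useDedicatedLock) {
        Store store = new Store();
        CountDownLatch done = new CountDownLatch(1);
        Thread writer = new Thread(() -> {
            try {
                store.slowWrite();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread submitter = new Thread(() -> {
            if (useDedicatedLock) {
                store.enqueueWithDedicatedLock(done::countDown);
            } else {
                store.enqueueSynchronized(done::countDown);
            }
        });
        try {
            writer.start();
            store.writerEntered.await();      // the writer now holds the monitor
            submitter.start();
            // Wait a bounded time for the hand-off to happen during the write.
            boolean completed = done.await(1, TimeUnit.SECONDS);
            store.releaseWriter.countDown();  // let the writer finish
            writer.join();
            submitter.join();
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("synchronized method completes during write: "
                + completesDuringWrite(false));
        System.out.println("dedicated lock completes during write: "
                + completesDuringWrite(true));
    }
}
```

In this sketch the synchronized variant cannot complete until the writer releases the monitor, which mirrors thread A waiting on thread B in the report. The dedicated-lock variant is expected to complete while the write is still in progress.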