[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jian He updated YARN-4398:
--------------------------
    Assignee: NING DING

> Yarn recover functionality causes the cluster running slowly and the cluster usage rate is far below 100
> --------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-4398
>                 URL: https://issues.apache.org/jira/browse/YARN-4398
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: resourcemanager
>    Affects Versions: 2.7.1
>            Reporter: NING DING
>            Assignee: NING DING
>         Attachments: YARN-4398.2.patch, YARN-4398.3.patch, YARN-4398.4.patch
>
>
> In my Hadoop cluster, ResourceManager recovery is enabled with
> FileSystemRMStateStore.
> I found that this causes the YARN cluster to run slowly, and the cluster
> usage rate is only 50% even when there are many pending apps.
> The scenario is as follows.
> In thread A, RMAppImpl$RMAppNewlySavingTransition calls the
> storeNewApplication method defined in RMStateStore. This storeNewApplication
> method is synchronized.
> {code:title=RMAppImpl.java|borderStyle=solid}
>   private static final class RMAppNewlySavingTransition extends RMAppTransition {
>     @Override
>     public void transition(RMAppImpl app, RMAppEvent event) {
>       // If recovery is enabled then store the application information in a
>       // non-blocking call so make sure that RM has stored the information
>       // needed to restart the AM after RM restart without further client
>       // communication
>       LOG.info("Storing application with id " + app.applicationId);
>       app.rmContext.getStateStore().storeNewApplication(app);
>     }
>   }
> {code}
> {code:title=RMStateStore.java|borderStyle=solid}
>   public synchronized void storeNewApplication(RMApp app) {
>     ApplicationSubmissionContext context =
>         app.getApplicationSubmissionContext();
>     assert context instanceof ApplicationSubmissionContextPBImpl;
>     ApplicationStateData appState =
>         ApplicationStateData.newInstance(
>             app.getSubmitTime(), app.getStartTime(), context, app.getUser());
>     dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
>   }
> {code}
> In thread B, FileSystemRMStateStore calls the storeApplicationStateInternal
> method, which is also synchronized.
> storeApplicationStateInternal saves an ApplicationStateData to HDFS, which
> normally takes 90~300 milliseconds in my Hadoop cluster.
> {code:title=FileSystemRMStateStore.java|borderStyle=solid}
>   public synchronized void storeApplicationStateInternal(ApplicationId appId,
>       ApplicationStateData appStateDataPB) throws Exception {
>     Path appDirPath = getAppDir(rmAppRoot, appId);
>     mkdirsWithRetries(appDirPath);
>     Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
>     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
>     byte[] appStateData = appStateDataPB.getProto().toByteArray();
>     try {
>       // currently throw all exceptions. May need to respond differently for HA
>       // based on whether we have lost the right to write to FS
>       writeFileWithRetries(nodeCreatePath, appStateData, true);
>     } catch (Exception e) {
>       LOG.info("Error storing info for app: " + appId, e);
>       throw e;
>     }
>   }
> {code}
> Suppose thread B enters FileSystemRMStateStore.storeApplicationStateInternal
> first; thread A is then blocked for a while, because both synchronized
> methods lock the same object. The ResourceManager has only one RMStateStore
> instance, which in my cluster is a FileSystemRMStateStore.
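The contention described above can be reproduced outside YARN with a small sketch. It assumes only what the report states: a single shared object whose slow method and fast method are both `synchronized`. The class and method names (`SharedStore`, `slowWrite`, `enqueue`) are hypothetical stand-ins for storeApplicationStateInternal and storeNewApplication, and `Thread.sleep` stands in for the HDFS write.

{code:title=MonitorContentionDemo.java|borderStyle=solid}
import java.util.concurrent.CountDownLatch;

public class MonitorContentionDemo {

  // Hypothetical stand-in for the single RMStateStore/FileSystemRMStateStore
  // instance: both methods are synchronized, so they share one monitor (this).
  static class SharedStore {
    // Like storeApplicationStateInternal: holds the monitor during slow I/O.
    synchronized void slowWrite(CountDownLatch entered, long millis) {
      entered.countDown();              // signal that the monitor is now held
      try {
        Thread.sleep(millis);           // stand-in for the 90~300 ms HDFS write
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    // Like storeNewApplication: cheap work, but needs the same monitor.
    synchronized void enqueue() {
      // would normally just hand an event to the state store's dispatcher
    }
  }

  /** Returns how long (ms) enqueue() waited while another thread held the monitor. */
  static long measureBlockedMillis(long writeMillis) {
    SharedStore store = new SharedStore();
    CountDownLatch entered = new CountDownLatch(1);
    Thread threadB = new Thread(() -> store.slowWrite(entered, writeMillis));
    threadB.start();
    try {
      entered.await();                  // thread B now owns the monitor
      long start = System.nanoTime();
      store.enqueue();                  // "thread A": waits until thread B releases it
      long waited = (System.nanoTime() - start) / 1_000_000;
      threadB.join();
      return waited;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("enqueue() blocked for ~" + measureBlockedMillis(200) + " ms");
  }
}
{code}

Note that `Thread.sleep` does not release the monitor, so the cheap `enqueue()` call waits for roughly the full duration of the simulated write.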
> When I debugged the RMAppNewlySavingTransition.transition method, the thread
> stack showed that it is called from the AsyncDispatcher.dispatch method,
> whose code is as follows.
> {code:title=AsyncDispatcher.java|borderStyle=solid}
>   protected void dispatch(Event event) {
>     //all events go thru this loop
>     if (LOG.isDebugEnabled()) {
>       LOG.debug("Dispatching the event " + event.getClass().getName() + "."
>           + event.toString());
>     }
>     Class<? extends Enum> type = event.getType().getDeclaringClass();
>     try{
>       EventHandler handler = eventDispatchers.get(type);
>       if(handler != null) {
>         handler.handle(event);
>       } else {
>         throw new Exception("No handler for registered for " + type);
>       }
>     } catch (Throwable t) {
>       //TODO Maybe log the state of the queue
>       LOG.fatal("Error in dispatcher thread", t);
>       // If serviceStop is called, we should exit this thread gracefully.
>       if (exitOnDispatchException
>           && (ShutdownHookManager.get().isShutdownInProgress()) == false
>           && stopped == false) {
>         Thread shutDownThread = new Thread(createShutDownThread());
>         shutDownThread.setName("AsyncDispatcher ShutDown handler");
>         shutDownThread.start();
>       }
>     }
>   }
> {code}
> The code above shows that AsyncDispatcher.dispatch handles events of many
> different types.
> In fact, this AsyncDispatcher instance is ResourceManager.rmDispatcher, which
> is created in the ResourceManager.serviceInit method.
> Many event types and their handlers are registered with
> ResourceManager.rmDispatcher.
> Because the dispatcher handles events one at a time on a single thread, when
> thread B blocks thread A in the scenario above, the processing of all
> subsequent events is blocked as well.
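The knock-on effect on the dispatcher can be illustrated with a minimal sketch of a single-threaded event loop. `MiniDispatcher` is a hypothetical, heavily simplified stand-in for AsyncDispatcher (it is not the Hadoop class): one thread takes handlers from a queue and runs them in order, so one stalled handler delays every event queued behind it.

{code:title=SingleDispatcherDemo.java|borderStyle=solid}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SingleDispatcherDemo {

  // Hypothetical, simplified stand-in for AsyncDispatcher: a single thread
  // takes events off a queue and runs their handlers one at a time.
  static class MiniDispatcher {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    MiniDispatcher() {
      worker = new Thread(() -> {
        try {
          while (true) {
            queue.take().run();         // handlers run strictly in FIFO order
          }
        } catch (InterruptedException e) {
          // stop() interrupted the worker; exit quietly
        }
      }, "mini-dispatcher");
      worker.setDaemon(true);
      worker.start();
    }

    void dispatch(Runnable handler) { queue.add(handler); }

    void stop() { worker.interrupt(); }
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Latency (ms) of a cheap event queued right behind a handler that stalls. */
  static long latencyBehindSlowHandler(long blockMillis) {
    MiniDispatcher dispatcher = new MiniDispatcher();
    CountDownLatch handled = new CountDownLatch(1);
    AtomicLong handledAt = new AtomicLong();
    long submittedAt = System.nanoTime();
    // Event 1: its handler stalls, like a transition waiting on the store's lock.
    dispatcher.dispatch(() -> sleepQuietly(blockMillis));
    // Event 2: an unrelated, cheap event that only records when it ran.
    dispatcher.dispatch(() -> {
      handledAt.set(System.nanoTime());
      handled.countDown();
    });
    try {
      if (!handled.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("second event was never handled");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      dispatcher.stop();
    }
    return (handledAt.get() - submittedAt) / 1_000_000;
  }

  public static void main(String[] args) {
    System.out.println("cheap event waited ~" + latencyBehindSlowHandler(300) + " ms");
  }
}
{code}

The second event does no real work, yet its latency is bounded below by the first handler's stall, which mirrors how one blocked RMApp transition can hold up unrelated ResourceManager events.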
> In my test cluster there is only one queue. When the client submits 1000
> applications concurrently, the YARN cluster usage rate is only 50% and many
> apps stay pending. If I disable ResourceManager recovery, cluster usage
> reaches 100%.
> To solve this issue, I removed the synchronized modifier from some methods
> defined in RMStateStore.
> Instead, these methods synchronize on a dedicated lock object when calling
> dispatcher.getEventHandler().handle.
> With this change, the YARN cluster usage rate reaches 100% and the whole
> cluster runs well.
> Please see my attached patch.
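The idea of replacing the `synchronized` modifier with a dedicated lock can be sketched as follows. This is a hypothetical illustration of the general technique, not the attached patch: `StateStore`, `writeInternal`, `storeNew` and `dispatchLock` are invented names, and the `Runnable` stands in for the `dispatcher.getEventHandler().handle(...)` call.

{code:title=DedicatedLockDemo.java|borderStyle=solid}
import java.util.concurrent.CountDownLatch;

public class DedicatedLockDemo {

  // Hypothetical sketch of the proposed change; not the attached patch.
  static class StateStore {
    // Dedicated lock that serializes only the hand-off to the dispatcher.
    private final Object dispatchLock = new Object();

    // Like storeApplicationStateInternal: still synchronized on `this`, still slow.
    synchronized void writeInternal(CountDownLatch entered, long millis) {
      entered.countDown();              // signal that the monitor on `this` is held
      try {
        Thread.sleep(millis);           // stand-in for the HDFS write
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    // Like storeNewApplication after the change: no synchronized modifier;
    // only the hand-off is serialized, and on a different monitor.
    void storeNew(Runnable handOff) {
      synchronized (dispatchLock) {
        handOff.run();                  // e.g. dispatcher.getEventHandler().handle(...)
      }
    }
  }

  /** Time (ms) storeNew() takes while another thread is inside writeInternal(). */
  static long storeNewMillisDuringSlowWrite(long writeMillis) {
    StateStore store = new StateStore();
    CountDownLatch entered = new CountDownLatch(1);
    Thread writer = new Thread(() -> store.writeInternal(entered, writeMillis));
    writer.start();
    try {
      entered.await();                  // writer now holds the monitor on `store`
      long start = System.nanoTime();
      store.storeNew(() -> { });        // does not need the monitor on `store`
      long took = (System.nanoTime() - start) / 1_000_000;
      writer.join();
      return took;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("storeNew() took ~" + storeNewMillisDuringSlowWrite(200) + " ms");
  }
}
{code}

The trade-off is that storeNew() no longer excludes writeInternal(); that is only safe if the hand-off does not touch state that the monitor on `this` was protecting.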



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
