[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15034466#comment-15034466 ]
Jian He commented on YARN-4398:
-------------------------------
Patch looks good to me. Thanks [~iceberg565], and [~templedf] for the review.
> Yarn recover functionality causes the cluster running slowly and the cluster
> usage rate is far below 100
> --------------------------------------------------------------------------------------------------------
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
> Issue Type: Bug
> Components: resourcemanager
> Affects Versions: 2.7.1
> Reporter: NING DING
> Attachments: YARN-4398.2.patch, YARN-4398.3.patch
>
>
> In my Hadoop cluster, ResourceManager recovery is enabled with
> FileSystemRMStateStore.
> I found that this causes the YARN cluster to run slowly: cluster usage is
> only 50% even though there are many pending apps.
> The scenario is as follows.
> In thread A, RMAppImpl$RMAppNewlySavingTransition calls the
> storeNewApplication method defined in RMStateStore. This method is
> synchronized.
> {code:title=RMAppImpl.java|borderStyle=solid}
> private static final class RMAppNewlySavingTransition extends
>     RMAppTransition {
>   @Override
>   public void transition(RMAppImpl app, RMAppEvent event) {
>     // If recovery is enabled then store the application information in a
>     // non-blocking call so make sure that RM has stored the information
>     // needed to restart the AM after RM restart without further client
>     // communication
>     LOG.info("Storing application with id " + app.applicationId);
>     app.rmContext.getStateStore().storeNewApplication(app);
>   }
> }
> {code}
> {code:title=RMStateStore.java|borderStyle=solid}
> public synchronized void storeNewApplication(RMApp app) {
>   ApplicationSubmissionContext context = app
>       .getApplicationSubmissionContext();
>   assert context instanceof ApplicationSubmissionContextPBImpl;
>   ApplicationStateData appState =
>       ApplicationStateData.newInstance(
>           app.getSubmitTime(), app.getStartTime(), context, app.getUser());
>   dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
> }
> {code}
> In thread B, FileSystemRMStateStore calls the storeApplicationStateInternal
> method, which is also synchronized.
> This method saves an ApplicationStateData record into HDFS, which normally
> takes 90 to 300 milliseconds in my Hadoop cluster.
> {code:title=FileSystemRMStateStore.java|borderStyle=solid}
> public synchronized void storeApplicationStateInternal(ApplicationId appId,
>     ApplicationStateData appStateDataPB) throws Exception {
>   Path appDirPath = getAppDir(rmAppRoot, appId);
>   mkdirsWithRetries(appDirPath);
>   Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
>   LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
>   byte[] appStateData = appStateDataPB.getProto().toByteArray();
>   try {
>     // currently throw all exceptions. May need to respond differently for HA
>     // based on whether we have lost the right to write to FS
>     writeFileWithRetries(nodeCreatePath, appStateData, true);
>   } catch (Exception e) {
>     LOG.info("Error storing info for app: " + appId, e);
>     throw e;
>   }
> }
> {code}
> Suppose thread B enters FileSystemRMStateStore.storeApplicationStateInternal
> first; thread A is then blocked for a while because of synchronization. The
> ResourceManager has only one RMStateStore instance (a FileSystemRMStateStore
> in my cluster), so both synchronized methods lock the same object.
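The blocking described above follows from Java monitor semantics: a `synchronized` instance method declared in a superclass and one declared in a subclass lock the same object when both are called on the same instance. A minimal sketch, assuming simplified stand-in classes (`BaseStore`, `SlowFsStore` and `MonitorContentionDemo` are hypothetical names, not Hadoop classes) and a `Thread.sleep` in place of the real HDFS write:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RMStateStore: storeNewApplication locks "this".
class BaseStore {
    public synchronized void storeNewApplication() {
        // Cheap work only; the real method builds a record and enqueues an event.
    }
}

// Hypothetical stand-in for FileSystemRMStateStore: also locks "this",
// and keeps the monitor for the whole (simulated) file-system write.
class SlowFsStore extends BaseStore {
    public synchronized void storeApplicationStateInternal(long writeMillis,
            CountDownLatch entered) {
        entered.countDown();           // the monitor on "this" is now held
        try {
            Thread.sleep(writeMillis); // simulated HDFS write
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MonitorContentionDemo {
    // Returns how long (ms) "thread A" waited to enter storeNewApplication
    // while "thread B" was inside storeApplicationStateInternal.
    static long blockedMillis(long writeMillis) {
        SlowFsStore store = new SlowFsStore(); // the single store instance
        CountDownLatch entered = new CountDownLatch(1);
        Thread threadB = new Thread(
                () -> store.storeApplicationStateInternal(writeMillis, entered));
        threadB.start();
        try {
            entered.await();                 // wait until B holds the monitor
            long start = System.nanoTime();
            store.storeNewApplication();     // thread A: blocks on the same monitor
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            threadB.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("thread A waited ~" + blockedMillis(300) + " ms");
    }
}
```

With a simulated 300 ms write, the cheap `storeNewApplication` call waits for roughly the whole write, even though it does almost no work itself.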
> While debugging the RMAppNewlySavingTransition.transition method, the thread
> stack shows that it is called from the AsyncDispatcher.dispatch method, whose
> code is as follows.
> {code:title=AsyncDispatcher.java|borderStyle=solid}
> protected void dispatch(Event event) {
>   //all events go thru this loop
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("Dispatching the event " + event.getClass().getName() + "."
>         + event.toString());
>   }
>   Class<? extends Enum> type = event.getType().getDeclaringClass();
>   try {
>     EventHandler handler = eventDispatchers.get(type);
>     if (handler != null) {
>       handler.handle(event);
>     } else {
>       throw new Exception("No handler for registered for " + type);
>     }
>   } catch (Throwable t) {
>     //TODO Maybe log the state of the queue
>     LOG.fatal("Error in dispatcher thread", t);
>     // If serviceStop is called, we should exit this thread gracefully.
>     if (exitOnDispatchException
>         && (ShutdownHookManager.get().isShutdownInProgress()) == false
>         && stopped == false) {
>       Thread shutDownThread = new Thread(createShutDownThread());
>       shutDownThread.setName("AsyncDispatcher ShutDown handler");
>       shutDownThread.start();
>     }
>   }
> }
> {code}
> The code above shows that AsyncDispatcher.dispatch can process events of
> different types.
> In fact, this AsyncDispatcher instance is ResourceManager.rmDispatcher,
> created in the ResourceManager.serviceInit method.
> Many event types and their handlers are registered with
> ResourceManager.rmDispatcher.
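The routing step in `dispatch` (one handler per event-type enum, looked up by `getDeclaringClass()`) can be sketched in isolation. This is a simplified model with no queue and no thread; the enums `AppEventType` and `NodeEventType` and the classes `SimpleEvent`, `SimpleHandler`, `RoutingDispatcher` and `RoutingDemo` are hypothetical illustrations, not the Hadoop types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event-type enums; the real RM registers enums such as
// RMAppEventType, but these names are only illustrative.
enum AppEventType { START, KILL }
enum NodeEventType { ADDED }

// Hypothetical minimal event: it only carries its enum type.
class SimpleEvent {
    final Enum<?> type;
    SimpleEvent(Enum<?> type) { this.type = type; }
}

interface SimpleHandler {
    void handle(SimpleEvent event);
}

// Simplified model of AsyncDispatcher's routing step:
// one handler per event-type enum, keyed by the enum's declaring class.
class RoutingDispatcher {
    private final Map<Class<?>, SimpleHandler> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventType, SimpleHandler handler) {
        handlers.put(eventType, handler);
    }

    void dispatch(SimpleEvent event) {
        // getDeclaringClass() returns the enum class even for constants
        // that have their own bodies, so it is a stable lookup key.
        Class<?> type = event.type.getDeclaringClass();
        SimpleHandler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + type);
        }
        handler.handle(event);
    }
}

class RoutingDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RoutingDispatcher dispatcher = new RoutingDispatcher();
        dispatcher.register(AppEventType.class, e -> log.add("app:" + e.type));
        dispatcher.register(NodeEventType.class, e -> log.add("node:" + e.type));
        dispatcher.dispatch(new SimpleEvent(AppEventType.START));
        dispatcher.dispatch(new SimpleEvent(NodeEventType.ADDED));
        dispatcher.dispatch(new SimpleEvent(AppEventType.KILL));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [app:START, node:ADDED, app:KILL]
    }
}
```

Because every registered handler runs through the same `dispatch` call, a handler that blocks holds up events of all types, not just its own.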
> In the scenario above, thread B blocks thread A. Because rmDispatcher handles
> events one at a time on a single thread, the processing of all subsequent
> events is blocked as well.
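This head-of-line blocking can be reproduced with a toy single-threaded event loop. A minimal sketch, assuming a plain `LinkedBlockingQueue` drained by one thread as a stand-in for AsyncDispatcher's event-handling thread, a plain `Object` as the state-store monitor, and a `Thread.sleep` as the slow write; `HeadOfLineDemo` is a hypothetical name:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of head-of-line blocking in a single-threaded event loop.
class HeadOfLineDemo {
    // Returns how long (ms) an unrelated event waited behind an event that
    // was blocked on a monitor held by another thread.
    static long unrelatedEventDelayMillis(long lockHoldMillis) {
        try {
            return measure(lockHoldMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static long measure(long lockHoldMillis) throws InterruptedException {
        Object storeMonitor = new Object(); // stands in for the state-store monitor
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch monitorHeld = new CountDownLatch(1);
        CountDownLatch unrelatedHandled = new CountDownLatch(1);
        long[] handledAt = new long[1];

        // "Thread B": holds the monitor during a slow (simulated) write.
        Thread writer = new Thread(() -> {
            synchronized (storeMonitor) {
                monitorHeld.countDown();
                try {
                    Thread.sleep(lockHoldMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        writer.start();
        monitorHeld.await();

        // "Thread A": the single dispatcher thread; handles two events in order.
        Thread dispatcher = new Thread(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();

        // Event 1 needs the store monitor (like storeNewApplication).
        queue.put(() -> {
            synchronized (storeMonitor) {
                // enqueue-style work would happen here
            }
        });
        // Event 2 is unrelated, but can only run after event 1 finishes.
        long enqueuedAt = System.nanoTime();
        queue.put(() -> {
            handledAt[0] = System.nanoTime();
            unrelatedHandled.countDown();
        });

        unrelatedHandled.await();
        writer.join();
        dispatcher.join();
        return TimeUnit.NANOSECONDS.toMillis(handledAt[0] - enqueuedAt);
    }

    public static void main(String[] args) {
        System.out.println("unrelated event delayed ~"
                + unrelatedEventDelayMillis(300) + " ms");
    }
}
```

The second event never touches the store, yet it is delayed by about the full simulated write time, which mirrors how unrelated RM events pile up behind the blocked transition.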
> In my test cluster, with only one queue and a client submitting 1000
> applications concurrently, YARN cluster usage is only 50% and many apps are
> pending. If I disable ResourceManager recovery, cluster usage can reach 100%.
> To solve this issue, I removed the synchronized modifier from some methods
> defined in RMStateStore.
> Instead, in these methods I use a dedicated lock object before calling
> dispatcher.getEventHandler().handle.
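The effect of moving the enqueue path off the store's own monitor can be sketched as follows. This is a minimal illustration of the idea described above, not the attached patch: `LockSplitBaseStore`, `LockSplitFsStore`, `LockSplitDemo`, the `enqueueLock` field, the `pendingStoreEvents` queue and the `app_0001` id are all hypothetical, and a `Thread.sleep` stands in for the HDFS write:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RMStateStore after the change: the enqueue path
// no longer synchronizes on "this", only on a dedicated lock object.
class LockSplitBaseStore {
    private final Object enqueueLock = new Object(); // hypothetical dedicated lock
    final BlockingQueue<String> pendingStoreEvents = new LinkedBlockingQueue<>();

    public void storeNewApplication(String appId) {
        synchronized (enqueueLock) {
            pendingStoreEvents.add(appId); // cheap: hand the work to the store thread
        }
    }
}

// Hypothetical stand-in for FileSystemRMStateStore: the slow write still
// synchronizes on "this", which the enqueue path no longer needs.
class LockSplitFsStore extends LockSplitBaseStore {
    public synchronized void storeApplicationStateInternal(long writeMillis,
            CountDownLatch entered) {
        entered.countDown();
        try {
            Thread.sleep(writeMillis); // simulated HDFS write
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class LockSplitDemo {
    // Returns how long (ms) storeNewApplication took while a write was in progress.
    static long enqueueMillisDuringWrite(long writeMillis) {
        LockSplitFsStore store = new LockSplitFsStore();
        CountDownLatch entered = new CountDownLatch(1);
        Thread writer = new Thread(
                () -> store.storeApplicationStateInternal(writeMillis, entered));
        writer.start();
        try {
            entered.await();                        // the writer now holds "this"
            long start = System.nanoTime();
            store.storeNewApplication("app_0001");  // does not wait for the writer
            long took = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            writer.join();
            return took;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("enqueue took ~" + enqueueMillisDuringWrite(500) + " ms");
    }
}
```

Since `LinkedBlockingQueue` is already thread-safe, the dedicated lock in this sketch guards only a trivially short critical section; the point it illustrates is that the dispatcher-side call no longer contends with the slow write for the same monitor.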
> With this change, YARN cluster usage can reach 100% and the whole cluster
> runs well.
> Please see my attached patch.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)