[jira] [Commented] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-12-02 Thread NING DING (JIRA)

[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035471#comment-15035471 ]

NING DING commented on YARN-4398:
-

I uploaded a new patch that removes useless whitespace.
The existing test cases cover the code modified in this patch, and the patch
resolves a performance issue, so no new unit test cases are needed.

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch, YARN-4398.3.patch, YARN-4398.4.patch
>
>
> In my Hadoop cluster, the ResourceManager recovery functionality is enabled
> with FileSystemRMStateStore.
> I found this causes the YARN cluster to run slowly: the cluster usage rate is
> just 50% even when there are many pending apps.
> The scenario is as follows.
> In thread A, RMAppImpl$RMAppNewlySavingTransition calls the
> storeNewApplication method defined in RMStateStore. This storeNewApplication
> method is synchronized.
> {code:title=RMAppImpl.java|borderStyle=solid}
>   private static final class RMAppNewlySavingTransition extends RMAppTransition {
>     @Override
>     public void transition(RMAppImpl app, RMAppEvent event) {
>       // If recovery is enabled then store the application information in a
>       // non-blocking call so make sure that RM has stored the information
>       // needed to restart the AM after RM restart without further client
>       // communication
>       LOG.info("Storing application with id " + app.applicationId);
>       app.rmContext.getStateStore().storeNewApplication(app);
>     }
>   }
> {code}
> {code:title=RMStateStore.java|borderStyle=solid}
>   public synchronized void storeNewApplication(RMApp app) {
>     ApplicationSubmissionContext context = app
>         .getApplicationSubmissionContext();
>     assert context instanceof ApplicationSubmissionContextPBImpl;
>     ApplicationStateData appState =
>         ApplicationStateData.newInstance(
>             app.getSubmitTime(), app.getStartTime(), context, app.getUser());
>     dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
>   }
> {code}
> In thread B, FileSystemRMStateStore is executing its
> storeApplicationStateInternal method, which is also synchronized.
> This storeApplicationStateInternal method saves an ApplicationStateData into
> HDFS, which normally takes 90~300 milliseconds in my Hadoop cluster.
> {code:title=FileSystemRMStateStore.java|borderStyle=solid}
>   public synchronized void storeApplicationStateInternal(ApplicationId appId,
>       ApplicationStateData appStateDataPB) throws Exception {
>     Path appDirPath = getAppDir(rmAppRoot, appId);
>     mkdirsWithRetries(appDirPath);
>     Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
>     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
>     byte[] appStateData = appStateDataPB.getProto().toByteArray();
>     try {
>       // currently throw all exceptions. May need to respond differently for HA
>       // based on whether we have lost the right to write to FS
>       writeFileWithRetries(nodeCreatePath, appStateData, true);
>     } catch (Exception e) {
>       LOG.info("Error storing info for app: " + appId, e);
>       throw e;
>     }
>   }
> {code}
> Suppose thread B enters FileSystemRMStateStore.storeApplicationStateInternal
> first; thread A is then blocked for a while by the synchronization. In
> ResourceManager there is only one RMStateStore instance; in my cluster it is
> of type FileSystemRMStateStore.
> Debugging the RMAppNewlySavingTransition.transition method, the thread stack
> shows it is called from AsyncDispatcher.dispatch. This method's code is as
> below.
> {code:title=AsyncDispatcher.java|borderStyle=solid}
>   protected void dispatch(Event event) {
>     //all events go thru this loop
>     if (LOG.isDebugEnabled()) {
>       LOG.debug("Dispatching the event " + event.getClass().getName() + "."
>           + event.toString());
>     }
>     Class type = event.getType().getDeclaringClass();
>     try{
>       EventHandler handler = eventDispatchers.get(type);
>       if(handler != null) {
>         handler.handle(event);
>       } else {
>         throw new Exception("No handler for registered for " + type);
>       }
>     } catch (Throwable t) {
>       //TODO Maybe log the state of the queue
>       LOG.fatal("Error in dispatcher thread", t);
>       // If serviceStop is called, we should exit this thread gracefully.
>       if (exitOnDispatchException
>           && (ShutdownHookManager.get().isShutdownInProgress()) == false
>           && stopped == false) {
>         Thread shutDownThread = new Thread(createShutDownThread());
>         shutDownThread.setName("AsyncDispatcher ShutDown handler");
>         shutDownThread.start();
>       }
>     }
>   }
> {code}
> The code above shows that AsyncDispatcher.dispatch processes events of many
> different types.
> In fact this AsyncDispatcher instance is just ResourceManager.rmDispatcher,
> created in ResourceManager.serviceInit; many event types and handlers are
> registered on ResourceManager.rmDispatcher.
> In the scenario above, thread B blocks thread A, and then the processing of
> many subsequent events is blocked in turn.
> In my testing cluster there is only one queue, and when the client submits
> 1000 applications concurrently the YARN cluster usage rate only reaches 50%;
> many apps are pending.
> If I disable the ResourceManager recovery functionality, the cluster
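To make the contention concrete, below is a minimal, self-contained sketch of the pattern described above. It is illustrative only, not YARN code: the class and method names merely mirror the quoted sources, and a 300 ms sleep stands in for the reported HDFS write latency. One thread repeatedly holds the store's monitor inside the slow write path while a second thread, standing in for the dispatcher, blocks on the synchronized submit path.

{code:title=StoreContentionDemo.java|borderStyle=solid}
// Illustrative sketch, not YARN code: two synchronized methods on one shared
// store instance take the same monitor, so the fast submit path stalls behind
// the slow write path, mirroring thread A blocking behind thread B above.
public class StoreContentionDemo {

  static class StateStore {
    // Stands in for RMStateStore.storeNewApplication: cheap, but synchronized.
    synchronized void storeNewApplication(String appId) {
      System.out.println("enqueued " + appId);
    }

    // Stands in for FileSystemRMStateStore.storeApplicationStateInternal:
    // a slow HDFS write, simulated here with a 300 ms sleep.
    synchronized void storeApplicationStateInternal(String appId)
        throws InterruptedException {
      Thread.sleep(300);
    }
  }

  public static void main(String[] args) throws Exception {
    StateStore store = new StateStore(); // one shared instance, as in the RM

    Thread threadB = new Thread(() -> {
      try {
        for (int i = 0; i < 10; i++) {
          store.storeApplicationStateInternal("application_" + i);
        }
      } catch (InterruptedException ignored) {
      }
    }, "thread-B-store-writer");
    threadB.start();

    Thread.sleep(50); // let thread B take the monitor first
    long start = System.currentTimeMillis();
    store.storeNewApplication("application_new"); // thread A blocks here
    System.out.println("submit path blocked for "
        + (System.currentTimeMillis() - start) + " ms");
    threadB.join();
  }
}
{code}

Every millisecond the writer holds the monitor is a millisecond the submit path cannot make progress, and because the dispatcher is a single thread serving every registered event type, one slow store write stalls the whole event pipeline.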

[jira] [Commented] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-12-02 Thread NING DING (JIRA)

[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15037381#comment-15037381 ]

NING DING commented on YARN-4398:
-

[~jianhe], thank you.

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
>Assignee: NING DING
> Fix For: 2.7.3
>
> Attachments: YARN-4398.2.patch, YARN-4398.3.patch, YARN-4398.4.patch
>
>

[jira] [Commented] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-12-02 Thread NING DING (JIRA)

[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035714#comment-15035714 ]

NING DING commented on YARN-4398:
-

[~jianhe] and [~templedf], could you assign this JIRA to me and help get the
patch checked into trunk?
Thanks.

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch, YARN-4398.3.patch, YARN-4398.4.patch
>
>

[jira] [Commented] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-11-30 Thread NING DING (JIRA)

[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15032979#comment-15032979 ]

NING DING commented on YARN-4398:
-

Thanks for all your comments.
I prefer to eagerly initialize handlerInstance in AsyncDispatcher and then
remove the synchronized modifier in RMStateStore.
Please see my new patch.
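For reference, below is a minimal sketch of what that change could look like. It is a simplification under stated assumptions, not the actual patch: it assumes the synchronized modifier on the RMStateStore methods was needed to guard the lazy creation of the dispatcher's handlerInstance, so creating the handler eagerly in a final field makes getEventHandler() a plain read and lets the store methods drop synchronized.

{code:title=AsyncDispatcherSketch.java|borderStyle=solid}
// Simplified sketch of the proposed change, not the actual patch.
public class AsyncDispatcherSketch {

  public interface Event { }

  public interface EventHandler {
    void handle(Event event);
  }

  // Before the change, handlerInstance was created lazily inside
  // getEventHandler(), so callers needed synchronization to publish it
  // safely. An eagerly initialized final field is published safely by the
  // JVM, so the read path needs no lock at all.
  private final EventHandler handlerInstance = new GenericEventHandler();

  public EventHandler getEventHandler() {
    return handlerInstance; // plain read: no lazy init, no locking
  }

  private class GenericEventHandler implements EventHandler {
    @Override
    public void handle(Event event) {
      // enqueue the event for the single dispatcher thread (queue omitted)
    }
  }
}
{code}

With getEventHandler() safe to call without a lock, RMStateStore.storeNewApplication no longer needs to be synchronized just to reach the dispatcher, so the submit path stops contending with the slow storeApplicationStateInternal writes.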

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch
>
>

[jira] [Updated] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-11-30 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: YARN-4398.3.patch

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch, YARN-4398.3.patch
>
>

[jira] [Created] (YARN-4398) Yarn recover with FileSystemRMStateStore causes the cluster to run slowly and the usage percentage to fall much below 100%

2015-11-29 Thread NING DING (JIRA)
NING DING created YARN-4398:
---

 Summary: Yarn recover with FileSystemRMStateStore causes the
cluster to run slowly and the usage percentage to fall much below 100%
 Key: YARN-4398
 URL: https://issues.apache.org/jira/browse/YARN-4398
 Project: Hadoop YARN
  Issue Type: Bug
  Components: resourcemanager
Affects Versions: 2.7.1
Reporter: NING DING



[jira] [Updated] (YARN-4398) Yarn recover with FileSystemRMStateStore causes the cluster to run slowly and the usage percentage to fall much below 100%

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: YARN-4398.1.patch

> Yarn recover with FileSystemRMStateStore causes the cluster to run slowly
> and the usage percentage to fall much below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.1.patch
>
>

[jira] [Updated] (YARN-4398) Yarn recover with FileSystemRMStateStore causes the cluster to run slowly and the usage percentage to fall much below 100%

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: YARN-4398.2.patch

> Yarn recover with FileSystemRMStateStore causes the cluster to run slowly
> and the usage percentage to fall much below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.1.patch, YARN-4398.2.patch
>
>

[jira] [Updated] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: (was: YARN-4398.1.patch)

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch
>
>

[jira] [Commented] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-11-29 Thread NING DING (JIRA)

[ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15031300#comment-15031300 ]

NING DING commented on YARN-4398:
-

[~jianhe] would you kindly help take a look at this issue?

> Yarn recover functionality causes the cluster to run slowly and the cluster
> usage rate to fall far below 100%
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.2.patch
>
>

[jira] [Updated] (YARN-4398) Yarn recover functionality causes the cluster to run slowly and the cluster usage rate to fall far below 100%

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Description: 

[jira] [Updated] (YARN-4398) Yarn recover functionality causes the cluster running slowly and the cluster usage rate is far below 100

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Description: 

[jira] [Updated] (YARN-4398) Yarn recover with FileSystemRMStateStore cause the cluster running slowly and usage percentage is much below 100

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: (was: YARN-4398.2.patch)

> Yarn recover with FileSystemRMStateStore cause the cluster running slowly and 
> usage percentage is much below 100
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.1.patch

[jira] [Updated] (YARN-4398) Yarn recover with FileSystemRMStateStore cause the cluster running slowly and usage percentage is much below 100

2015-11-29 Thread NING DING (JIRA)

 [ 
https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NING DING updated YARN-4398:

Attachment: YARN-4398.2.patch

> Yarn recover with FileSystemRMStateStore cause the cluster running slowly and 
> usage percentage is much below 100
> 
>
> Key: YARN-4398
> URL: https://issues.apache.org/jira/browse/YARN-4398
> Project: Hadoop YARN
>  Issue Type: Bug
>  Components: resourcemanager
>Affects Versions: 2.7.1
>Reporter: NING DING
> Attachments: YARN-4398.1.patch, YARN-4398.2.patch
