YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge and Sunil G 
via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0d131f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0d131f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0d131f0

Branch: refs/heads/trunk
Commit: e0d131f055ee126052ad4d0f7b0d192e6c730188
Parents: d9a354c
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Aug 5 10:27:34 2016 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Fri Aug 5 10:27:34 2016 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |  12 +
 .../scheduler/AbstractYarnScheduler.java        |   8 +
 .../scheduler/activities/ActivitiesLogger.java  | 275 +++++++
 .../scheduler/activities/ActivitiesManager.java | 319 ++++++++
 .../activities/ActivityDiagnosticConstant.java  |  77 ++
 .../scheduler/activities/ActivityNode.java      | 110 +++
 .../scheduler/activities/ActivityState.java     |  37 +
 .../activities/AllocationActivity.java          |  74 ++
 .../scheduler/activities/AllocationState.java   |  35 +
 .../scheduler/activities/AppAllocation.java     | 107 +++
 .../scheduler/activities/NodeAllocation.java    | 139 ++++
 .../scheduler/capacity/AbstractCSQueue.java     |   6 +-
 .../scheduler/capacity/CapacityScheduler.java   |  49 +-
 .../capacity/CapacitySchedulerContext.java      |   3 +
 .../scheduler/capacity/LeafQueue.java           |  66 +-
 .../scheduler/capacity/ParentQueue.java         |  91 ++-
 .../allocator/AbstractContainerAllocator.java   |  51 +-
 .../capacity/allocator/ContainerAllocator.java  |  10 +-
 .../allocator/RegularContainerAllocator.java    |  97 ++-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  20 +-
 .../resourcemanager/webapp/RMWebServices.java   | 123 +++
 .../webapp/dao/ActivitiesInfo.java              |  80 ++
 .../webapp/dao/ActivityNodeInfo.java            |  67 ++
 .../webapp/dao/AppActivitiesInfo.java           |  79 ++
 .../webapp/dao/AppAllocationInfo.java           |  72 ++
 .../webapp/dao/NodeAllocationInfo.java          |  51 ++
 .../webapp/TestRMWebServicesCapacitySched.java  |   6 +-
 .../TestRMWebServicesSchedulerActivities.java   | 777 +++++++++++++++++++
 28 files changed, 2768 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml 
b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7c19c5e..a5c0f71 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -519,6 +519,7 @@
     <Or>
       <Field name="rmContext" />
       <Field name="applications" />
+      <Field name="activitiesManager" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
@@ -552,4 +553,15 @@
     <Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
     <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
   </Match>
+  <Match>
+    <Class 
name="org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo"/>
+    <Or>
+      <Field name="allocationState" />
+      <Field name="diagnostic" />
+      <Field name="name" />
+      <Field name="requestPriority" />
+      <Field name="appPriority" />
+    </Or>
+    <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 64eb777..755defd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -71,6 +71,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .LeafQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@@ -97,6 +98,8 @@ public abstract class AbstractYarnScheduler
   
   private volatile Priority maxClusterLevelAppPriority;
 
+  protected ActivitiesManager activitiesManager;
+
   /*
    * All schedulers which are inheriting AbstractYarnScheduler should use
    * concurrent version of 'applications' map.
@@ -789,4 +792,9 @@ public abstract class AbstractYarnScheduler
     }
     return schedulerChangeRequests;
   }
+
+  public ActivitiesManager getActivitiesManager() {
+    return this.activitiesManager;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
new file mode 100644
index 0000000..8fa1bb5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility for logging scheduler activities
+ */
+public class ActivitiesLogger {
+  private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
+
+  /**
+   * Methods for recording activities from an app
+   */
+  public static class APP {
+
+    /*
+     * Record skipped application activity when no container allocated /
+     * reserved / re-reserved. Scheduler will look at following applications
+     * within the same leaf queue.
+     *
+     * Delegates to recordAppActivityWithoutAllocation with
+     * ActivityState.SKIPPED as the application state.
+     */
+    public static void recordSkippedAppActivityWithoutAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic) {
+      recordAppActivityWithoutAllocation(activitiesManager, node, application,
+          priority, diagnostic, ActivityState.SKIPPED);
+    }
+
+    /*
+     * Record application activity when rejected because of queue maximum
+     * capacity or user limit.
+     *
+     * Adds a REJECTED activity of type "app" (queue -> application) to the
+     * node allocation, then finishes the application's allocation recording
+     * with the same REJECTED state and diagnostic.
+     */
+    public static void recordRejectedAppActivityFromLeafQueue(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic) {
+      String type = "app";
+      recordActivity(activitiesManager, node, application.getQueueName(),
+          application.getApplicationId().toString(), priority,
+          ActivityState.REJECTED, diagnostic, type);
+      finishSkippedAppAllocationRecording(activitiesManager,
+          application.getApplicationId(), ActivityState.REJECTED, diagnostic);
+    }
+
+    /*
+     * Record application activity when no container allocated /
+     * reserved / re-reserved. Scheduler will look at following applications
+     * within the same leaf queue.
+     *
+     * Does nothing when activitiesManager is null.
+     *
+     * NOTE(review): the two node-level activities below are always recorded
+     * with ActivityState.SKIPPED; the appState argument is only used for the
+     * application-level record. Confirm this is intended.
+     */
+    public static void recordAppActivityWithoutAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Priority priority,
+        String diagnostic, ActivityState appState) {
+      if (activitiesManager == null) {
+        return;
+      }
+      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        String type = "container";
+        // Add application-container activity into specific node allocation.
+        // No container was allocated, so the child name is null.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getApplicationId().toString(), null,
+            priority.toString(), ActivityState.SKIPPED, diagnostic, type);
+        type = "app";
+        // Add queue-application activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getQueueName(),
+            application.getApplicationId().toString(),
+            application.getPriority().toString(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+      // Add application-container activity into specific application
+      // allocation.
+      // Under this condition, it fails to allocate a container to this
+      // application, so containerId is null.
+      if (activitiesManager.shouldRecordThisApp(
+          application.getApplicationId())) {
+        String type = "container";
+        activitiesManager.addSchedulingActivityForApp(
+            application.getApplicationId(), null, priority.toString(), 
appState,
+            diagnostic, type);
+      }
+    }
+
+    /*
+     * Record application activity when container allocated / reserved /
+     * re-reserved
+     *
+     * Does nothing when activitiesManager is null. The container's id and
+     * priority are taken from updatedContainer.
+     */
+    public static void recordAppActivityWithAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        SchedulerApplicationAttempt application, Container updatedContainer,
+        ActivityState activityState) {
+      if (activitiesManager == null) {
+        return;
+      }
+      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        String type = "container";
+        // Add application-container activity into specific node allocation.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getApplicationId().toString(),
+            updatedContainer.getId().toString(),
+            updatedContainer.getPriority().toString(), activityState,
+            ActivityDiagnosticConstant.EMPTY, type);
+        type = "app";
+        // Add queue-application activity into specific node allocation.
+        // The application itself is always recorded as ACCEPTED here.
+        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+            application.getQueueName(),
+            application.getApplicationId().toString(),
+            application.getPriority().toString(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+      // Add application-container activity into specific application
+      // allocation.
+      if (activitiesManager.shouldRecordThisApp(
+          application.getApplicationId())) {
+        String type = "container";
+        activitiesManager.addSchedulingActivityForApp(
+            application.getApplicationId(), 
updatedContainer.getId().toString(),
+            updatedContainer.getPriority().toString(), activityState,
+            ActivityDiagnosticConstant.EMPTY, type);
+      }
+    }
+
+    /*
+     * Invoked when scheduler starts to look at this application within one
+     * node update.
+     *
+     * Does nothing when activitiesManager is null.
+     */
+    public static void startAppAllocationRecording(
+        ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
+        SchedulerApplicationAttempt application) {
+      if (activitiesManager == null) {
+        return;
+      }
+      activitiesManager.startAppAllocationRecording(nodeId, currentTime,
+          application);
+    }
+
+    /*
+     * Invoked when scheduler finishes looking at this application within one
+     * node update, and the app has any container allocated/reserved during
+     * this allocation.
+     *
+     * Does nothing when activitiesManager is null or when the manager is not
+     * currently recording this application.
+     */
+    public static void finishAllocatedAppAllocationRecording(
+        ActivitiesManager activitiesManager, ApplicationId applicationId,
+        ContainerId containerId, ActivityState containerState,
+        String diagnostic) {
+      if (activitiesManager == null) {
+        return;
+      }
+
+      if (activitiesManager.shouldRecordThisApp(applicationId)) {
+        activitiesManager.finishAppAllocationRecording(applicationId,
+            containerId, containerState, diagnostic);
+      }
+    }
+
+    /*
+     * Invoked when scheduler finishes looking at this application within one
+     * node update, and the app DOESN'T have any container allocated/reserved
+     * during this allocation.
+     *
+     * Delegates to finishAllocatedAppAllocationRecording with a null
+     * container id.
+     */
+    public static void finishSkippedAppAllocationRecording(
+        ActivitiesManager activitiesManager, ApplicationId applicationId,
+        ActivityState containerState, String diagnostic) {
+      finishAllocatedAppAllocationRecording(activitiesManager, applicationId,
+          null, containerState, diagnostic);
+    }
+  }
+
+  /**
+   * Methods for recording activities from a queue
+   */
+  public static class QUEUE {
+    /*
+     * Record one scheduling activity of a queue underneath its parent queue.
+     * Queue activities are recorded without a priority and without an
+     * activity type.
+     */
+    public static void recordQueueActivity(ActivitiesManager activitiesManager,
+        SchedulerNode node, String parentQueueName, String queueName,
+        ActivityState state, String diagnostic) {
+      final Priority noPriority = null;
+      final String noType = null;
+      recordActivity(activitiesManager, node, parentQueueName, queueName,
+          noPriority, state, diagnostic, noType);
+    }
+  }
+
+  /**
+   * Methods for recording overall activities from one node update
+   */
+  public static class NODE {
+
+    /*
+     * Called once a node allocation is over and NO container was allocated
+     * or reserved in it. Recorded as AllocationState.SKIPPED with a null
+     * container id.
+     */
+    public static void finishSkippedNodeAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node) {
+      final ContainerId noContainer = null;
+      finishAllocatedNodeAllocation(activitiesManager, node, noContainer,
+          AllocationState.SKIPPED);
+    }
+
+    /*
+     * Called once a node allocation is over and a container was allocated
+     * or reserved in it. Ignored when there is no manager or the manager is
+     * not recording this node.
+     */
+    public static void finishAllocatedNodeAllocation(
+        ActivitiesManager activitiesManager, SchedulerNode node,
+        ContainerId containerId, AllocationState containerState) {
+      if (activitiesManager != null
+          && activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+        activitiesManager.updateAllocationFinalState(node.getNodeID(),
+            containerId, containerState);
+      }
+    }
+
+    /*
+     * Called when a node heartbeat finishes. Ignored when there is no
+     * manager.
+     */
+    public static void finishNodeUpdateRecording(
+        ActivitiesManager activitiesManager, NodeId nodeID) {
+      if (activitiesManager != null) {
+        activitiesManager.finishNodeUpdateRecording(nodeID);
+      }
+    }
+
+    /*
+     * Called when a node heartbeat starts. Ignored when there is no
+     * manager.
+     */
+    public static void startNodeUpdateRecording(
+        ActivitiesManager activitiesManager, NodeId nodeID) {
+      if (activitiesManager != null) {
+        activitiesManager.startNodeUpdateRecording(nodeID);
+      }
+    }
+  }
+
+  // Add queue, application or container activity into specific node
+  // allocation. Nothing is recorded when there is no manager or the manager
+  // is not recording this node. A null priority is passed on as null.
+  private static void recordActivity(ActivitiesManager activitiesManager,
+      SchedulerNode node, String parentName, String childName,
+      Priority priority, ActivityState state, String diagnostic, String type) {
+    if (activitiesManager == null
+        || !activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+      return;
+    }
+    String priorityText = null;
+    if (priority != null) {
+      priorityText = priority.toString();
+    }
+    activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        parentName, childName, priorityText, state, diagnostic, type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
new file mode 100644
index 0000000..4fa5feb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+import java.util.Set;
+import java.util.*;
+import java.util.ArrayList;
+
+/**
+ * A class to store node or application allocations.
+ * It mainly contains operations for allocation start, add, update and finish.
+ */
+public class ActivitiesManager extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
+  private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
+  private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+  private Set<NodeId> activeRecordedNodes;
+  private ConcurrentMap<ApplicationId, Long>
+      recordingAppActivitiesUntilSpecifiedTime;
+  private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
+  private ConcurrentMap<ApplicationId, List<AppAllocation>>
+      completedAppAllocations;
+  private boolean recordNextAvailableNode = false;
+  private List<NodeAllocation> lastAvailableNodeActivities = null;
+  private Thread cleanUpThread;
+  private int timeThreshold = 600 * 1000;
+  private final RMContext rmContext;
+
+  /**
+   * Creates a manager with empty recording state.
+   *
+   * @param rmContext context used to look up applications by id
+   */
+  public ActivitiesManager(RMContext rmContext) {
+    super(ActivitiesManager.class.getName());
+    this.rmContext = rmContext;
+    // Node-level bookkeeping.
+    this.activeRecordedNodes =
+        Collections.newSetFromMap(new ConcurrentHashMap<>());
+    this.recordingNodesAllocation = new ConcurrentHashMap<>();
+    this.completedNodeAllocations = new ConcurrentHashMap<>();
+    // Application-level bookkeeping.
+    this.recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
+    this.appsAllocation = new ConcurrentHashMap<>();
+    this.completedAppAllocations = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the completed allocation activities recorded for an application.
+   *
+   * Activities are only returned while the application's final status is
+   * still UNDEFINED. For an application that has a final status, or that is
+   * not present in the RM application map, a diagnostic-only result is
+   * returned instead.
+   *
+   * @param applicationId id of the application to look up
+   * @return the recorded activities, or a result carrying only a diagnostic
+   */
+  public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
+    // The application may be unknown to the RM or already removed from its
+    // application map; look it up once and guard against null instead of
+    // failing with a NullPointerException.
+    org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp app =
+        rmContext.getRMApps().get(applicationId);
+    if (app != null && app.getFinalApplicationStatus()
+        == FinalApplicationStatus.UNDEFINED) {
+      List<AppAllocation> allocations = completedAppAllocations.get(
+          applicationId);
+
+      return new AppActivitiesInfo(allocations, applicationId);
+    } else {
+      return new AppActivitiesInfo(
+          "fail to get application activities after finished",
+          applicationId.toString());
+    }
+  }
+
+  /**
+   * Returns the recorded node allocation activities.
+   *
+   * @param nodeId node id string, or null to get the activities of the most
+   *               recently recorded node update
+   * @return activities wrapped for the REST layer; the wrapped list is
+   *         whatever is stored and may be null
+   */
+  public ActivitiesInfo getActivitiesInfo(String nodeId) {
+    List<NodeAllocation> allocations = (nodeId == null)
+        ? lastAvailableNodeActivities
+        : completedNodeAllocations.get(NodeId.fromString(nodeId));
+    return new ActivitiesInfo(allocations, nodeId);
+  }
+
+  /**
+   * Requests recording of the next node update.
+   *
+   * @param nodeId node id string to record, or null to record whichever
+   *               node update comes next
+   */
+  public void recordNextNodeUpdateActivities(String nodeId) {
+    if (nodeId != null) {
+      activeRecordedNodes.add(NodeId.fromString(nodeId));
+      return;
+    }
+    recordNextAvailableNode = true;
+  }
+
+  /**
+   * Turns on activity recording for an application until a deadline.
+   *
+   * @param applicationId application to record
+   * @param maxTime recording duration; multiplied by 1000 and added to the
+   *                current SystemClock time to obtain the deadline
+   */
+  public void turnOnAppActivitiesRecording(ApplicationId applicationId,
+      double maxTime) {
+    long recordUntil =
+        SystemClock.getInstance().getTime() + (long) (maxTime * 1000);
+    recordingAppActivitiesUntilSpecifiedTime.put(applicationId, recordUntil);
+  }
+
+  /**
+   * Starts the background clean-up thread.
+   *
+   * Every 5 seconds the thread drops completed node allocations whose first
+   * entry is older than timeThreshold, and drops completed app allocations
+   * of applications that have a final status or are no longer known to the
+   * RM. The thread exits when it is interrupted.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    cleanUpThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+          Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
+              completedNodeAllocations.entrySet().iterator();
+          while (ite.hasNext()) {
+            Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation =
+                ite.next();
+            List<NodeAllocation> allocations = nodeAllocation.getValue();
+            long currTS = SystemClock.getInstance().getTime();
+            // Age is "now - recorded time". The reverse subtraction is never
+            // positive, which would keep every entry forever.
+            if (allocations.size() > 0 && currTS
+                - allocations.get(0).getTimeStamp() > timeThreshold) {
+              ite.remove();
+            }
+          }
+
+          Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
+              completedAppAllocations.entrySet().iterator();
+          while (iteApp.hasNext()) {
+            Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
+                iteApp.next();
+            // An application missing from the RM map can never be queried
+            // again, so purge it too instead of dereferencing null.
+            org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp app =
+                rmContext.getRMApps().get(appAllocation.getKey());
+            if (app == null || app.getFinalApplicationStatus()
+                != FinalApplicationStatus.UNDEFINED) {
+              iteApp.remove();
+            }
+          }
+
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            // Interrupted, e.g. by serviceStop(): restore the flag and let
+            // the thread terminate instead of looping forever.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    });
+
+    cleanUpThread.start();
+    super.serviceStart();
+  }
+
+  /**
+   * Interrupts the clean-up thread, if one was created, and stops the
+   * service.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // stop() may be invoked on a service that was never started, in which
+    // case no clean-up thread exists yet.
+    if (cleanUpThread != null) {
+      cleanUpThread.interrupt();
+    }
+    super.serviceStop();
+  }
+
+  // Invoked when a node update starts. If the next available node was
+  // requested, this node is marked for recording first. A node marked for
+  // recording gets a fresh, empty allocation list.
+  void startNodeUpdateRecording(NodeId nodeID) {
+    if (recordNextAvailableNode) {
+      recordNextNodeUpdateActivities(nodeID.toString());
+    }
+    if (!activeRecordedNodes.contains(nodeID)) {
+      return;
+    }
+    recordingNodesAllocation.put(nodeID, new ArrayList<NodeAllocation>());
+  }
+
+  // Invoked when the scheduler starts looking at an application within one
+  // node update. While the application's recording deadline is still in the
+  // future a new AppAllocation is opened for it; once the deadline has been
+  // reached, recording is turned off for the application. Applications
+  // without a deadline entry are ignored.
+  void startAppAllocationRecording(NodeId nodeID, long currTS,
+      SchedulerApplicationAttempt application) {
+    ApplicationId applicationId = application.getApplicationId();
+
+    // Single lookup: avoids the containsKey/get check-then-act sequence and
+    // unboxing a null if the entry disappears in between.
+    Long recordUntil =
+        recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+    if (recordUntil == null) {
+      return;
+    }
+
+    if (recordUntil > currTS) {
+      appsAllocation.put(applicationId,
+          new AppAllocation(application.getPriority(), nodeID,
+              application.getQueueName()));
+    } else {
+      turnOffActivityMonitoringForApp(applicationId);
+    }
+  }
+
+  // Add queue, application or container activity into specific node
+  // allocation. Ignored when this node is not being recorded.
+  void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+      String childName, String priority, ActivityState state,
+      String diagnostic, String type) {
+    if (!shouldRecordThisNode(nodeID)) {
+      return;
+    }
+    getCurrentNodeAllocation(nodeID).addAllocationActivity(parentName,
+        childName, priority, state, diagnostic, type);
+  }
+
+  // Add queue, application or container activity into specific application
+  // allocation. Ignored when this application is not being recorded.
+  void addSchedulingActivityForApp(ApplicationId applicationId,
+      String containerId, String priority, ActivityState state,
+      String diagnostic, String type) {
+    if (!shouldRecordThisApp(applicationId)) {
+      return;
+    }
+    AppAllocation current = appsAllocation.get(applicationId);
+    current.addAppAllocationActivity(containerId, priority, state,
+        diagnostic, type);
+  }
+
+  // Update container allocation meta status for this node allocation.
+  // It updates general container status but not the detailed activity state
+  // in updateActivityState. Ignored when this node is not being recorded.
+  void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
+      AllocationState containerState) {
+    if (!shouldRecordThisNode(nodeID)) {
+      return;
+    }
+    getCurrentNodeAllocation(nodeID).updateContainerState(containerId,
+        containerState);
+  }
+
+  // Close the in-progress allocation of an application and move it to the
+  // completed list. Ignored when this application is not being recorded.
+  void finishAppAllocationRecording(ApplicationId applicationId,
+      ContainerId containerId, ActivityState appState, String diagnostic) {
+    if (shouldRecordThisApp(applicationId)) {
+      long currTS = SystemClock.getInstance().getTime();
+      // Take the in-progress allocation out of the recording map and stamp
+      // it with the final container, state, time and diagnostic.
+      AppAllocation appAllocation = appsAllocation.remove(applicationId);
+      appAllocation.updateAppContainerStateAndTime(containerId, appState,
+          currTS, diagnostic);
+
+      // Find or create the per-application list of completed allocations.
+      List<AppAllocation> appAllocations;
+      if (completedAppAllocations.containsKey(applicationId)) {
+        appAllocations = completedAppAllocations.get(applicationId);
+      } else {
+        appAllocations = new ArrayList<>();
+        completedAppAllocations.put(applicationId, appAllocations);
+      }
+      // Keep at most 1000 completed allocations per application: drop the
+      // oldest entry before appending the new one.
+      if (appAllocations.size() == 1000) {
+        appAllocations.remove(0);
+      }
+      appAllocations.add(appAllocation);
+
+      // Stop recording once the requested deadline has been reached.
+      if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
+          <= currTS) {
+        turnOffActivityMonitoringForApp(applicationId);
+      }
+    }
+  }
+
+  // Invoked when a node update finishes. Does nothing unless a recording
+  // list exists for this node.
+  void finishNodeUpdateRecording(NodeId nodeID) {
+    List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
+    long timeStamp = SystemClock.getInstance().getTime();
+
+    if (value != null) {
+      if (value.size() > 0) {
+        // A non-empty recording becomes the "last available" result: each
+        // allocation is converted to its tree form and given the same
+        // timestamp.
+        lastAvailableNodeActivities = value;
+        for (NodeAllocation allocation : lastAvailableNodeActivities) {
+          allocation.transformToTree();
+          allocation.setTimeStamp(timeStamp);
+        }
+        // The "record next available node" request is now satisfied.
+        if (recordNextAvailableNode) {
+          recordNextAvailableNode = false;
+        }
+      }
+
+      // Move the recording (even if empty) to the completed map and stop
+      // recording this node.
+      if (shouldRecordThisNode(nodeID)) {
+        recordingNodesAllocation.remove(nodeID);
+        completedNodeAllocations.put(nodeID, value);
+        stopRecordNodeUpdateActivities(nodeID);
+      }
+    }
+  }
+
+  // True only when recording was requested for this application and an
+  // allocation is currently open for it.
+  boolean shouldRecordThisApp(ApplicationId applicationId) {
+    if (!recordingAppActivitiesUntilSpecifiedTime.containsKey(
+        applicationId)) {
+      return false;
+    }
+    return appsAllocation.containsKey(applicationId);
+  }
+
+  // True only when this node is marked for recording and a recording list
+  // has been opened for it.
+  boolean shouldRecordThisNode(NodeId nodeID) {
+    if (!activeRecordedNodes.contains(nodeID)) {
+      return false;
+    }
+    return recordingNodesAllocation.containsKey(nodeID);
+  }
+
+  // Returns the allocation that new activities of this node should go into.
+  private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
+    List<NodeAllocation> nodeAllocations =
+        recordingNodesAllocation.get(nodeID);
+    if (nodeAllocations.size() != 0) {
+      NodeAllocation last = nodeAllocations.get(nodeAllocations.size() - 1);
+      // A final state of DEFAULT means the last allocation has not finished
+      // yet, so it is still the current one.
+      if (last.getFinalAllocationState() == AllocationState.DEFAULT) {
+        return last;
+      }
+    }
+    // Either nothing has been stored for this node yet, or the last
+    // allocation has already finished: open a new allocation, append it to
+    // the list and return it.
+    NodeAllocation fresh = new NodeAllocation(nodeID);
+    nodeAllocations.add(fresh);
+    return fresh;
+  }
+
+  // Remove the node from the set of nodes marked for recording.
+  private void stopRecordNodeUpdateActivities(NodeId nodeId) {
+    activeRecordedNodes.remove(nodeId);
+  }
+
+  // Drop the recording deadline of the application, which makes
+  // shouldRecordThisApp return false for it from now on.
+  private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
+    recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
new file mode 100644
index 0000000..fc4738e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/**
+ * Collection of diagnostic messages attached to recorded activities.
+ */
+public class ActivityDiagnosticConstant {
+  /**
+   * Marks "no diagnostic to display". The value is deliberately
+   * {@code null} so that no "diagnostic" line is shown in the frontend.
+   */
+  public static final String EMPTY = null;
+
+  public static final String NOT_ABLE_TO_ACCESS_PARTITION =
+      "Not able to access partition";
+
+  public static final String QUEUE_DO_NOT_NEED_MORE_RESOURCE =
+      "Queue does not need more resource";
+
+  public static final String QUEUE_MAX_CAPACITY_LIMIT =
+      "Hit queue max-capacity limit";
+
+  public static final String USER_CAPACITY_MAXIMUM_LIMIT =
+      "Hit user capacity maximum limit";
+
+  public static final String SKIP_BLACK_LISTED_NODE =
+      "Skip black listed node";
+
+  public static final String PRIORITY_SKIPPED = "Priority skipped";
+
+  public static final String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
+      "Priority skipped because off-switch request is null";
+
+  public static final String
+      PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
+      "Priority skipped because partition of node doesn't match request";
+
+  public static final String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
+      "Priority skipped because of relax locality is not allowed";
+
+  public static final String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
+      "Skipping assigning to Node in Ignore Exclusivity mode";
+
+  public static final String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS =
+      "Doesn't need containers based on reservation algo!";
+
+  public static final String QUEUE_SKIPPED_HEADROOM =
+      "Queue skipped because of headroom";
+
+  public static final String NON_PARTITIONED_PARTITION_FIRST =
+      "Non-partitioned resource request should be scheduled to "
+          + "non-partitioned partition first";
+
+  public static final String SKIP_NODE_LOCAL_REQUEST =
+      "Skip node-local request";
+
+  public static final String SKIP_RACK_LOCAL_REQUEST =
+      "Skip rack-local request";
+
+  public static final String SKIP_OFF_SWITCH_REQUEST =
+      "Skip offswitch request";
+
+  public static final String REQUEST_CAN_NOT_ACCESS_NODE_LABEL =
+      "Resource request can not access the label";
+
+  public static final String NOT_SUFFICIENT_RESOURCE =
+      "Node does not have sufficient resource for request";
+
+  public static final String LOCALITY_SKIPPED = "Locality skipped";
+
+  public static final String FAIL_TO_ALLOCATE = "Fail to allocate";
+
+  public static final String COULD_NOT_GET_CONTAINER =
+      "Couldn't get container for allocation";
+
+  public static final String APPLICATION_DO_NOT_NEED_RESOURCE =
+      "Application does not need more resource";
+
+  public static final String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE =
+      "Application priority does not need more resource";
+
+  public static final String SKIPPED_ALL_PRIORITIES =
+      "All priorities are skipped of the app";
+
+  public static final String RESPECT_FIFO =
+      "To respect FIFO of applications, "
+          + "skipped following applications in the queue";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
new file mode 100644
index 0000000..a03814c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A tree node in the "NodeAllocation" tree structure.
+ * Each node may represent a queue, an application or a container in an
+ * allocation activity.
+ * A node may have child nodes if the allocation proceeded to the next level.
+ */
+public class ActivityNode {
+  private String activityNodeName;
+  private String parentName;
+  private String appPriority;
+  private String requestPriority;
+  private ActivityState state;
+  private String diagnostic;
+
+  private List<ActivityNode> childNode;
+
+  /**
+   * Creates a node without children.
+   *
+   * @param activityNodeName name of this node
+   * @param parentName name of the parent node
+   * @param priority stored as the app priority when {@code type} is "app",
+   *                 as the request priority when {@code type} is
+   *                 "container", and dropped for any other type
+   * @param state activity state of this node
+   * @param diagnostic diagnostic message, may be {@code null}
+   * @param type "app", "container", or anything else (including
+   *             {@code null}) for a node that keeps no priority
+   */
+  public ActivityNode(String activityNodeName, String parentName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.activityNodeName = activityNodeName;
+    this.parentName = parentName;
+    // Constant-first equals makes the separate null check on type redundant.
+    if ("app".equals(type)) {
+      this.appPriority = priority;
+    } else if ("container".equals(type)) {
+      this.requestPriority = priority;
+    }
+    this.state = state;
+    this.diagnostic = diagnostic;
+    this.childNode = new LinkedList<>();
+  }
+
+  public String getName() {
+    return this.activityNodeName;
+  }
+
+  public String getParentName() {
+    return this.parentName;
+  }
+
+  /**
+   * Adds a child at the front of the child list, so children end up in
+   * reverse order of insertion.
+   */
+  public void addChild(ActivityNode node) {
+    childNode.add(0, node);
+  }
+
+  /**
+   * @return the internal (mutable) list of children
+   */
+  public List<ActivityNode> getChildren() {
+    return this.childNode;
+  }
+
+  public ActivityState getState() {
+    return this.state;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+
+  public String getAppPriority() {
+    return appPriority;
+  }
+
+  public String getRequestPriority() {
+    return requestPriority;
+  }
+
+  /**
+   * @return {@code true} when an app priority was stored for this node,
+   *         {@code false} otherwise
+   */
+  public boolean getType() {
+    return appPriority != null;
+  }
+
+  /**
+   * Renders this node followed by all of its children, one per block.
+   * The diagnostic is only printed when it is neither {@code null} nor
+   * empty; a {@code null} diagnostic (for example
+   * ActivityDiagnosticConstant.EMPTY) previously caused a
+   * NullPointerException here.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.activityNodeName).append(" ");
+    sb.append(this.appPriority).append(" ");
+    sb.append(this.state).append(" ");
+    if (this.diagnostic != null && !this.diagnostic.isEmpty()) {
+      sb.append(this.diagnostic).append("\n");
+    }
+    sb.append("\n");
+    for (ActivityNode child : childNode) {
+      sb.append(child.toString()).append("\n");
+    }
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
new file mode 100644
index 0000000..bce1fc9
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/**
+ * Collection of activity operation states.
+ */
+public enum ActivityState {
+  /** Default state when adding a new activity in node allocation. */
+  DEFAULT,
+  /**
+   * Container is allocated to sub-queues/applications or this
+   * queue/application.
+   */
+  ACCEPTED,
+  /**
+   * Queue or application voluntarily gives up using the resource, or
+   * nothing was allocated.
+   */
+  SKIPPED,
+  /** Container could not be allocated to sub-queues or this application. */
+  REJECTED,
+  /** Successfully allocated a new non-reserved container. */
+  ALLOCATED,
+  /** Successfully reserved a new container. */
+  RESERVED,
+  // NOTE(review): the original comment on this constant repeated RESERVED's
+  // text ("successfully reserve a new container"); presumably it denotes
+  // reserving again on an existing reservation - confirm.
+  RE_RESERVED
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
new file mode 100644
index 0000000..da768e2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationActivity.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Records one activity operation in an allocation, which can be classified
+ * as a queue, application or container activity.
+ * Other information includes state, diagnostic and priority.
+ */
+public class AllocationActivity {
+  private String childName = null;
+  private String parentName = null;
+  private String appPriority = null;
+  private String requestPriority = null;
+  private ActivityState state;
+  private String diagnostic = null;
+
+  private static final Log LOG = LogFactory.getLog(AllocationActivity.class);
+
+  /**
+   * @param parentName name of the parent of the recorded entity
+   * @param queueName name of the recorded entity itself (returned by
+   *                  {@link #getName()})
+   * @param priority stored as the app priority when {@code type} is "app",
+   *                 as the request priority when {@code type} is
+   *                 "container", and dropped for any other type
+   * @param state state of this activity
+   * @param diagnostic diagnostic message, may be {@code null}
+   * @param type "app", "container", or anything else (including
+   *             {@code null}) for an activity that keeps no priority
+   */
+  public AllocationActivity(String parentName, String queueName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    this.childName = queueName;
+    this.parentName = parentName;
+    if ("app".equals(type)) {
+      this.appPriority = priority;
+    } else if ("container".equals(type)) {
+      this.requestPriority = priority;
+    }
+    this.state = state;
+    this.diagnostic = diagnostic;
+  }
+
+  /**
+   * Builds the tree node for this activity. The node type is derived from
+   * whichever priority is set: "app" if an app priority exists, otherwise
+   * "container" if a request priority exists, otherwise no type and no
+   * priority.
+   */
+  public ActivityNode createTreeNode() {
+    String nodePriority = null;
+    String nodeType = null;
+    if (appPriority != null) {
+      nodePriority = this.appPriority;
+      nodeType = "app";
+    } else if (requestPriority != null) {
+      nodePriority = this.requestPriority;
+      nodeType = "container";
+    }
+    return new ActivityNode(this.childName, this.parentName, nodePriority,
+        this.state, this.diagnostic, nodeType);
+  }
+
+  public String getName() {
+    return this.childName;
+  }
+
+  /**
+   * @return the string form of the activity state
+   */
+  public String getState() {
+    return this.state.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
new file mode 100644
index 0000000..e38cefc
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+/**
+ * Collection of allocation final states.
+ */
+public enum AllocationState {
+  DEFAULT,
+
+  /**
+   * Queue or application voluntarily gives up using the resource, or
+   * nothing was allocated.
+   */
+  SKIPPED,
+
+  /** Successfully allocated a new non-reserved container. */
+  ALLOCATED,
+
+  /**
+   * Successfully allocated a new container from an existing reserved
+   * container.
+   */
+  ALLOCATED_FROM_RESERVED,
+
+  /** Successfully reserved a new container. */
+  RESERVED
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
new file mode 100644
index 0000000..15850c0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allocation information for one application within a period of time.
+ * Each application allocation may have several allocation attempts.
+ */
+public class AppAllocation {
+  private Priority priority;
+  private NodeId nodeId;
+  private ContainerId containerId;
+  private ActivityState appState;
+  private String diagnostic;
+  private String queueName;
+  private List<ActivityNode> allocationAttempts;
+  private long timestamp;
+
+  /**
+   * Creates an allocation record with an empty list of attempts.
+   */
+  public AppAllocation(Priority priority, NodeId nodeId, String queueName) {
+    this.priority = priority;
+    this.nodeId = nodeId;
+    this.queueName = queueName;
+    this.allocationAttempts = new ArrayList<>();
+  }
+
+  /**
+   * Overwrites the container id, application state, timestamp and
+   * diagnostic of this record.
+   */
+  public void updateAppContainerStateAndTime(ContainerId containerId,
+      ActivityState appState, long ts, String diagnostic) {
+    this.containerId = containerId;
+    this.appState = appState;
+    this.diagnostic = diagnostic;
+    this.timestamp = ts;
+  }
+
+  /**
+   * Appends one allocation attempt and updates the application state:
+   * a REJECTED attempt leaves the application SKIPPED, any other state is
+   * taken over as is.
+   */
+  public void addAppAllocationActivity(String containerId, String priority,
+      ActivityState state, String diagnostic, String type) {
+    this.allocationAttempts.add(
+        new ActivityNode(containerId, null, priority, state, diagnostic,
+            type));
+    this.appState =
+        (state == ActivityState.REJECTED) ? ActivityState.SKIPPED : state;
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public ActivityState getAppState() {
+    return appState;
+  }
+
+  /**
+   * @return the priority as a string, or {@code null} when none is set
+   */
+  public String getPriority() {
+    return (priority != null) ? priority.toString() : null;
+  }
+
+  /**
+   * @return the container id as a string, or {@code null} when none is set
+   */
+  public String getContainerId() {
+    return (containerId != null) ? containerId.toString() : null;
+  }
+
+  public String getDiagnostic() {
+    return diagnostic;
+  }
+
+  public long getTime() {
+    return this.timestamp;
+  }
+
+  public List<ActivityNode> getAllocationAttempts() {
+    return allocationAttempts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
new file mode 100644
index 0000000..6911299
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Allocation information for one allocation in a node heartbeat.
+ * Detailed allocation activities are first stored as "AllocationActivity"
+ * operations, then transformed to a tree structure.
+ * The tree starts from the root queue and ends in a leaf queue,
+ * application or container allocation.
+ */
+public class NodeAllocation {
+  private NodeId nodeId;
+  private long timeStamp;
+  private ContainerId containerId = null;
+  private AllocationState containerState = AllocationState.DEFAULT;
+  private List<AllocationActivity> allocationOperations;
+
+  private ActivityNode root = null;
+
+  private static final Log LOG = LogFactory.getLog(NodeAllocation.class);
+
+  public NodeAllocation(NodeId nodeId) {
+    this.nodeId = nodeId;
+    this.allocationOperations = new ArrayList<>();
+  }
+
+  /**
+   * Appends one activity operation to this allocation.
+   */
+  public void addAllocationActivity(String parentName, String childName,
+      String priority, ActivityState state, String diagnostic, String type) {
+    AllocationActivity allocate = new AllocationActivity(parentName, childName,
+        priority, state, diagnostic, type);
+    this.allocationOperations.add(allocate);
+  }
+
+  /**
+   * Sets the container id and the final allocation state.
+   */
+  public void updateContainerState(ContainerId containerId,
+      AllocationState containerState) {
+    this.containerId = containerId;
+    this.containerState = containerState;
+  }
+
+  // In node allocation, transform each activity to a tree-like structure
+  // for frontend activity display.
+  // eg:    root
+  //         / \
+  //        a   b
+  //       / \
+  //    app1 app2
+  //    / \
+  //  CA1 CA2
+  // CA means Container Attempt
+  //
+  // The tree is built once; later calls are no-ops while root is set.
+  // If no activity was recorded the root stays null instead of failing
+  // with an IndexOutOfBoundsException.
+  public void transformToTree() {
+    if (root != null) {
+      return;
+    }
+    removeSupersededActivities();
+
+    List<ActivityNode> allocationTree = new ArrayList<>();
+    for (AllocationActivity allocationOperation : allocationOperations) {
+      ActivityNode node = allocationOperation.createTreeNode();
+      String name = node.getName();
+      // Adopt the trailing run of pending nodes that name this node as
+      // their parent; stop at the first one that does not.
+      for (int i = allocationTree.size() - 1; i > -1; i--) {
+        ActivityNode pending = allocationTree.get(i);
+        String pendingParent = pending.getParentName();
+        // A node without a parent name cannot be anyone's child.
+        if (pendingParent == null || !pendingParent.equals(name)) {
+          break;
+        }
+        node.addChild(pending);
+        allocationTree.remove(i);
+      }
+      allocationTree.add(node);
+    }
+    if (!allocationTree.isEmpty()) {
+      root = allocationTree.get(0);
+    }
+  }
+
+  // Walks the operations from last to first and drops every operation whose
+  // (non-null) name was already seen, so only the most recently recorded
+  // operation per name remains.
+  private void removeSupersededActivities() {
+    Set<String> names = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    ListIterator<AllocationActivity> ite = allocationOperations.listIterator(
+        allocationOperations.size());
+    while (ite.hasPrevious()) {
+      String name = ite.previous().getName();
+      // Set.add returns false when the name was already present.
+      if (name != null && !names.add(name)) {
+        ite.remove();
+      }
+    }
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+
+  public long getTimeStamp() {
+    return this.timeStamp;
+  }
+
+  public AllocationState getFinalAllocationState() {
+    return containerState;
+  }
+
+  /**
+   * @return the container id as a string, or {@code null} when none is set
+   */
+  public String getContainerId() {
+    if (containerId == null) {
+      return null;
+    }
+    return containerId.toString();
+  }
+
+  /**
+   * @return the tree root, or {@code null} if the tree has not been built
+   *         or no activity was recorded
+   */
+  public ActivityNode getRoot() {
+    return root;
+  }
+
+  public String getNodeId() {
+    return nodeId.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9c88154..1d8f929 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -48,6 +48,7 @@ import 
org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -91,12 +92,15 @@ public abstract class AbstractCSQueue implements CSQueue {
   protected CapacitySchedulerContext csContext;
   protected YarnAuthorizationProvider authorizer = null;
 
-  public AbstractCSQueue(CapacitySchedulerContext cs, 
+  protected ActivitiesManager activitiesManager;
+
+  public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     this.labelManager = cs.getRMContext().getNodeLabelManager();
     this.parent = parent;
     this.queueName = queueName;
     this.resourceCalculator = cs.getResourceCalculator();
+    this.activitiesManager = cs.getActivitiesManager();
     
     // must be called after parent and queueName is set
     this.metrics =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bedf455..f03a0df 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -92,22 +92,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@@ -307,6 +297,8 @@ public class CapacityScheduler extends
     this.applications = new ConcurrentHashMap<>();
     this.labelManager = rmContext.getNodeLabelManager();
     authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+    this.activitiesManager = new ActivitiesManager(rmContext);
+    activitiesManager.init(conf);
     initializeQueues(this.conf);
     this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
 
@@ -344,6 +336,7 @@ public class CapacityScheduler extends
   @Override
   public void serviceStart() throws Exception {
     startSchedulerThreads();
+    activitiesManager.start();
     super.serviceStart();
   }
 
@@ -523,7 +516,7 @@ public class CapacityScheduler extends
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
     CSQueue newRoot = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
-            newQueues, queues, noop); 
+            newQueues, queues, noop);
     
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
@@ -650,7 +643,7 @@ public class CapacityScheduler extends
         throw new IllegalStateException(
             "Only Leaf Queues can be reservable for " + queueName);
       }
-      ParentQueue parentQueue = 
+      ParentQueue parentQueue =
         new ParentQueue(csContext, queueName, parent, 
oldQueues.get(queueName));
 
       // Used only for unit tests
@@ -802,7 +795,7 @@ public class CapacityScheduler extends
 
     FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
         application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
-            application.getPriority(), isAttemptRecovering);
+            application.getPriority(), isAttemptRecovering, activitiesManager);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(
           application.getCurrentAppAttempt());
@@ -1233,6 +1226,7 @@ public class CapacityScheduler extends
 
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
+
       FiCaSchedulerApp reservedApplication =
           getCurrentAttemptForContainer(reservedContainer.getContainerId());
 
@@ -1262,6 +1256,19 @@ public class CapacityScheduler extends
         tmp.getAssignmentInformation().incrAllocations();
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(),
+            AllocationState.ALLOCATED_FROM_RESERVED);
+      } else {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
       }
     }
 
@@ -1371,7 +1378,11 @@ public class CapacityScheduler extends
       setLastNodeUpdateTime(Time.now());
       nodeUpdate(node);
       if (!scheduleAsynchronously) {
+        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+            node.getNodeID());
         allocateContainersToNode(getNode(node.getNodeID()));
+        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+            node.getNodeID());
       }
     }
     break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index b39b289..c41a7bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -80,4 +81,6 @@ public interface CapacitySchedulerContext {
    *         cluster.
    */
   ResourceUsage getClusterResourceUsage();
+
+  ActivitiesManager getActivitiesManager();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a243e93..6bbe85e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,15 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -65,9 +57,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -75,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -135,7 +130,7 @@ public class LeafQueue extends AbstractCSQueue {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
 
-    this.activeUsersManager = new ActiveUsersManager(metrics); 
+    this.activeUsersManager = new ActiveUsersManager(metrics);
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
@@ -144,7 +139,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("LeafQueue:" + " name=" + queueName
         + ", fullname=" + getQueuePath());
     }
-    
+
     setupQueueConfigs(cs.getClusterResource());
   }
 
@@ -862,7 +857,7 @@ public class LeafQueue extends AbstractCSQueue {
     float guaranteedCapacity = 
queueCapacities.getAbsoluteCapacity(nodePartition);
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
-  
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
@@ -881,6 +876,10 @@ public class LeafQueue extends AbstractCSQueue {
     if (reservedContainer != null) {
       FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
+
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
       synchronized (application) {
         CSAssignment assignment =
             application.assignContainers(clusterResource, node,
@@ -895,6 +894,10 @@ public class LeafQueue extends AbstractCSQueue {
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+              .getPartition());
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
@@ -907,6 +910,9 @@ public class LeafQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + 
node.getPartition());
       }
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
@@ -914,13 +920,23 @@ public class LeafQueue extends AbstractCSQueue {
         orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) 
{
       FiCaSchedulerApp application = assignmentIterator.next();
 
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
       // Check queue max-capacity limit
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
           currentResourceLimits, application.getCurrentReservation(),
           schedulingMode)) {
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node,
+            application, application.getPriority(),
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
-      
+
       Resource userLimit =
           computeUserLimitAndSetHeadroom(application, clusterResource,
               node.getPartition(), schedulingMode);
@@ -930,6 +946,10 @@ public class LeafQueue extends AbstractCSQueue {
           application, node.getPartition(), currentResourceLimits)) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node,
+            application, application.getPriority(),
+            ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
         continue;
       }
 
@@ -971,10 +991,17 @@ public class LeafQueue extends AbstractCSQueue {
           incReservedResource(node.getPartition(), reservedRes);
         }
 
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
         // Done
         return assignment;
       } else if (assignment.getSkippedType()
           == CSAssignment.SkippedType.OTHER) {
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         application.updateNodeInfoForAMDiagnostics(node);
       } else if(assignment.getSkippedType()
           == CSAssignment.SkippedType.QUEUE_LIMIT) {
@@ -982,9 +1009,18 @@ public class LeafQueue extends AbstractCSQueue {
       } else {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.RESPECT_FIFO);
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
     }
+    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+        getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+        ActivityDiagnosticConstant.EMPTY);
 
     return CSAssignment.NULL_ASSIGNMENT;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0d131f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 9ae35ee..a245e3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -42,12 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -81,7 +80,7 @@ public class ParentQueue extends AbstractCSQueue {
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
-  public ParentQueue(CapacitySchedulerContext cs, 
+  public ParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
@@ -98,14 +97,14 @@ public class ParentQueue extends AbstractCSQueue {
           "capacity of " + rawCapacity + " for queue " + queueName +
           ". Must be " + 
CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
     }
-    
+
     this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
-    
+
     setupQueueConfigs(cs.getClusterResource());
 
-    LOG.info("Initialized parent-queue " + queueName + 
-        " name=" + queueName + 
-        ", fullname=" + getQueuePath()); 
+    LOG.info("Initialized parent-queue " + queueName +
+        " name=" + queueName +
+        ", fullname=" + getQueuePath());
   }
 
   synchronized void setupQueueConfigs(Resource clusterResource)
@@ -380,6 +379,10 @@ public class ParentQueue extends AbstractCSQueue {
         " #applications: " + getNumApplications());
   }
 
+  private String getParentName() { // "" if there is no parent queue
+    return getParent() != null ? getParent().getQueueName() : "";
+  }
+
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
@@ -392,6 +395,16 @@ public class ParentQueue extends AbstractCSQueue {
             + ", because it is not able to access partition=" + node
             .getPartition());
       }
+
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+              .getPartition());
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
+
       return CSAssignment.NULL_ASSIGNMENT;
     }
     
@@ -404,6 +417,15 @@ public class ParentQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + 
node.getPartition());
       }
+
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParentName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+      if (rootQueue) {
+        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+            node);
+      }
+
       return CSAssignment.NULL_ASSIGNMENT;
     }
     
@@ -423,9 +445,18 @@ public class ParentQueue extends AbstractCSQueue {
           resourceLimits, Resources.createResource(
               getMetrics().getReservedMB(), getMetrics()
                   .getReservedVirtualCores()), schedulingMode)) {
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
+        }
+
         break;
       }
-      
+
       // Schedule
       CSAssignment assignedToChild =
           assignContainersToChildQueues(clusterResource, node, resourceLimits,
@@ -436,6 +467,29 @@ public class ParentQueue extends AbstractCSQueue {
       if (Resources.greaterThan(
               resourceCalculator, clusterResource, 
               assignedToChild.getResource(), Resources.none())) {
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.ACCEPTED,
+            ActivityDiagnosticConstant.EMPTY);
+
+        if (node.getReservedContainer() == null) {
+          if (rootQueue) {
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.ALLOCATED);
+          }
+        } else {
+          if (rootQueue) {
+            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                activitiesManager, node,
+                assignedToChild.getAssignmentInformation()
+                    .getFirstAllocatedOrReservedContainerId(),
+                AllocationState.RESERVED);
+          }
+        }
+
         // Track resource utilization for the parent-queue
         allocateResource(clusterResource, assignedToChild.getResource(),
             node.getPartition(), assignedToChild.isIncreasedAllocation());
@@ -474,6 +528,15 @@ public class ParentQueue extends AbstractCSQueue {
 
       } else {
         assignment.setSkippedType(assignedToChild.getSkippedType());
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
+        }
+
         break;
       }
 
@@ -631,7 +694,7 @@ public class ParentQueue extends AbstractCSQueue {
             resourceToSubtract);
       }
     }
-    
+
     return assignment;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to