YARN-3034. Implement RM starting its timeline collector. Contributed by 
Naganarasimha G R

(cherry picked from commit dc12cad2b89f643dafa0def863325cb374c7670c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7985239
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7985239
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7985239

Branch: refs/heads/YARN-2928
Commit: a798523971852d15a4a9098f4e689e2c801fab06
Parents: afa9925
Author: Junping Du <junping...@apache.org>
Authored: Tue Mar 24 13:42:14 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:38:43 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 ++-
 .../src/main/resources/yarn-default.xml         |  11 +-
 .../hadoop-yarn-server-resourcemanager/pom.xml  |   4 +
 .../resourcemanager/RMActiveServiceContext.java |  43 ++++++++
 .../yarn/server/resourcemanager/RMContext.java  |   5 +
 .../server/resourcemanager/RMContextImpl.java   |  14 ++-
 .../server/resourcemanager/ResourceManager.java |  44 ++++++--
 .../metrics/SystemMetricsPublisher.java         |  29 +++---
 .../timelineservice/RMTimelineCollector.java    | 104 +++++++++++++++++++
 10 files changed, 241 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d9696c9..82ed275 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -38,6 +38,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3377. Fixed test failure in TestTimelineServiceClientIntegration.
     (Sangjin Lee via zjshen)
 
+    YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R
+    via junping_du)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9d5b63b..e72c1c0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -373,12 +373,20 @@ public class YarnConfiguration extends Configuration {
 
   /**
    *  The setting that controls whether yarn system metrics is published on the
-   *  timeline server or not by RM.
+   *  timeline server or not by RM. This configuration setting is for ATS V1
    */
-  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
-      RM_PREFIX + "system-metrics-publisher.enabled";
+  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+      + "system-metrics-publisher.enabled";
   public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = 
false;
 
+  /**
+   *  The setting that controls whether yarn system metrics is published on the
+   *  timeline server or not by RM and NM. This configuration setting is for 
ATS V2
+   */
+  public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+      + "system-metrics-publisher.enabled";
+  public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
   public static final int 
DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 78b6ae8..10450a9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -746,12 +746,21 @@
 
   <property>
     <description>The setting that controls whether yarn system metrics is
-    published on the timeline server or not by RM.</description>
+    published to the Timeline server (version one) or not, by RM. 
+    This configuration is deprecated.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
 
   <property>
+    <description>The setting that controls whether yarn system metrics is
+    published on the Timeline server (version two) or not by RM And 
NM.</description>
+    <name>yarn.system-metrics-publisher.enabled</name>
+    <value>false</value>
+  </property>
+ 
+
+  <property>
     <description>Number of worker threads that send the yarn system metrics
     data.</description>
     
<name>yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 9d54184..b88815f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -171,6 +171,10 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>     
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 1abb14e..d79e542 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -92,6 +93,10 @@ public class RMActiveServiceContext {
   private NodesListManager nodesListManager;
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
+  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+  private SystemMetricsPublisher systemMetricsPublisher;
+  private RMTimelineCollector timelineCollector;
+
   private RMNodeLabelsManager nodeLabelManager;
   private long epoch;
   private Clock systemClock = new SystemClock();
@@ -366,6 +371,44 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
+  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+    return rmApplicationHistoryWriter;
+  }
+
+  @Private
+  @Unstable
+  public RMTimelineCollector getRMTimelineCollector() {
+    return timelineCollector;
+  }
+
+  @Private
+  @Unstable
+  public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
+    this.timelineCollector = timelineCollector;
+  }
+
+  @Private
+  @Unstable
+  public void setSystemMetricsPublisher(
+      SystemMetricsPublisher systemMetricsPublisher) {
+    this.systemMetricsPublisher = systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public SystemMetricsPublisher getSystemMetricsPublisher() {
+    return systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public void setRMApplicationHistoryWriter(
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+  }
+
+  @Private
+  @Unstable
   public long getEpoch() {
     return this.epoch;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index bc50268..05fee99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 
 /**
  * Context of the ResourceManager.
@@ -108,6 +109,10 @@ public interface RMContext {
   void setSystemMetricsPublisher(SystemMetricsPublisher 
systemMetricsPublisher);
 
   SystemMetricsPublisher getSystemMetricsPublisher();
+  
+  void setRMTimelineCollector(RMTimelineCollector timelineCollector);
+  
+  RMTimelineCollector getRMTimelineCollector();
 
   ConfigurationProvider getConfigurationProvider();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index d6d573d..8543cec 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
@@ -47,6 +47,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -354,6 +355,17 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
+  public void setRMTimelineCollector(
+      RMTimelineCollector timelineCollector) {
+    activeServiceContext.setRMTimelineCollector(timelineCollector);
+  }
+
+  @Override
+  public RMTimelineCollector getRMTimelineCollector() {
+    return activeServiceContext.getRMTimelineCollector();
+  }
+  
+  @Override
   public void setSystemMetricsPublisher(
       SystemMetricsPublisher systemMetricsPublisher) {
     this.systemMetricsPublisher = systemMetricsPublisher;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d6d9629..e5c72cd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -18,7 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -80,11 +89,14 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
@@ -99,15 +111,7 @@ import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -364,6 +368,10 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     return new RMApplicationHistoryWriter();
   }
 
+  private RMTimelineCollector createRMTimelineCollector() {
+    return new RMTimelineCollector();
+  }
+
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
     return new SystemMetricsPublisher(); 
   }
@@ -476,6 +484,20 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
         rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
       }
 
+      RMApplicationHistoryWriter rmApplicationHistoryWriter =
+          createRMApplicationHistoryWriter();
+      addService(rmApplicationHistoryWriter);
+      rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+
+      SystemMetricsPublisher systemMetricsPublisher = 
createSystemMetricsPublisher();
+      addService(systemMetricsPublisher);
+      rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
+
+      RMTimelineCollector timelineCollector =
+          createRMTimelineCollector();
+      addService(timelineCollector);
+      rmContext.setRMTimelineCollector(timelineCollector);
+
       // Register event handler for NodesListManager
       nodesListManager = new NodesListManager(rmContext);
       rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 3d7ac9f..b4ce4f9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -53,7 +53,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
- * The class that helps RM publish metrics to the timeline server. RM will
+ * The class that helps RM publish metrics to the timeline server V1. RM will
  * always invoke the methods of this class regardless the service is enabled or
  * not. If it is disabled, publishing requests will be ignored silently.
  */
@@ -66,7 +66,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   private Dispatcher dispatcher;
   private TimelineClient client;
-  private boolean publishSystemMetrics;
+  private boolean publishSystemMetricsToATSv1;
 
   public SystemMetricsPublisher() {
     super(SystemMetricsPublisher.class.getName());
@@ -74,13 +74,14 @@ public class SystemMetricsPublisher extends 
CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetrics =
+    publishSystemMetricsToATSv1 =
         conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
-        conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-            YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+            && conf.getBoolean(
+                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
 
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       client = TimelineClient.createTimelineClient();
       addIfService(client);
 
@@ -97,7 +98,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ApplicationCreatedEvent(
               app.getApplicationId(),
@@ -114,7 +115,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ApplicationFinishedEvent(
               app.getApplicationId(),
@@ -131,7 +132,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
   @SuppressWarnings("unchecked")
   public void appACLsUpdated(RMApp app, String appViewACLs,
       long updatedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ApplicationACLsUpdatedEvent(
               app.getApplicationId(),
@@ -143,7 +144,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptRegistered(RMAppAttempt appAttempt,
       long registeredTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new AppAttemptRegisteredEvent(
               appAttempt.getAppAttemptId(),
@@ -159,7 +160,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
       RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new AppAttemptFinishedEvent(
               appAttempt.getAppAttemptId(),
@@ -176,7 +177,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerCreated(RMContainer container, long createdTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ContainerCreatedEvent(
               container.getContainerId(),
@@ -189,7 +190,7 @@ public class SystemMetricsPublisher extends 
CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ContainerFinishedEvent(
               container.getContainerId(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7985239/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
new file mode 100644
index 0000000..22743d6
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+
+/**
+ * This class is responsible for posting application and appattempt lifecycle
+ * related events to timeline service V2
+ */
+@Private
+@Unstable
+public class RMTimelineCollector extends TimelineCollector {
+  private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
+
+  public RMTimelineCollector() {
+    super("Resource Manager TimelineCollector");
+  }
+
+  private Dispatcher dispatcher;
+
+  private boolean publishSystemMetricsForV2;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishSystemMetricsForV2 =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+            && conf.getBoolean(
+                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+
+    if (publishSystemMetricsForV2) {
+      // having separate dispatcher to avoid load on RMDispatcher
+      LOG.info("RMTimelineCollector has been configured to publish"
+          + " System Metrics in ATS V2");
+      dispatcher = new AsyncDispatcher();
+      dispatcher.register(SystemMetricsEventType.class,
+          new ForwardingEventHandler());
+    } else {
+      LOG.warn("RMTimelineCollector has not been configured to publish"
+          + " System Metrics in ATS V2");
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
+    switch (event.getType()) {
+    default:
+      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+    }
+  }
+
+  /**
+   * EventHandler implementation which forward events to 
SystemMetricsPublisher.
+   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
+   * method.
+   */
+  private final class ForwardingEventHandler implements
+      EventHandler<SystemMetricsEvent> {
+
+    @Override
+    public void handle(SystemMetricsEvent event) {
+      handleSystemMetricsEvent(event);
+    }
+  }
+}

Reply via email to