MAPREDUCE-6526. Remove usage of metrics v1 from hadoop-mapreduce. (aajisaka)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ee4e5ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ee4e5ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ee4e5ca

Branch: refs/heads/HDFS-7240
Commit: 4ee4e5ca2b8488459d2231dd1de8ed44dd656d5c
Parents: 3ff0510
Author: Akira Ajisaka <aajis...@apache.org>
Authored: Tue May 3 10:46:11 2016 +0900
Committer: Akira Ajisaka <aajis...@apache.org>
Committed: Tue May 3 10:46:11 2016 +0900

----------------------------------------------------------------------
 .../apache/hadoop/mapred/LocalJobRunner.java    |  2 +-
 .../hadoop/mapred/LocalJobRunnerMetrics.java    | 94 +++++++-------------
 .../hadoop/mapreduce/task/reduce/Shuffle.java   |  2 +-
 .../task/reduce/ShuffleClientMetrics.java       | 91 ++++++++-----------
 4 files changed, 70 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
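
Note: the change migrates both metrics sources from the deprecated metrics (v1) Updater API to the annotation-based metrics2 API, as the hunks below show. For reference, a minimal sketch of that metrics2 pattern follows; the class and metric names here (ExampleMetrics, tasksLaunched) are illustrative only and are not part of this commit.

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;

@Metrics(name = "ExampleMetrics", context = "mapred")
final class ExampleMetrics {

  // Mutable metric fields annotated with @Metric are instantiated by the
  // metrics system when the source is registered; no manual construction
  // and no periodic doUpdates() callback is needed, unlike metrics v1.
  @Metric
  private MutableCounterInt tasksLaunched;

  private ExampleMetrics() {
  }

  static ExampleMetrics create() {
    // initialize() starts the default metrics system under the given prefix,
    // or returns the already-running instance.
    MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
    return ms.register("ExampleMetrics", null, new ExampleMetrics());
  }

  void launchTask() {
    // Counter updates are thread-safe, so callers need no extra locking.
    tasksLaunched.incr();
  }
}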


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee4e5ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 37c147d..02b9a87 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -768,7 +768,7 @@ public class LocalJobRunner implements ClientProtocol {
   public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
-    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
+    myMetrics = LocalJobRunnerMetrics.create();
   }
 
   // JobSubmissionProtocol methods

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee4e5ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
index aec70ed..0186cdc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
@@ -17,82 +17,50 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 
-@SuppressWarnings("deprecation")
-class LocalJobRunnerMetrics implements Updater {
-  private final MetricsRecord metricsRecord;
+import java.util.concurrent.ThreadLocalRandom;
 
-  private int numMapTasksLaunched = 0;
-  private int numMapTasksCompleted = 0;
-  private int numReduceTasksLaunched = 0;
-  private int numReduceTasksCompleted = 0;
-  private int numWaitingMaps = 0;
-  private int numWaitingReduces = 0;
-  
-  public LocalJobRunnerMetrics(JobConf conf) {
-    String sessionId = conf.getSessionId();
-    // Initiate JVM Metrics
-    JvmMetrics.init("JobTracker", sessionId);
-    // Create a record for map-reduce metrics
-    MetricsContext context = MetricsUtil.getContext("mapred");
-    // record name is jobtracker for compatibility 
-    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
-    metricsRecord.setTag("sessionId", sessionId);
-    context.registerUpdater(this);
+@Metrics(name="LocalJobRunnerMetrics", context="mapred")
+final class LocalJobRunnerMetrics {
+
+  @Metric
+  private MutableCounterInt numMapTasksLaunched;
+  @Metric
+  private MutableCounterInt numMapTasksCompleted;
+  @Metric
+  private MutableCounterInt numReduceTasksLaunched;
+  @Metric
+  private MutableGaugeInt numReduceTasksCompleted;
+
+  private LocalJobRunnerMetrics() {
   }
-    
-  /**
-   * Since this object is a registered updater, this method will be called
-   * periodically, e.g. every 5 seconds.
-   */
-  public void doUpdates(MetricsContext unused) {
-    synchronized (this) {
-      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
-      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
-      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
-      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
-      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
-      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
 
-      numMapTasksLaunched = 0;
-      numMapTasksCompleted = 0;
-      numReduceTasksLaunched = 0;
-      numReduceTasksCompleted = 0;
-      numWaitingMaps = 0;
-      numWaitingReduces = 0;
-    }
-    metricsRecord.update();
+  public static LocalJobRunnerMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
+    return ms.register("LocalJobRunnerMetrics-" +
+            ThreadLocalRandom.current().nextInt(), null,
+        new LocalJobRunnerMetrics());
   }
 
   public synchronized void launchMap(TaskAttemptID taskAttemptID) {
-    ++numMapTasksLaunched;
-    decWaitingMaps(taskAttemptID.getJobID(), 1);
+    numMapTasksLaunched.incr();
   }
 
-  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
-    ++numMapTasksCompleted;
+  public void completeMap(TaskAttemptID taskAttemptID) {
+    numMapTasksCompleted.incr();
   }
 
   public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
-    ++numReduceTasksLaunched;
-    decWaitingReduces(taskAttemptID.getJobID(), 1);
-  }
-
-  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
-    ++numReduceTasksCompleted;
+    numReduceTasksLaunched.incr();
   }
 
-  private synchronized void decWaitingMaps(JobID id, int task) {
-    numWaitingMaps -= task;
-  }
-  
-  private synchronized void decWaitingReduces(JobID id, int task){
-    numWaitingReduces -= task;
+  public void completeReduce(TaskAttemptID taskAttemptID) {
+    numReduceTasksCompleted.incr();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee4e5ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 93f9a50..3382bbf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -68,7 +68,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     this.jobConf = context.getJobConf();
     this.umbilical = context.getUmbilical();
     this.reporter = context.getReporter();
-    this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
+    this.metrics = ShuffleClientMetrics.create();
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee4e5ca/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
index 92c69a6..d4e185d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
@@ -20,70 +20,53 @@ package org.apache.hadoop.mapreduce.task.reduce;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+
+import java.util.concurrent.ThreadLocalRandom;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
-public class ShuffleClientMetrics implements Updater {
+@Metrics(name="ShuffleClientMetrics", context="mapred")
+public class ShuffleClientMetrics {
 
-  private MetricsRecord shuffleMetrics = null;
-  private int numFailedFetches = 0;
-  private int numSuccessFetches = 0;
-  private long numBytes = 0;
-  private int numThreadsBusy = 0;
-  private final int numCopiers;
-  
-  ShuffleClientMetrics(TaskAttemptID reduceId, JobConf jobConf) {
-    this.numCopiers = jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+  @Metric
+  private MutableCounterInt numFailedFetches;
+  @Metric
+  private MutableCounterInt numSuccessFetches;
+  @Metric
+  private MutableCounterLong numBytes;
+  @Metric
+  private MutableGaugeInt numThreadsBusy;
 
-    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.shuffleMetrics = 
-      MetricsUtil.createRecord(metricsContext, "shuffleInput");
-    this.shuffleMetrics.setTag("user", jobConf.getUser());
-    this.shuffleMetrics.setTag("jobName", jobConf.getJobName());
-    this.shuffleMetrics.setTag("jobId", reduceId.getJobID().toString());
-    this.shuffleMetrics.setTag("taskId", reduceId.toString());
-    this.shuffleMetrics.setTag("sessionId", jobConf.getSessionId());
-    metricsContext.registerUpdater(this);
+  private ShuffleClientMetrics() {
   }
-  public synchronized void inputBytes(long numBytes) {
-    this.numBytes += numBytes;
+
+  public static ShuffleClientMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
+    return ms.register("ShuffleClientMetrics-" +
+        ThreadLocalRandom.current().nextInt(), null,
+        new ShuffleClientMetrics());
   }
-  public synchronized void failedFetch() {
-    ++numFailedFetches;
+
+  public void inputBytes(long bytes) {
+    numBytes.incr(bytes);
   }
-  public synchronized void successFetch() {
-    ++numSuccessFetches;
+  public void failedFetch() {
+    numFailedFetches.incr();
   }
-  public synchronized void threadBusy() {
-    ++numThreadsBusy;
+  public void successFetch() {
+    numSuccessFetches.incr();
   }
-  public synchronized void threadFree() {
-    --numThreadsBusy;
+  public void threadBusy() {
+    numThreadsBusy.incr();
   }
-  public void doUpdates(MetricsContext unused) {
-    synchronized (this) {
-      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
-                                numFailedFetches);
-      shuffleMetrics.incrMetric("shuffle_success_fetches", 
-                                numSuccessFetches);
-      if (numCopiers != 0) {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-            100*((float)numThreadsBusy/numCopiers));
-      } else {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
-      }
-      numBytes = 0;
-      numSuccessFetches = 0;
-      numFailedFetches = 0;
-    }
-    shuffleMetrics.update();
+  public void threadFree() {
+    numThreadsBusy.decr();
   }
 }
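
Note: both create() methods register each new instance under a unique name (the class name plus a random int), presumably because the default metrics system does not accept duplicate source names, so per-instance names keep multiple sources created in the same JVM from colliding. A minimal sketch of that registration pattern follows; the TinySource class and its metric are hypothetical and not part of this commit.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

@Metrics(name = "TinySource", context = "mapred")
final class TinySource {
  @Metric
  private MutableCounterLong bytes;

  private TinySource() {
  }

  static TinySource register() {
    MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
    // A unique per-instance source name avoids registration failures when
    // several TinySource instances are created in one JVM.
    return ms.register("TinySource-" + ThreadLocalRandom.current().nextInt(),
        null, new TinySource());
  }

  void addBytes(long n) {
    bytes.incr(n);
  }
}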

