HADOOP-13782. Make MutableRates metrics thread-local write, aggregate-on-read. 
Contributed by Erik Krogen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77c13c38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77c13c38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77c13c38

Branch: refs/heads/YARN-4752
Commit: 77c13c385774c51766fe505397fa916754ac08d4
Parents: 2a65eb1
Author: Zhe Zhang <z...@apache.org>
Authored: Tue Nov 8 16:07:36 2016 -0800
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Nov 8 16:07:36 2016 -0800

----------------------------------------------------------------------
 .../hadoop/ipc/metrics/RpcDetailedMetrics.java  |   4 +-
 .../metrics2/lib/MutableMetricsFactory.java     |   5 +
 .../hadoop/metrics2/lib/MutableRates.java       |   6 +
 .../lib/MutableRatesWithAggregation.java        | 148 +++++++++++++++++++
 .../hadoop/metrics2/lib/TestMutableMetrics.java | 148 ++++++++++++++++++-
 5 files changed, 306 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77c13c38/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
index 7414364..ad36742 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.metrics2.lib.MutableRates;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 
 /**
  * This class is for maintaining RPC method related statistics
@@ -34,7 +34,7 @@ import org.apache.hadoop.metrics2.lib.MutableRates;
 @Metrics(about="Per method RPC metrics", context="rpcdetailed")
 public class RpcDetailedMetrics {
 
-  @Metric MutableRates rates;
+  @Metric MutableRatesWithAggregation rates;
 
   static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class);
   final MetricsRegistry registry;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77c13c38/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
index ae2cb4f..b926c4b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java
@@ -67,6 +67,11 @@ public class MutableMetricsFactory {
     if (cls == MutableRates.class) {
       return new MutableRates(registry);
     }
+    if (cls == MutableRatesWithAggregation.class) {
+      MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
+      registry.add(info.name(), rates);
+      return rates;
+    }
     if (cls == MutableStat.class) {
       return registry.newStat(info.name(), info.description(),
                               annotation.sampleName(), annotation.valueName(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77c13c38/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java
index 121c292..1074e87 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java
@@ -33,6 +33,12 @@ import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 
 /**
  * Helper class to manage a group of mutable rate metrics
+ *
+ * This class synchronizes all accesses to the metrics it
+ * contains, so it should not be used in situations where
+ * there is high contention on the metrics.
+ * {@link MutableRatesWithAggregation} is preferable in that
+ * situation.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77c13c38/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
new file mode 100644
index 0000000..64eae03
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.lib;
+
+import com.google.common.collect.Sets;
+import java.lang.ref.WeakReference;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.util.SampleStat;
+
+
+/**
+ * Helper class to manage a group of mutable rate metrics.
+ *
+ * Each thread will maintain a local rate count, and upon snapshot,
+ * these values will be aggregated into a global rate. This class
+ * should only be used for long running threads, as any metrics
+ * produced between the last snapshot and the death of a thread
+ * will be lost. This allows for significantly higher concurrency
+ * than {@link MutableRates}. See HADOOP-13782.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MutableRatesWithAggregation extends MutableMetric {
+  static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
+  private final Map<String, MutableRate> globalMetrics = new HashMap<>();
+  private final Set<Class<?>> protocolCache = Sets.newHashSet();
+
+  private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, 
ThreadSafeSampleStat>>>
+      weakReferenceQueue = new ConcurrentLinkedDeque<>();
+  private final ThreadLocal<ConcurrentMap<String, ThreadSafeSampleStat>>
+      threadLocalMetricsMap = new ThreadLocal<>();
+
+  /**
+   * Initialize the registry with all the methods in a protocol
+   * so they all show up in the first snapshot.
+   * Convenient for JMX implementations.
+   * @param protocol the protocol class
+   */
+  public void init(Class<?> protocol) {
+    if (protocolCache.contains(protocol)) {
+      return;
+    }
+    protocolCache.add(protocol);
+    for (Method method : protocol.getDeclaredMethods()) {
+      String name = method.getName();
+      LOG.debug(name);
+      addMetricIfNotExists(name);
+    }
+  }
+
+  /**
+   * Add a rate sample for a rate metric.
+   * @param name of the rate metric
+   * @param elapsed time
+   */
+  public void add(String name, long elapsed) {
+    ConcurrentMap<String, ThreadSafeSampleStat> localStats =
+        threadLocalMetricsMap.get();
+    if (localStats == null) {
+      localStats = new ConcurrentHashMap<>();
+      threadLocalMetricsMap.set(localStats);
+      weakReferenceQueue.add(new WeakReference<>(localStats));
+    }
+    ThreadSafeSampleStat stat = localStats.get(name);
+    if (stat == null) {
+      stat = new ThreadSafeSampleStat();
+      localStats.put(name, stat);
+    }
+    stat.add(elapsed);
+  }
+
+  @Override
+  public synchronized void snapshot(MetricsRecordBuilder rb, boolean all) {
+    Iterator<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>> iter =
+        weakReferenceQueue.iterator();
+    while (iter.hasNext()) {
+      ConcurrentMap<String, ThreadSafeSampleStat> map = iter.next().get();
+      if (map == null) {
+        // Thread has died; clean up its state
+        iter.remove();
+      } else {
+        // Aggregate the thread's local samples into the global metrics
+        for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
+          String name = entry.getKey();
+          MutableRate globalMetric = addMetricIfNotExists(name);
+          entry.getValue().snapshotInto(globalMetric);
+        }
+      }
+    }
+    for (MutableRate globalMetric : globalMetrics.values()) {
+      globalMetric.snapshot(rb, all);
+    }
+  }
+
+  private synchronized MutableRate addMetricIfNotExists(String name) {
+    MutableRate metric = globalMetrics.get(name);
+    if (metric == null) {
+      metric = new MutableRate(name, name, false);
+      globalMetrics.put(name, metric);
+    }
+    return metric;
+  }
+
+  private static class ThreadSafeSampleStat {
+
+    private SampleStat stat = new SampleStat();
+
+    synchronized void add(double x) {
+      stat.add(x);
+    }
+
+    synchronized void snapshotInto(MutableRate metric) {
+      if (stat.numSamples() > 0) {
+        metric.add(stat.numSamples(), Math.round(stat.total()));
+        stat.reset();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77c13c38/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
index 9161df5..1faa361 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.metrics2.lib;
 
 import static org.apache.hadoop.metrics2.lib.Interns.info;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
-import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
-import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
 import static org.mockito.AdditionalMatchers.eq;
 import static org.mockito.AdditionalMatchers.geq;
 import static org.mockito.AdditionalMatchers.leq;
@@ -29,10 +27,15 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.junit.Assert.*;
 
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.util.Quantile;
 import org.junit.Test;
@@ -42,6 +45,7 @@ import org.junit.Test;
  */
 public class TestMutableMetrics {
 
+  private static final Log LOG = LogFactory.getLog(TestMutableMetrics.class);
   private final double EPSILON = 1e-42;
 
   /**
@@ -129,6 +133,144 @@ public class TestMutableMetrics {
     assertGauge("BarAvgTime", 0.0, rb);
   }
 
+  @Test public void testMutableRatesWithAggregationInit() {
+    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
+
+    rates.init(TestProtocol.class);
+    rates.snapshot(rb, false);
+
+    assertCounter("FooNumOps", 0L, rb);
+    assertGauge("FooAvgTime", 0.0, rb);
+    assertCounter("BarNumOps", 0L, rb);
+    assertGauge("BarAvgTime", 0.0, rb);
+  }
+
+  @Test public void testMutableRatesWithAggregationSingleThread() {
+    MutableRatesWithAggregation rates = new MutableRatesWithAggregation();
+
+    rates.add("foo", 1);
+    rates.add("bar", 5);
+
+    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    rates.snapshot(rb, false);
+    assertCounter("FooNumOps", 1L, rb);
+    assertGauge("FooAvgTime", 1.0, rb);
+    assertCounter("BarNumOps", 1L, rb);
+    assertGauge("BarAvgTime", 5.0, rb);
+
+    rates.add("foo", 1);
+    rates.add("foo", 3);
+    rates.add("bar", 6);
+
+    rb = mockMetricsRecordBuilder();
+    rates.snapshot(rb, false);
+    assertCounter("FooNumOps", 3L, rb);
+    assertGauge("FooAvgTime", 2.0, rb);
+    assertCounter("BarNumOps", 2L, rb);
+    assertGauge("BarAvgTime", 6.0, rb);
+  }
+
+  @Test public void testMutableRatesWithAggregationManyThreads()
+      throws InterruptedException {
+    final MutableRatesWithAggregation rates = new 
MutableRatesWithAggregation();
+
+    final int n = 10;
+    long[] opCount = new long[n];
+    double[] opTotalTime = new double[n];
+
+    for (int i = 0; i < n; i++) {
+      opCount[i] = 0;
+      opTotalTime[i] = 0;
+      // Initialize so that the getLongCounter() method doesn't complain
+      rates.add("metric" + i, 0);
+    }
+
+    Thread[] threads = new Thread[n];
+    final CountDownLatch firstAddsFinished = new 
CountDownLatch(threads.length);
+    final CountDownLatch firstSnapshotsFinished = new CountDownLatch(1);
+    final CountDownLatch secondAddsFinished =
+        new CountDownLatch(threads.length);
+    final CountDownLatch secondSnapshotsFinished = new CountDownLatch(1);
+    long seed = new Random().nextLong();
+    LOG.info("Random seed = " + seed);
+    final Random sleepRandom = new Random(seed);
+    for (int tIdx = 0; tIdx < threads.length; tIdx++) {
+      final int threadIdx = tIdx;
+      threads[threadIdx] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            for (int i = 0; i < 1000; i++) {
+              rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
+              // Sleep so additions can be interleaved with snapshots
+              Thread.sleep(sleepRandom.nextInt(5));
+            }
+            firstAddsFinished.countDown();
+
+            // Make sure all threads stay alive long enough for the first
+            // snapshot to complete; else their metrics may be lost to GC
+            firstSnapshotsFinished.await();
+
+            // Let half the threads continue with more metrics and let half die
+            if (threadIdx % 2 == 0) {
+              for (int i = 0; i < 1000; i++) {
+                rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2);
+              }
+              secondAddsFinished.countDown();
+              secondSnapshotsFinished.await();
+            } else {
+              secondAddsFinished.countDown();
+            }
+          } catch (InterruptedException e) {
+            // Ignore
+          }
+        }
+      };
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    // Snapshot concurrently with additions but aggregate the totals into
+    // opCount / opTotalTime
+    for (int i = 0; i < 100; i++) {
+      snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
+      Thread.sleep(sleepRandom.nextInt(20));
+    }
+    firstAddsFinished.await();
+    // Final snapshot to grab any remaining metrics and then verify that
+    // the totals are as expected
+    snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
+    for (int i = 0; i < n; i++) {
+      assertEquals("metric" + i + " count", 1001, opCount[i]);
+      assertEquals("metric" + i + " total", 1500, opTotalTime[i], 1.0);
+    }
+    firstSnapshotsFinished.countDown();
+
+    // After half of the threads die, ensure that the remaining ones still
+    // add metrics correctly and that snapshot occurs correctly
+    secondAddsFinished.await();
+    snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime);
+    for (int i = 0; i < n; i++) {
+      assertEquals("metric" + i + " count", 1501, opCount[i]);
+      assertEquals("metric" + i + " total", 2250, opTotalTime[i], 1.0);
+    }
+    secondSnapshotsFinished.countDown();
+  }
+
+  private static void snapshotMutableRatesWithAggregation(
+      MutableRatesWithAggregation rates, long[] opCount, double[] opTotalTime) 
{
+    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+    rates.snapshot(rb, true);
+    for (int i = 0; i < opCount.length; i++) {
+      long prevOpCount = opCount[i];
+      long newOpCount = getLongCounter("Metric" + i + "NumOps", rb);
+      opCount[i] = newOpCount;
+      double avgTime = getDoubleGauge("Metric" + i + "AvgTime", rb);
+      opTotalTime[i] += avgTime * (newOpCount - prevOpCount);
+    }
+  }
+
   /**
    * Tests that when using {@link MutableStat#add(long, long)}, even with a high
    * sample count, the mean does not lose accuracy.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to