HBASE-19722 Implement a meta query statistics metrics source

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2fd69cf9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2fd69cf9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2fd69cf9

Branch: refs/heads/branch-1
Commit: 2fd69cf93cf986f0ef15950a4b850751b5fbce40
Parents: c81d0af
Author: Xu Cang <xc...@salesforce.com>
Authored: Thu Jun 28 17:03:57 2018 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Thu Jun 28 17:17:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   3 +-
 .../hbase/coprocessor/MetaTableMetrics.java     | 317 +++++++++++++++++++
 .../apache/hadoop/hbase/util/LossyCounting.java | 134 ++++++++
 .../hbase/coprocessor/TestMetaTableMetrics.java | 223 +++++++++++++
 .../hadoop/hbase/util/TestLossyCounting.java    |  83 +++++
 5 files changed, 759 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
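
The patch adds a BaseRegionObserver that activates only on the region hosting hbase:meta and marks per-op, per-table, per-region, and per-client meters for every Get, Put, and Delete. A minimal sketch of how it would be wired in, mirroring the configuration used by TestMetaTableMetrics below (a real deployment would normally set this property in hbase-site.xml):

  // Sketch only: programmatic equivalent of the configuration the test uses.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class WireMetaTableMetrics {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Loaded as a system coprocessor on every region; MetaTableMetrics.start()
      // only flips to active on the region that holds hbase:meta.
      conf.set("hbase.coprocessor.region.classes",
          "org.apache.hadoop.hbase.coprocessor.MetaTableMetrics");
    }
  }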


http://git-wip-us.apache.org/repos/asf/hbase/blob/2fd69cf9/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e702236..88a352a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1323,7 +1323,8 @@ public final class HConstants {
   public static final String TEMPORARY_FS_DIRECTORY_KEY = "hbase.fs.tmp.dir";
   public static final String DEFAULT_TEMPORARY_HDFS_DIRECTORY = "/user/"
       + System.getProperty("user.name") + "/hbase-staging";
-
+  public static final String DEFAULT_LOSSY_COUNTING_ERROR_RATE =
+      "hbase.util.default.lossycounting.errorrate";
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
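
Note that despite the DEFAULT_ prefix, the new constant holds a configuration key, not a value; LossyCounting's no-argument constructor reads it with a fallback of 0.02. A short sketch of how the key is consumed (this mirrors that constructor; since LossyCounting builds its own HBaseConfiguration, the rate is effectively controlled through hbase-site.xml on the classpath):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  public class ReadLossyCountingRate {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml
      double errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
      long bucketSize = (long) Math.ceil(1.0 / errorRate); // 50 with the 0.02 default
      System.out.println("error rate = " + errorRate + ", bucket size = " + bucketSize);
    }
  }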

http://git-wip-us.apache.org/repos/asf/hbase/blob/2fd69cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
new file mode 100644
index 0000000..db97789
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.metrics.Meter;
+import org.apache.hadoop.hbase.metrics.Metric;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.LossyCounting;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+/**
+ * A coprocessor that collects metrics from the meta table.
+ * <p>
+ * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
+ * etc) as well as JMX output.
+ * </p>
+ * @see MetaTableMetrics
+ */
+@InterfaceAudience.Private
+public class MetaTableMetrics extends BaseRegionObserver {
+
+  private Map<String, Optional<Metric>> requestsMap;
+  private RegionCoprocessorEnvironment regionCoprocessorEnv;
+  private LossyCounting clientMetricsLossyCounting;
+  private boolean active = false;
+
+  enum MetaTableOps {
+    GET, PUT, DELETE;
+  }
+
+  private ImmutableMap<Class<?>, MetaTableOps> opsNameMap =
+      ImmutableMap.<Class<?>, MetaTableOps>builder()
+              .put(Put.class, MetaTableOps.PUT)
+              .put(Get.class, MetaTableOps.GET)
+              .put(Delete.class, MetaTableOps.DELETE)
+              .build();
+
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
+      List<Cell> results) throws IOException {
+    if (!active || !isMetaTableOp(e)) {
+      return;
+    }
+    tableMetricRegisterAndMark(e, get);
+    clientMetricRegisterAndMark(e);
+    regionMetricRegisterAndMark(e, get);
+    opMetricRegisterAndMark(e, get);
+    opWithClientMetricRegisterAndMark(e, get);
+  }
+
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+      Durability durability) throws IOException {
+    if (!active || !isMetaTableOp(e)) {
+      return;
+    }
+    tableMetricRegisterAndMark(e, put);
+    clientMetricRegisterAndMark(e);
+    regionMetricRegisterAndMark(e, put);
+    opMetricRegisterAndMark(e, put);
+    opWithClientMetricRegisterAndMark(e, put);
+  }
+
+  @Override
+  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
+      WALEdit edit, Durability durability) throws IOException {
+    if (!active || !isMetaTableOp(e)) {
+      return;
+    }
+    tableMetricRegisterAndMark(e, delete);
+    clientMetricRegisterAndMark(e);
+    regionMetricRegisterAndMark(e, delete);
+    opMetricRegisterAndMark(e, delete);
+    opWithClientMetricRegisterAndMark(e, delete);
+  }
+
+  private void markMeterIfPresent(String requestMeter) {
+    if (requestMeter.isEmpty()) {
+      return;
+    }
+    Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
+    if (optionalMetric != null && optionalMetric.isPresent()) {
+      ((Meter) optionalMetric.get()).mark();
+    }
+  }
+
+  private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
+      String requestMeter) {
+    if (requestMeter.isEmpty()) {
+      return;
+    }
+    if (!requestsMap.containsKey(requestMeter)) {
+      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+      registry.meter(requestMeter);
+      requestsMap.put(requestMeter, registry.get(requestMeter));
+    }
+  }
+
+  /**
+   * Registers and counts lossyCount for meters that are kept by lossy counting.
+   * By using lossy counting to maintain meters, at most 7 / e meters will be kept (e is the
+   * error rate), e.g. with the default e of 0.02, at most 350 client request meters are kept.
+   * Also, every kept element has frequency higher than e * N (N is the total count so far).
+   * @param e Region coprocessor environment
+   * @param requestMeter meter to be registered
+   * @param lossyCounting lossyCounting object for one type of meters.
+   */
+  private void registerLossyCountingMeterIfNotPresent(
+      ObserverContext<RegionCoprocessorEnvironment> e,
+      String requestMeter, LossyCounting lossyCounting) {
+    if (requestMeter.isEmpty()) {
+      return;
+    }
+    Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
+    if (!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)) {
+      for (String meter : metersToBeRemoved) {
+        // Clean up requestsMap according to the data swept out by lossy counting.
+        requestsMap.remove(meter);
+        MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+        registry.remove(meter);
+      }
+      // The newly added meter was itself swept by the lossy counting cleanup, so there is no
+      // need to put it into requestsMap.
+      return;
+    }
+
+    if (!requestsMap.containsKey(requestMeter)) {
+      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+      registry.meter(requestMeter);
+      requestsMap.put(requestMeter, registry.get(requestMeter));
+    }
+  }
+
+  /**
+   * Get the table name from ops such as get, put, or delete.
+   * @param op the op, such as a get, put or delete.
+   */
+  private String getTableNameFromOp(Row op) {
+    String tableRowKey = new String(op.getRow(), StandardCharsets.UTF_8);
+    if (tableRowKey.isEmpty()) {
+      return null;
+    }
+    return tableRowKey.split(",").length > 0 ? tableRowKey.split(",")[0] : null;
+  }
+
+  /**
+   * Get the region id from ops such as get, put, or delete.
+   * @param op the op, such as a get, put or delete.
+   */
+  private String getRegionIdFromOp(Row op) {
+    String tableRowKey = new String(op.getRow(), StandardCharsets.UTF_8);
+    if (tableRowKey.isEmpty()) {
+      return null;
+    }
+    return tableRowKey.split(",").length > 2 ? tableRowKey.split(",")[2] : null;
+  }
+
+  private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
+    return TableName.META_TABLE_NAME.toString()
+        .equals(new String(e.getEnvironment().getRegionInfo().getTable().getName(),
+            StandardCharsets.UTF_8));
+  }
+
+  private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
+    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+
+    String clientRequestMeter = clientRequestMeterName(clientIP);
+    registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
+    markMeterIfPresent(clientRequestMeter);
+  }
+
+  private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+      Row op) {
+    // Mark the meta table meter whenever the coprocessor is called
+    String tableName = getTableNameFromOp(op);
+    String tableRequestMeter = tableMeterName(tableName);
+    registerMeterIfNotPresent(e, tableRequestMeter);
+    markMeterIfPresent(tableRequestMeter);
+  }
+
+  private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+      Row op) {
+    // Mark the meta table meter whenever the coprocessor is called
+    String regionId = getRegionIdFromOp(op);
+    String regionRequestMeter = regionMeterName(regionId);
+    registerMeterIfNotPresent(e, regionRequestMeter);
+    markMeterIfPresent(regionRequestMeter);
+  }
+
+  private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+      Row op) {
+    String opMeterName = opMeterName(op);
+    registerMeterIfNotPresent(e, opMeterName);
+    markMeterIfPresent(opMeterName);
+  }
+
+  private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+      Object op) {
+    String opWithClientMeterName = opWithClientMeterName(op);
+    registerMeterIfNotPresent(e, opWithClientMeterName);
+    markMeterIfPresent(opWithClientMeterName);
+  }
+
+  private String opWithClientMeterName(Object op) {
+    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+    if (clientIP.isEmpty()) {
+      return "";
+    }
+    MetaTableOps ops = opsNameMap.get(op.getClass());
+    String opWithClientMeterName = "";
+    switch (ops) {
+      case GET:
+        opWithClientMeterName = String.format("MetaTable_client_%s_get_request", clientIP);
+        break;
+      case PUT:
+        opWithClientMeterName = String.format("MetaTable_client_%s_put_request", clientIP);
+        break;
+      case DELETE:
+        opWithClientMeterName = String.format("MetaTable_client_%s_delete_request", clientIP);
+        break;
+      default:
+        break;
+    }
+    return opWithClientMeterName;
+  }
+
+  private String opMeterName(Object op) {
+    MetaTableOps ops = opsNameMap.get(op.getClass());
+    String opMeterName = "";
+    switch (ops) {
+      case GET:
+        opMeterName = "MetaTable_get_request";
+        break;
+      case PUT:
+        opMeterName = "MetaTable_put_request";
+        break;
+      case DELETE:
+        opMeterName = "MetaTable_delete_request";
+        break;
+      default:
+        break;
+    }
+    return opMeterName;
+  }
+
+  private String tableMeterName(String tableName) {
+    return String.format("MetaTable_table_%s_request", tableName);
+  }
+
+  private String clientRequestMeterName(String clientIP) {
+    if (clientIP.isEmpty()) {
+      return "";
+    }
+    return String.format("MetaTable_client_%s_request", clientIP);
+  }
+
+  private String regionMeterName(String regionId) {
+    return String.format("MetaTable_region_%s_request", regionId);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment
+        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
+        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getName() != null
+        && new String(((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getName(),
+            StandardCharsets.UTF_8).equals(TableName.META_TABLE_NAME.toString())) {
+      regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+      requestsMap = new ConcurrentHashMap<>();
+      clientMetricsLossyCounting = new LossyCounting();
+      // Only become active when this region hosts the meta table.
+      active = true;
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    // Since the meta region can move around, clear stale metrics on stop.
+    // requestsMap is only initialized on the region that hosts meta, so guard against null.
+    if (requestsMap != null) {
+      for (String meterName : requestsMap.keySet()) {
+        MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+        registry.remove(meterName);
+      }
+    }
+  }
+
+}
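
For a single client Get against hbase:meta, the hooks above mark five meters. An illustrative sketch of the naming scheme (the IP, table name, and region id are made-up example values; the real ones come from RpcServer.getRemoteIp() and the components of the meta row key):

  // Illustrative only: prints the meter names MetaTableMetrics would register
  // for one Get on hbase:meta, using example input values.
  public class MeterNames {
    public static void main(String[] args) {
      String clientIP = "/10.0.0.5";     // RpcServer.getRemoteIp().toString()
      String tableName = "TestTable";    // meta row key component 0
      String regionId = "1530230637000"; // meta row key component 2
      System.out.println("MetaTable_get_request");                                    // per op
      System.out.println(String.format("MetaTable_table_%s_request", tableName));     // per table
      System.out.println(String.format("MetaTable_region_%s_request", regionId));     // per region
      System.out.println(String.format("MetaTable_client_%s_request", clientIP));     // per client (lossy counted)
      System.out.println(String.format("MetaTable_client_%s_get_request", clientIP)); // per client and op
    }
  }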

http://git-wip-us.apache.org/repos/asf/hbase/blob/2fd69cf9/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
new file mode 100644
index 0000000..2195da6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * LossyCounting utility: a bounded data structure that maintains approximate high-frequency
+ * elements in a data stream.
+ *
+ * Bucket size is 1 / error rate. (Error rate is 0.02 by default.)
+ * Lemma: if an element does not appear in the set, then its frequency is less than e * N
+ *        (N is the total element count so far).
+ * Based on the paper:
+ * http://www.vldb.org/conf/2002/S10P03.pdf
+ */
+@InterfaceAudience.Public
+public class LossyCounting {
+  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
+  private long bucketSize;
+  private long currentTerm;
+  private double errorRate;
+  private Map<String, Integer> data;
+  private long totalDataCount;
+
+  public LossyCounting(double errorRate) {
+    if (errorRate < 0.0 || errorRate > 1.0) {
+      throw new IllegalArgumentException("Lossy counting error rate should be within range [0,1]");
+    }
+    this.errorRate = errorRate;
+    this.bucketSize = (long) Math.ceil(1.0 / errorRate);
+    this.currentTerm = 1;
+    this.totalDataCount = 0;
+    this.data = new ConcurrentHashMap<>();
+    calculateCurrentTerm();
+  }
+
+  public LossyCounting() {
+    Configuration conf = HBaseConfiguration.create();
+    this.errorRate = conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
+    this.bucketSize = (long) Math.ceil(1.0 / errorRate);
+    this.currentTerm = 1;
+    this.totalDataCount = 0;
+    this.data = new ConcurrentHashMap<>();
+    calculateCurrentTerm();
+  }
+
+  public Set<String> addByOne(String key) {
+    if (data.containsKey(key)) {
+      data.put(key, data.get(key) + 1);
+    } else {
+      data.put(key, 1);
+    }
+    totalDataCount++;
+    calculateCurrentTerm();
+    Set<String> dataToBeSwept = new HashSet<>();
+    if (totalDataCount % bucketSize == 0) {
+      dataToBeSwept = sweep();
+    }
+    return dataToBeSwept;
+  }
+
+  /**
+   * Sweep low-frequency data.
+   * @return Names of the elements that were swept.
+   */
+  private Set<String> sweep() {
+    Set<String> dataToBeSwept = new HashSet<>();
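+    // An entry is low frequency, and gets swept, when its count plus the
+    // error tolerance falls below the current term.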
+    for (Map.Entry<String, Integer> entry : data.entrySet()) {
+      if (entry.getValue() + errorRate < currentTerm) {
+        dataToBeSwept.add(entry.getKey());
+      }
+    }
+    for (String key : dataToBeSwept) {
+      data.remove(key);
+    }
+    LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
+    return dataToBeSwept;
+  }
+
+  /**
+   * Calculate and set current term
+   */
+  private void calculateCurrentTerm() {
+    this.currentTerm = (long) Math.ceil(1.0 * totalDataCount / bucketSize);
+  }
+
+  public long getBucketSize() {
+    return bucketSize;
+  }
+
+  public long getDataSize() {
+    return data.size();
+  }
+
+  public boolean contains(String key) {
+    return data.containsKey(key);
+  }
+
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+}
+
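
A minimal usage sketch of LossyCounting, consistent with what TestLossyCounting asserts below: a sweep fires on every bucketSize-th insertion and drops entries whose count has not kept pace with the current term.

  import java.util.Set;
  import org.apache.hadoop.hbase.util.LossyCounting;

  public class LossyCountingDemo {
    public static void main(String[] args) {
      LossyCounting counting = new LossyCounting(0.1); // bucket size = ceil(1/0.1) = 10
      for (int i = 0; i < 9; i++) {
        counting.addByOne("hot");                      // "hot" reaches count 9
      }
      // The 10th insertion triggers a sweep with current term 1; "once" has
      // count 1 and 1 + 0.1 >= 1, so nothing is dropped yet.
      Set<String> swept = counting.addByOne("once");
      System.out.println(swept.isEmpty());             // true
      for (int i = 0; i < 10; i++) {
        swept = counting.addByOne("hot");              // insertions 11..20
      }
      // At the sweep on insertion 20 the current term is 2, so the singleton
      // "once" (1 + 0.1 < 2) is swept while "hot" (count 19) survives.
      System.out.println(swept.contains("once"));      // true
      System.out.println(counting.getDataSize());      // 1
    }
  }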

http://git-wip-us.apache.org/repos/asf/hbase/blob/2fd69cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
new file mode 100644
index 0000000..23dd517
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.JMXListener;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestMetaTableMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final int NUM_ROWS = 5;
+  private static final String value = "foo";
+  private static Configuration conf = null;
+  private static int connectorPort = 61120;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+
+    conf = UTIL.getConfiguration();
+    // Set system coprocessor so it can be applied to meta regions
+    UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
+      MetaTableMetrics.class.getName());
+    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      do {
+        int sign = i % 2 == 0 ? 1 : -1;
+        connectorPort += sign * rand.nextInt(100);
+      } while (!HBaseTestingUtility.available(connectorPort));
+      try {
+        conf.setInt("regionserver.rmi.registry.port", connectorPort);
+        UTIL.startMiniCluster(1);
+        UTIL.createTable(NAME1, new byte[][]{FAMILY});
+        LOG.info("Started mini cluster: " + UTIL.toString());
+        break;
+      } catch (Exception e) {
+        LOG.debug("Encountered exception when starting cluster. Trying port " + connectorPort, e);
+        try {
+          // this is to avoid "IllegalStateException: A mini-cluster is already running"
+          UTIL.shutdownMiniCluster();
+        } catch (Exception ex) {
+          LOG.debug("Encountered exception shutting down cluster", ex);
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void writeData(Table t) throws IOException {
+    List<Put> puts = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Put p = new Put(Bytes.toBytes(i + 1));
+      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
+      puts.add(p);
+    }
+    t.put(puts);
+  }
+
+  private Set<String> readJmxMetricsWithRetry() throws IOException {
+    final int count = 10;
+    for (int i = 0; i < count; i++) {
+      Set<String> metrics = readJmxMetrics();
+      if (metrics != null) {
+        return metrics;
+      }
+      LOG.warn("Failed to get jmxmetrics... sleeping, retrying; " + i + " of " + count + " times");
+      Threads.sleep(1000);
+    }
+    return null;
+  }
+
+  /**
+   * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX.
+   * @throws IOException when it fails to retrieve JMX metrics.
+   */
+  // This method comes from TestStochasticBalancerJmxMetrics, with minor modifications.
+  private Set<String> readJmxMetrics() throws IOException {
+    JMXConnector connector = null;
+    ObjectName target = null;
+    MBeanServerConnection mb = null;
+    try {
+      connector =
+          JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
+      mb = connector.getMBeanServerConnection();
+
+      @SuppressWarnings("JdkObsolete")
+      Hashtable<String, String> pairs = new Hashtable<>();
+      pairs.put("service", "HBase");
+      pairs.put("name", "RegionServer");
+      pairs.put("sub",
+        "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor"
+            + ".MetaTableMetrics");
+      target = new ObjectName("Hadoop", pairs);
+      MBeanInfo beanInfo = mb.getMBeanInfo(target);
+
+      Set<String> existingAttrs = new HashSet<>();
+      for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
+        existingAttrs.add(attrInfo.getName());
+      }
+      return existingAttrs;
+    } catch (Exception e) {
+      LOG.warn("Failed to get bean." + target, e);
+      if (mb != null) {
+        Set<ObjectInstance> instances = mb.queryMBeans(null, null);
+        Iterator<ObjectInstance> iterator = instances.iterator();
+        LOG.warn("MBean Found:");
+        while (iterator.hasNext()) {
+          ObjectInstance instance = iterator.next();
+          LOG.warn("Class Name: " + instance.getClassName());
+          LOG.warn("Object Name: " + instance.getObjectName());
+        }
+      }
+    } finally {
+      if (connector != null) {
+        try {
+          connector.close();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    return null;
+  }
+
+  // Verifies that meta table metrics exist in JMX.
+  // For one table, there should be 5 MetaTable_table_<TableName> metrics, such as:
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_count
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
+  @Test
+  public void test() throws IOException, InterruptedException {
+    try (Table t = UTIL.getConnection().getTable(NAME1)) {
+      writeData(t);
+      // Flush the data
+      UTIL.flush(NAME1);
+      // Issue a compaction
+      UTIL.compact(NAME1, true);
+      Thread.sleep(2000);
+    }
+    Set<String> jmxMetrics = readJmxMetricsWithRetry();
+    assertNotNull(jmxMetrics);
+
+    long name1TableMetricsCount = 0;
+    for (String metric : jmxMetrics) {
+      if (metric.contains("MetaTable_table_" + NAME1)) {
+        name1TableMetricsCount++;
+      }
+    }
+    assertEquals(5L, name1TableMetricsCount);
+
+    String putWithClientMetricNameRegex = "MetaTable_client_.+_put_request.*";
+    long putWithClientMetricsCount = 0;
+    for (String metric : jmxMetrics) {
+      if (metric.matches(putWithClientMetricNameRegex)) {
+        putWithClientMetricsCount++;
+      }
+    }
+    assertEquals(5L, putWithClientMetricsCount);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2fd69cf9/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
new file mode 100644
index 0000000..71945fc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestLossyCounting {
+
+  @Test
+  public void testBucketSize() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    assertEquals(100L, lossyCounting.getBucketSize());
+    LossyCounting lossyCounting2 = new LossyCounting();
+    assertEquals(50L, lossyCounting2.getBucketSize());
+  }
+
+  @Test
+  public void testAddByOne() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    for (int i = 0; i < 100; i++) {
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(100L, lossyCounting.getDataSize());
+    for (int i = 0; i < 100; i++) {
+      String key = "" + i;
+      assertTrue(lossyCounting.contains(key));
+    }
+  }
+
+  @Test
+  public void testSweep1() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    for (int i = 0; i < 400; i++) {
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(4L, lossyCounting.getCurrentTerm());
+    assertEquals(0L, lossyCounting.getDataSize());
+  }
+
+  @Test
+  public void testSweep2() {
+    LossyCounting lossyCounting = new LossyCounting(0.1);
+    for (int i = 0; i < 10; i++) {
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(10L, lossyCounting.getDataSize());
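+    // Re-add the single key "1"; its count keeps pace with the growing current
+    // term, so it alone survives the sweep triggered at the 20th insertion.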
+    for (int i = 0; i < 10; i++) {
+      lossyCounting.addByOne("1");
+    }
+    assertEquals(1L, lossyCounting.getDataSize());
+  }
+
+}
\ No newline at end of file
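
A note on the sweep arithmetic the two sweep tests rely on: with an error rate of 0.01 the bucket size is ceil(1/0.01) = 100, so a sweep runs at every 100th insertion. In testSweep1 all 400 keys are distinct singletons; the first sweep (current term 1) keeps everything because 1 + 0.01 < 1 is false, but every later sweep drops each singleton added before it, so after the sweep at insertion 400 (current term 4) the data size is 0. In testSweep2 the repeatedly added key "1" accumulates count 11 and survives, while the nine remaining singletons are dropped at the sweep on the 20th insertion.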
