Repository: ambari
Updated Branches:
  refs/heads/trunk 9b176b5ff -> cd3ae1e62


AMBARI-9612 AMS : Kafka Metrics - Log flush status metrics do not show up.

Feeding the data into AMS in legacy format


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cd3ae1e6
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cd3ae1e6
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cd3ae1e6

Branch: refs/heads/trunk
Commit: cd3ae1e6229740fb6feba7e64e50fa63570a20a3
Parents: 9b176b5
Author: Florian Barca <fba...@hortonworks.com>
Authored: Wed Feb 18 09:22:53 2015 -0800
Committer: Florian Barca <fba...@hortonworks.com>
Committed: Wed Feb 18 09:22:53 2015 -0800

----------------------------------------------------------------------
 .../kafka/KafkaTimelineMetricsReporter.java     | 237 ++++++++-----------
 1 file changed, 99 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3ae1e6/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 762b5f2..63097e5 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -51,6 +51,7 @@ import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricProcessor;
 import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Summarizable;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.stats.Snapshot;
 
@@ -69,7 +70,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
 
   private boolean initialized = false;
   private boolean running = false;
-  private Object lock = new Object();
+  private final Object lock = new Object();
   private String collectorUri;
   private String hostname;
   private SocketAddress socketAddress;
@@ -242,33 +243,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processMeter(MetricName name, Metered meter, Context context) 
throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
-      final String meterCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric countMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count());
 
-      final String meterOneMinuteRateName = sanitizedName + 
ONE_MINUTE_RATE_SUFIX;
-      final TimelineMetric oneMinuteRateMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          meterOneMinuteRateName, meter.oneMinuteRate());
+      String[] metricNames = cacheKafkaMetered(currentTimeMillis, 
sanitizedName, meter);
 
-      final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX;
-      final TimelineMetric meanMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName,
-          meter.meanRate());
-
-      final String meterFiveMinuteRateName = sanitizedName + 
FIVE_MINUTE_RATE_SUFIX;
-      final TimelineMetric fiveMinuteRateMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFiveMinuteRateName, meter.fiveMinuteRate());
-
-      final String meterFifteenMinuteRateName = sanitizedName + 
FIFTEEN_MINUTE_RATE_SUFIX;
-      final TimelineMetric fifteenMinuteRateMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFifteenMinuteRateName, meter.fifteenMinuteRate());
-
-      metricsCache.putTimelineMetric(countMetric);
-      metricsCache.putTimelineMetric(oneMinuteRateMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(fiveMinuteRateMetric);
-      metricsCache.putTimelineMetric(fifteenMinuteRateMetric);
-
-      String[] metricNames = new String[] { meterCountName, 
meterOneMinuteRateName, meterMeanRateName,
-          meterFiveMinuteRateName, meterFifteenMinuteRateName };
       populateMetricsList(context, metricNames);
     }
 
@@ -276,9 +253,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processCounter(MetricName name, Counter counter, Context 
context) throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
-      final String metricCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, 
APP_ID, metricCountName, counter.count());
-      metricsCache.putTimelineMetric(metric);
+
+      final String metricCountName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          COUNT_SUFIX, counter.count());
+
       populateMetricsList(context, metricCountName);
     }
 
@@ -288,61 +266,20 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
       final Snapshot snapshot = histogram.getSnapshot();
       final String sanitizedName = sanitizeName(name);
 
-      final String histogramMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, 
APP_ID, histogramMinName,
-          histogram.min());
-
-      final String histogramMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, 
APP_ID, histogramMaxName,
-          histogram.max());
-
-      final String histogramMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName,
-          histogram.mean());
-
-      final String histogramMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName,
-          snapshot.getMedian());
-
-      final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName,
-          histogram.stdDev());
-
-      final String histogramSeventyFifthPercentileName = sanitizedName + 
SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String histogramNinetyFifthPercentileName = sanitizedName + 
NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String histogramNinetyEighthPercentileName = sanitizedName + 
NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String histogramNinetyNinethPercentileName = sanitizedName + 
NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinethPercentileName, snapshot.get99thPercentile());
-
-      final String histogramNinetyNinePointNinePercentileName = sanitizedName 
+ NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinePointNinePercentileName, 
snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { histogramMaxName, 
histogramMeanName, histogramMedianName, histogramMinName,
-          histogramNinetyEighthPercentileName, 
histogramNinetyFifthPercentileName,
-          histogramNinetyNinePointNinePercentileName, 
histogramNinetyNinethPercentileName,
-          histogramSeventyFifthPercentileName, histogramStdDevName };
+      String[] metricHNames = cacheKafkaSummarizable(currentTimeMillis, 
sanitizedName, histogram);
+      String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, 
sanitizedName, snapshot);
+
+      String[] metricNames = new String[] {
+          metricHNames[0],
+          metricHNames[1],
+          metricSNames[0],
+          metricHNames[2],
+          metricSNames[1],
+          metricSNames[2],
+          metricSNames[3],
+          metricSNames[4],
+          metricSNames[5],
+          metricHNames[3] };
       populateMetricsList(context, metricNames);
     }
 
@@ -352,57 +289,26 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
       final Snapshot snapshot = timer.getSnapshot();
       final String sanitizedName = sanitizeName(name);
 
-      final String timerMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, 
APP_ID, timerMinName, timer.min());
-
-      final String timerMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, 
APP_ID, timerMaxName, timer.max());
-
-      final String timerMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean());
-
-      final String timerMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName,
-          snapshot.getMedian());
-
-      final String timerStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = 
createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName,
-          timer.stdDev());
-
-      final String timerSeventyFifthPercentileName = sanitizedName + 
SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          timerSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String timerNinetyFifthPercentileName = sanitizedName + 
NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String timerNinetyEighthPercentileName = sanitizedName + 
NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String timerNinetyNinthPercentileName = sanitizedName + 
NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinthPercentileName, snapshot.get99thPercentile());
-
-      final String timerNinetyNinePointNinePercentileName = sanitizedName + 
NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = 
createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinePointNinePercentileName, 
snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { timerMaxName, timerMeanName, 
timerMedianName, timerMinName,
-          timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, 
timerNinetyNinePointNinePercentileName,
-          timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, 
timerStdDevName };
+      String[] metricMNames = cacheKafkaMetered(currentTimeMillis, 
sanitizedName, timer);
+      String[] metricTNames = cacheKafkaSummarizable(currentTimeMillis, 
sanitizedName, timer);
+      String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, 
sanitizedName, snapshot);
+
+      String[] metricNames = new String[] {
+          metricMNames[0],
+          metricMNames[1],
+          metricMNames[2],
+          metricMNames[3],
+          metricMNames[4],
+          metricTNames[0],
+          metricTNames[1],
+          metricSNames[0],
+          metricTNames[2],
+          metricSNames[1],
+          metricSNames[2],
+          metricSNames[3],
+          metricSNames[4],
+          metricSNames[5],
+          metricTNames[3] };
       populateMetricsList(context, metricNames);
     }
 
@@ -410,12 +316,67 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processGauge(MetricName name, Gauge<?> gauge, Context context) 
throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, 
APP_ID, sanitizedName,
-          Double.parseDouble(String.valueOf(gauge.value())));
-      metricsCache.putTimelineMetric(metric);
+
+      cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", 
Double.parseDouble(String.valueOf(gauge.value())));
+
       populateMetricsList(context, sanitizedName);
     }
 
+    private String[] cacheKafkaMetered(long currentTimeMillis, String 
sanitizedName, Metered meter) {
+      final String meterCountName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          COUNT_SUFIX, meter.count());
+      final String meterOneMinuteRateName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          ONE_MINUTE_RATE_SUFIX, meter.oneMinuteRate());
+      final String meterMeanRateName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MEAN_RATE_SUFIX, meter.meanRate());
+      final String meterFiveMinuteRateName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          FIVE_MINUTE_RATE_SUFIX, meter.fiveMinuteRate());
+      final String meterFifteenMinuteRateName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          FIFTEEN_MINUTE_RATE_SUFIX, meter.fifteenMinuteRate());
+
+      return new String[] { meterCountName, meterOneMinuteRateName, 
meterMeanRateName,
+          meterFiveMinuteRateName, meterFifteenMinuteRateName };
+    }
+
+    private String[] cacheKafkaSummarizable(long currentTimeMillis, String 
sanitizedName, Summarizable summarizable) {
+      final String minName = cacheSanitizedTimelineMetric(currentTimeMillis, 
sanitizedName,
+          MIN_SUFIX, summarizable.min());
+      final String maxName = cacheSanitizedTimelineMetric(currentTimeMillis, 
sanitizedName,
+          MAX_SUFIX, summarizable.max());
+      final String meanName = cacheSanitizedTimelineMetric(currentTimeMillis, 
sanitizedName,
+          MEAN_SUFIX, summarizable.mean());
+      final String stdDevName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          STD_DEV_SUFIX, summarizable.stdDev());
+
+      return new String[] { maxName, meanName, minName, stdDevName };
+    }
+
+    private String[] cacheKafkaSnapshot(long currentTimeMillis, String 
sanitizedName, Snapshot snapshot) {
+      final String medianName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MEDIAN_SUFIX, snapshot.getMedian());
+      final String seventyFifthPercentileName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          SEVENTY_FIFTH_PERCENTILE_SUFIX, snapshot.get75thPercentile());
+      final String ninetyFifthPercentileName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_FIFTH_PERCENTILE_SUFIX, snapshot.get95thPercentile());
+      final String ninetyEighthPercentileName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_EIGHTH_PERCENTILE_SUFIX, snapshot.get98thPercentile());
+      final String ninetyNinthPercentileName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
+      final String ninetyNinePointNinePercentileName = 
cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, 
snapshot.get999thPercentile());
+
+      return new String[] { medianName,
+          ninetyEighthPercentileName, ninetyFifthPercentileName, 
ninetyNinePointNinePercentileName,
+          ninetyNinthPercentileName, seventyFifthPercentileName };
+    }
+
+    private String cacheSanitizedTimelineMetric(long currentTimeMillis, String 
sanitizedName, String suffix, Number metricValue) {
+      final String meterName = sanitizedName + suffix;
+      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, 
APP_ID, meterName, metricValue);
+      metricsCache.putTimelineMetric(metric);
+      return meterName;
+    }
+
     private void populateMetricsList(Context context, String... metricNames) {
       for (String metricName : metricNames) {
         TimelineMetric cachedMetric = 
metricsCache.getTimelineMetric(metricName);

Reply via email to