Repository: ambari Updated Branches: refs/heads/trunk 9b176b5ff -> cd3ae1e62
AMBARI-9612 AMS : Kafka Metrics - Log flush status metrics do not show up. Feeding the data into AMS in legacy format Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cd3ae1e6 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cd3ae1e6 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cd3ae1e6 Branch: refs/heads/trunk Commit: cd3ae1e6229740fb6feba7e64e50fa63570a20a3 Parents: 9b176b5 Author: Florian Barca <fba...@hortonworks.com> Authored: Wed Feb 18 09:22:53 2015 -0800 Committer: Florian Barca <fba...@hortonworks.com> Committed: Wed Feb 18 09:22:53 2015 -0800 ---------------------------------------------------------------------- .../kafka/KafkaTimelineMetricsReporter.java | 237 ++++++++----------- 1 file changed, 99 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/cd3ae1e6/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 762b5f2..63097e5 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -51,6 +51,7 @@ import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricProcessor; import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.core.Summarizable; import com.yammer.metrics.core.Timer; import com.yammer.metrics.stats.Snapshot; @@ -69,7 +70,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im private boolean initialized = false; private boolean running = false; - private Object lock = new Object(); + private final Object lock = new Object(); private String collectorUri; private String hostname; private SocketAddress socketAddress; @@ -242,33 +243,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im public void processMeter(MetricName name, Metered meter, Context context) throws Exception { final long currentTimeMillis = System.currentTimeMillis(); final String sanitizedName = sanitizeName(name); - final String meterCountName = sanitizedName + COUNT_SUFIX; - final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count()); - final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX; - final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterOneMinuteRateName, meter.oneMinuteRate()); + String[] metricNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, meter); - final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName, - meter.meanRate()); - - final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX; - final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterFiveMinuteRateName, meter.fiveMinuteRate()); - - final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX; - final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID, - meterFifteenMinuteRateName, meter.fifteenMinuteRate()); - - metricsCache.putTimelineMetric(countMetric); - metricsCache.putTimelineMetric(oneMinuteRateMetric); - metricsCache.putTimelineMetric(meanMetric); - metricsCache.putTimelineMetric(fiveMinuteRateMetric); - metricsCache.putTimelineMetric(fifteenMinuteRateMetric); - - String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName, - meterFiveMinuteRateName, meterFifteenMinuteRateName }; populateMetricsList(context, metricNames); } @@ -276,9 +253,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im public void processCounter(MetricName name, Counter counter, Context context) throws Exception { final long currentTimeMillis = System.currentTimeMillis(); final String sanitizedName = sanitizeName(name); - final String metricCountName = sanitizedName + COUNT_SUFIX; - final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count()); - metricsCache.putTimelineMetric(metric); + + final String metricCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + COUNT_SUFIX, counter.count()); + populateMetricsList(context, metricCountName); } @@ -288,61 +266,20 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im final Snapshot snapshot = histogram.getSnapshot(); final String sanitizedName = sanitizeName(name); - final String histogramMinName = sanitizedName + MIN_SUFIX; - final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName, - histogram.min()); - - final String histogramMaxName = sanitizedName + MAX_SUFIX; - final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName, - histogram.max()); - - final String histogramMeanName = sanitizedName + MEAN_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName, - histogram.mean()); - - final String histogramMedianName = sanitizedName + MEDIAN_SUFIX; - final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName, - snapshot.getMedian()); - - final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX; - final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName, - histogram.stdDev()); - - final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramSeventyFifthPercentileName, snapshot.get75thPercentile()); - - final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyFifthPercentileName, snapshot.get95thPercentile()); - - final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; - final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyEighthPercentileName, snapshot.get98thPercentile()); - - final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyNinethPercentileName, snapshot.get99thPercentile()); - - final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); - - metricsCache.putTimelineMetric(minMetric); - metricsCache.putTimelineMetric(maxMetric); - metricsCache.putTimelineMetric(meanMetric); - metricsCache.putTimelineMetric(medianMetric); - metricsCache.putTimelineMetric(stdDevMetric); - metricsCache.putTimelineMetric(seventyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyEighthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); - - String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName, - histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName, - histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName, - histogramSeventyFifthPercentileName, histogramStdDevName }; + String[] metricHNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, histogram); + String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot); + + String[] metricNames = new String[] { + metricHNames[0], + metricHNames[1], + metricSNames[0], + metricHNames[2], + metricSNames[1], + metricSNames[2], + metricSNames[3], + metricSNames[4], + metricSNames[5], + metricHNames[3] }; populateMetricsList(context, metricNames); } @@ -352,57 +289,26 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im final Snapshot snapshot = timer.getSnapshot(); final String sanitizedName = sanitizeName(name); - final String timerMinName = sanitizedName + MIN_SUFIX; - final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min()); - - final String timerMaxName = sanitizedName + MAX_SUFIX; - final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max()); - - final String timerMeanName = sanitizedName + MEAN_SUFIX; - final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean()); - - final String timerMedianName = sanitizedName + MEDIAN_SUFIX; - final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName, - snapshot.getMedian()); - - final String timerStdDevName = sanitizedName + STD_DEV_SUFIX; - final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName, - timer.stdDev()); - - final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerSeventyFifthPercentileName, snapshot.get75thPercentile()); - - final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX; - final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyFifthPercentileName, snapshot.get95thPercentile()); - - final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX; - final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyEighthPercentileName, snapshot.get98thPercentile()); - - final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyNinthPercentileName, snapshot.get99thPercentile()); - - final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX; - final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID, - timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile()); - - metricsCache.putTimelineMetric(minMetric); - metricsCache.putTimelineMetric(maxMetric); - metricsCache.putTimelineMetric(meanMetric); - metricsCache.putTimelineMetric(medianMetric); - metricsCache.putTimelineMetric(stdDevMetric); - metricsCache.putTimelineMetric(seventyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyFifthPercentileMetric); - metricsCache.putTimelineMetric(nintyEighthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPercentileMetric); - metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric); - - String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName, - timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName, - timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName }; + String[] metricMNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, timer); + String[] metricTNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, timer); + String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot); + + String[] metricNames = new String[] { + metricMNames[0], + metricMNames[1], + metricMNames[2], + metricMNames[3], + metricMNames[4], + metricTNames[0], + metricTNames[1], + metricSNames[0], + metricTNames[2], + metricSNames[1], + metricSNames[2], + metricSNames[3], + metricSNames[4], + metricSNames[5], + metricTNames[3] }; populateMetricsList(context, metricNames); } @@ -410,12 +316,67 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception { final long currentTimeMillis = System.currentTimeMillis(); final String sanitizedName = sanitizeName(name); - final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName, - Double.parseDouble(String.valueOf(gauge.value()))); - metricsCache.putTimelineMetric(metric); + + cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value()))); + populateMetricsList(context, sanitizedName); } + private String[] cacheKafkaMetered(long currentTimeMillis, String sanitizedName, Metered meter) { + final String meterCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + COUNT_SUFIX, meter.count()); + final String meterOneMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + ONE_MINUTE_RATE_SUFIX, meter.oneMinuteRate()); + final String meterMeanRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + MEAN_RATE_SUFIX, meter.meanRate()); + final String meterFiveMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + FIVE_MINUTE_RATE_SUFIX, meter.fiveMinuteRate()); + final String meterFifteenMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + FIFTEEN_MINUTE_RATE_SUFIX, meter.fifteenMinuteRate()); + + return new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName, + meterFiveMinuteRateName, meterFifteenMinuteRateName }; + } + + private String[] cacheKafkaSummarizable(long currentTimeMillis, String sanitizedName, Summarizable summarizable) { + final String minName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + MIN_SUFIX, summarizable.min()); + final String maxName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + MAX_SUFIX, summarizable.max()); + final String meanName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + MEAN_SUFIX, summarizable.mean()); + final String stdDevName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + STD_DEV_SUFIX, summarizable.stdDev()); + + return new String[] { maxName, meanName, minName, stdDevName }; + } + + private String[] cacheKafkaSnapshot(long currentTimeMillis, String sanitizedName, Snapshot snapshot) { + final String medianName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + MEDIAN_SUFIX, snapshot.getMedian()); + final String seventyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + SEVENTY_FIFTH_PERCENTILE_SUFIX, snapshot.get75thPercentile()); + final String ninetyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + NINETY_FIFTH_PERCENTILE_SUFIX, snapshot.get95thPercentile()); + final String ninetyEighthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + NINETY_EIGHTH_PERCENTILE_SUFIX, snapshot.get98thPercentile()); + final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile()); + final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile()); + + return new String[] { medianName, + ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName, + ninetyNinthPercentileName, seventyFifthPercentileName }; + } + + private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) { + final String meterName = sanitizedName + suffix; + final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue); + metricsCache.putTimelineMetric(metric); + return meterName; + } + private void populateMetricsList(Context context, String... metricNames) { for (String metricName : metricNames) { TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);