This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a7d0fdd KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979) a7d0fdd is described below commit a7d0fdd534ef55533a868ea7388bbc081ee42718 Author: cadonna <br...@confluent.io> AuthorDate: Fri Aug 2 18:51:03 2019 +0200 KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979) * Adds RocksDBMetrics class that provides methods to get sensors from the Kafka metrics registry and to setup the sensors to record RocksDB metrics * Extends StreamsMetricsImpl with functionality to add the required metrics to the sensors. Reviewers: Boyang Chen <boy...@confluent.io>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, John Roesler <vvcep...@users.noreply.github.com>, Guozhang Wang <wangg...@gmail.com> --- .../internals/metrics/StreamsMetricsImpl.java | 83 ++++- .../state/internals/metrics/RocksDBMetrics.java | 382 +++++++++++++++++++++ .../internals/metrics/StreamsMetricsImplTest.java | 113 +++++- .../internals/metrics/RocksDBMetricsTest.java | 283 +++++++++++++++ 4 files changed, 852 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index b6bfcc5..ae3d953 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -23,9 +23,12 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; 
+import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.metrics.stats.WindowedSum; import org.apache.kafka.streams.StreamsMetrics; import java.util.Arrays; @@ -54,17 +57,21 @@ public class StreamsMetricsImpl implements StreamsMetrics { public static final String THREAD_ID_TAG = "client-id"; public static final String TASK_ID_TAG = "task-id"; + public static final String STORE_ID_TAG = "state-id"; public static final String ALL_TASKS = "all"; public static final String LATENCY_SUFFIX = "-latency"; public static final String AVG_SUFFIX = "-avg"; public static final String MAX_SUFFIX = "-max"; + public static final String MIN_SUFFIX = "-min"; public static final String RATE_SUFFIX = "-rate"; public static final String TOTAL_SUFFIX = "-total"; + public static final String RATIO_SUFFIX = "-ratio"; public static final String THREAD_LEVEL_GROUP = "stream-metrics"; public static final String TASK_LEVEL_GROUP = "stream-task-metrics"; + public static final String STATE_LEVEL_GROUP = "stream-state-metrics"; public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics"; public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id"; @@ -123,6 +130,18 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } + public Map<String, String> taskLevelTagMap(final String taskName) { + final Map<String, String> tagMap = threadLevelTagMap(); + tagMap.put(TASK_ID_TAG, taskName); + return tagMap; + } + + public Map<String, String> storeLevelTagMap(final String taskName, final String storeType, final String storeName) { + final Map<String, String> tagMap = taskLevelTagMap(taskName); + tagMap.put(storeType + "-" + STORE_ID_TAG, storeName); + return tagMap; + } + public final Sensor taskLevelSensor(final String taskName, final String sensorName, final RecordingLevel recordingLevel, @@ -237,9 +256,7 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{ if (!storeLevelSensors.containsKey(key)) { storeLevelSensors.put(key, new LinkedList<>()); } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); storeLevelSensors.get(key).push(fullSensorName); @@ -454,12 +471,62 @@ public class StreamsMetricsImpl implements StreamsMetrics { final String group, final Map<String, String> tags, final String operation) { - addInvocationRateAndCount(sensor, - group, - tags, - operation, - "The total number of " + operation, - "The average per-second number of " + operation); + addInvocationRateAndCount( + sensor, + group, + tags, + operation, + "The total number of " + operation, + "The average per-second number of " + operation + ); + } + + public static void addRateOfSumAndSumMetricsToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation, + final String descriptionOfRate, + final String descriptionOfTotal) { + addRateOfSumMetricToSensor(sensor, group, tags, operation, descriptionOfRate); + addSumMetricToSensor(sensor, group, tags, operation, descriptionOfTotal); + } + + public static void addRateOfSumMetricToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation, + final String description) { + sensor.add(new MetricName(operation + RATE_SUFFIX, group, description, tags), + new Rate(TimeUnit.SECONDS, new WindowedSum())); + } + + public static void addSumMetricToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation, + final String description) { + sensor.add(new MetricName(operation + TOTAL_SUFFIX, group, description, tags), new CumulativeSum()); + } + + public static void addValueMetricToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String name, + final String description) { + sensor.add(new MetricName(name, group, 
description, tags), new Value()); + } + + public static void addAvgAndSumMetricsToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String metricNamePrefix, + final String descriptionOfAvg, + final String descriptionOfTotal) { + sensor.add(new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags), new Avg()); + sensor.add( + new MetricName(metricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags), + new CumulativeSum() + ); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java new file mode 100644 index 0000000..95cb522 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MIN_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumMetricToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndSumMetricsToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor; + +public class RocksDBMetrics { + private RocksDBMetrics() {} + + private static final String BYTES_WRITTEN_TO_DB = "bytes-written"; + private static final String BYTES_READ_FROM_DB = "bytes-read"; + private static final String MEMTABLE_BYTES_FLUSHED = "memtable-bytes-flushed"; + private static final String MEMTABLE_HIT_RATIO = "memtable-hit" + RATIO_SUFFIX; + private static final String MEMTABLE_FLUSH_TIME = "memtable-flush-time"; + private static final String MEMTABLE_FLUSH_TIME_AVG = MEMTABLE_FLUSH_TIME + AVG_SUFFIX; + private static final String MEMTABLE_FLUSH_TIME_MIN = MEMTABLE_FLUSH_TIME + MIN_SUFFIX; + private static final String 
MEMTABLE_FLUSH_TIME_MAX = MEMTABLE_FLUSH_TIME + MAX_SUFFIX; + private static final String WRITE_STALL_DURATION = "write-stall-duration"; + private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit" + RATIO_SUFFIX; + private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit" + RATIO_SUFFIX; + private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit" + RATIO_SUFFIX; + private static final String BYTES_READ_DURING_COMPACTION = "bytes-read-compaction"; + private static final String BYTES_WRITTEN_DURING_COMPACTION = "bytes-written-compaction"; + private static final String COMPACTION_TIME = "compaction-time"; + private static final String COMPACTION_TIME_AVG = COMPACTION_TIME + AVG_SUFFIX; + private static final String COMPACTION_TIME_MIN = COMPACTION_TIME + MIN_SUFFIX; + private static final String COMPACTION_TIME_MAX = COMPACTION_TIME + MAX_SUFFIX; + private static final String NUMBER_OF_OPEN_FILES = "number-open-files"; + private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors"; + + private static final String BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION = + "Average number of bytes written per second to the RocksDB state store"; + private static final String BYTES_WRITTEN_TO_DB_TOTAL_DESCRIPTION = + "Total number of bytes written to the RocksDB state store"; + private static final String BYTES_READ_FROM_DB_RATE_DESCRIPTION = + "Average number of bytes read per second from the RocksDB state store"; + private static final String BYTES_READ_FROM_DB_TOTAL_DESCRIPTION = + "Total number of bytes read from the RocksDB state store"; + private static final String MEMTABLE_BYTES_FLUSHED_RATE_DESCRIPTION = + "Average number of bytes flushed per second from the memtable to disk"; + private static final String MEMTABLE_BYTES_FLUSHED_TOTAL_DESCRIPTION = + "Total number of bytes flushed from the memtable to disk"; + private static final String MEMTABLE_HIT_RATIO_DESCRIPTION = + "Ratio of memtable hits 
relative to all lookups to the memtable"; + private static final String MEMTABLE_FLUSH_TIME_AVG_DESCRIPTION = + "Average time spent on flushing the memtable to disk in ms"; + private static final String MEMTABLE_FLUSH_TIME_MIN_DESCRIPTION = + "Minimum time spent on flushing the memtable to disk in ms"; + private static final String MEMTABLE_FLUSH_TIME_MAX_DESCRIPTION = + "Maximum time spent on flushing the memtable to disk in ms"; + private static final String WRITE_STALL_DURATION_AVG_DESCRIPTION = "Average duration of write stalls in ms"; + private static final String WRITE_STALL_DURATION_TOTAL_DESCRIPTION = "Total duration of write stalls in ms"; + private static final String BLOCK_CACHE_DATA_HIT_RATIO_DESCRIPTION = + "Ratio of block cache hits for data relative to all lookups for data to the block cache"; + private static final String BLOCK_CACHE_INDEX_HIT_RATIO_DESCRIPTION = + "Ratio of block cache hits for indexes relative to all lookups for indexes to the block cache"; + private static final String BLOCK_CACHE_FILTER_HIT_RATIO_DESCRIPTION = + "Ratio of block cache hits for filters relative to all lookups for filters to the block cache"; + private static final String BYTES_READ_DURING_COMPACTION_DESCRIPTION = + "Average number of bytes read per second during compaction"; + private static final String BYTES_WRITTEN_DURING_COMPACTION_DESCRIPTION = + "Average number of bytes written per second during compaction"; + private static final String COMPACTION_TIME_AVG_DESCRIPTION = "Average time spent on compaction in ms"; + private static final String COMPACTION_TIME_MIN_DESCRIPTION = "Minimum time spent on compaction in ms"; + private static final String COMPACTION_TIME_MAX_DESCRIPTION = "Maximum time spent on compaction in ms"; + private static final String NUMBER_OF_OPEN_FILES_DESCRIPTION = "Number of currently open files"; + private static final String NUMBER_OF_FILE_ERRORS_DESCRIPTION = "Total number of file errors occurred"; + + public static class 
RocksDBMetricContext { + private final String taskName; + private final String storeType; + private final String storeName; + + public RocksDBMetricContext(final String taskName, final String storeType, final String storeName) { + this.taskName = taskName; + this.storeType = "rocksdb-" + storeType; + this.storeName = storeName; + } + + public String taskName() { + return taskName; + } + public String storeType() { + return storeType; + } + public String storeName() { + return storeName; + } + } + + public static Sensor bytesWrittenToDatabaseSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_WRITTEN_TO_DB); + addRateOfSumAndSumMetricsToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BYTES_WRITTEN_TO_DB, + BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION, + BYTES_WRITTEN_TO_DB_TOTAL_DESCRIPTION + ); + return sensor; + } + + public static Sensor bytesReadFromDatabaseSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_READ_FROM_DB); + addRateOfSumAndSumMetricsToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BYTES_READ_FROM_DB, + BYTES_READ_FROM_DB_RATE_DESCRIPTION, + BYTES_READ_FROM_DB_TOTAL_DESCRIPTION + ); + return sensor; + } + + public static Sensor memtableBytesFlushedSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_BYTES_FLUSHED); + addRateOfSumAndSumMetricsToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), 
metricContext.storeName()), + MEMTABLE_BYTES_FLUSHED, + MEMTABLE_BYTES_FLUSHED_RATE_DESCRIPTION, + MEMTABLE_BYTES_FLUSHED_TOTAL_DESCRIPTION + ); + return sensor; + } + + public static Sensor memtableHitRatioSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_HIT_RATIO); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + MEMTABLE_HIT_RATIO, + MEMTABLE_HIT_RATIO_DESCRIPTION + ); + return sensor; + } + + public static Sensor memtableAvgFlushTimeSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_AVG); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + MEMTABLE_FLUSH_TIME_AVG, + MEMTABLE_FLUSH_TIME_AVG_DESCRIPTION + ); + return sensor; + } + + public static Sensor memtableMinFlushTimeSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_MIN); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + MEMTABLE_FLUSH_TIME_MIN, + MEMTABLE_FLUSH_TIME_MIN_DESCRIPTION + ); + return sensor; + } + + public static Sensor memtableMaxFlushTimeSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_MAX); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), 
metricContext.storeType(), metricContext.storeName()), + MEMTABLE_FLUSH_TIME_MAX, + MEMTABLE_FLUSH_TIME_MAX_DESCRIPTION + ); + return sensor; + } + + public static Sensor writeStallDurationSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, WRITE_STALL_DURATION); + addAvgAndSumMetricsToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + WRITE_STALL_DURATION, + WRITE_STALL_DURATION_AVG_DESCRIPTION, + WRITE_STALL_DURATION_TOTAL_DESCRIPTION + ); + return sensor; + } + + public static Sensor blockCacheDataHitRatioSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_DATA_HIT_RATIO); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BLOCK_CACHE_DATA_HIT_RATIO, + BLOCK_CACHE_DATA_HIT_RATIO_DESCRIPTION + ); + return sensor; + } + + public static Sensor blockCacheIndexHitRatioSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_INDEX_HIT_RATIO); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics.storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BLOCK_CACHE_INDEX_HIT_RATIO, + BLOCK_CACHE_INDEX_HIT_RATIO_DESCRIPTION + ); + return sensor; + } + + public static Sensor blockCacheFilterHitRatioSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_FILTER_HIT_RATIO); + addValueMetricToSensor( + sensor, + 
STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BLOCK_CACHE_FILTER_HIT_RATIO, + BLOCK_CACHE_FILTER_HIT_RATIO_DESCRIPTION + ); + return sensor; + } + + public static Sensor bytesReadDuringCompactionSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_READ_DURING_COMPACTION); + addRateOfSumMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BYTES_READ_DURING_COMPACTION, + BYTES_READ_DURING_COMPACTION_DESCRIPTION + ); + return sensor; + } + + public static Sensor bytesWrittenDuringCompactionSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_WRITTEN_DURING_COMPACTION); + addRateOfSumMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + BYTES_WRITTEN_DURING_COMPACTION, + BYTES_WRITTEN_DURING_COMPACTION_DESCRIPTION + ); + return sensor; + } + + public static Sensor compactionTimeAvgSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, COMPACTION_TIME_AVG); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + COMPACTION_TIME_AVG, + COMPACTION_TIME_AVG_DESCRIPTION + ); + return sensor; + } + + public static Sensor compactionTimeMinSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, 
COMPACTION_TIME_MIN); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + COMPACTION_TIME_MIN, + COMPACTION_TIME_MIN_DESCRIPTION + ); + return sensor; + } + + public static Sensor compactionTimeMaxSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, COMPACTION_TIME_MAX); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + COMPACTION_TIME_MAX, + COMPACTION_TIME_MAX_DESCRIPTION + ); + return sensor; + } + + public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_OPEN_FILES); + addValueMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + NUMBER_OF_OPEN_FILES, + NUMBER_OF_OPEN_FILES_DESCRIPTION + ); + return sensor; + } + + public static Sensor numberOfFileErrorsSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext) { + final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_FILE_ERRORS); + addSumMetricToSensor( + sensor, + STATE_LEVEL_GROUP, + streamsMetrics + .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()), + NUMBER_OF_FILE_ERRORS, + NUMBER_OF_FILE_ERRORS_DESCRIPTION + ); + return sensor; + } + + private static Sensor createSensor(final StreamsMetricsImpl streamsMetrics, + final RocksDBMetricContext metricContext, + final String sensorName) { + return streamsMetrics.storeLevelSensor( + metricContext.taskName(), + metricContext.storeName(), + 
sensorName, + RecordingLevel.DEBUG); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index f381a58..678d9f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals.metrics; - import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; @@ -28,6 +27,7 @@ import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Test; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -37,6 +37,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -49,6 +51,15 @@ public class StreamsMetricsImplTest extends EasyMockSupport { private final static String SENSOR_NAME_DELIMITER = ".s."; private final static String INTERNAL_PREFIX = "internal"; + private final Metrics metrics = new Metrics(); + private final Sensor sensor = metrics.sensor("dummy"); + private final String metricNamePrefix 
= "metric"; + private final String group = "group"; + private final Map<String, String> tags = mkMap(mkEntry("tag", "value")); + private final String description1 = "description number one"; + private final String description2 = "description number two"; + private final MockTime time = new MockTime(0); + @Test public void shouldGetThreadLevelSensor() { final Metrics metrics = mock(Metrics.class); @@ -230,6 +241,106 @@ public class StreamsMetricsImplTest extends EasyMockSupport { assertEquals(i, Math.round(totalMetric.measurable().measure(config, time.milliseconds()))); sensor.record(latency, time.milliseconds()); } + } + + @Test + public void shouldGetStoreLevelTagMap() { + final String threadName = "test-thread"; + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName); + final String taskName = "test-task"; + final String storeType = "remote-window"; + final String storeName = "window-keeper"; + + final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName); + assertThat(tagMap.size(), equalTo(3)); + assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(threadName)); + assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName)); + assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName)); + } + + @Test + public void shouldAddAmountRateAndSum() { + StreamsMetricsImpl + .addRateOfSumAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2); + + final double valueToRecord1 = 18.0; + final double valueToRecord2 = 72.0; + final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds(); + final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds; + verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue); + final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * 
valueToRecord2; // values are recorded once for each metric verification + verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue); + assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics + } + + @Test + public void shouldAddSum() { + StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1); + + final double valueToRecord1 = 18.0; + final double valueToRecord2 = 42.0; + final double expectedSumMetricValue = valueToRecord1 + valueToRecord2; + verifyMetric(metricNamePrefix + "-total", description1, valueToRecord1, valueToRecord2, expectedSumMetricValue); + assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics + } + + @Test + public void shouldAddAmountRate() { + StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1); + + final double valueToRecord1 = 18.0; + final double valueToRecord2 = 72.0; + final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds(); + final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds; + verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue); + assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics + } + + @Test + public void shouldAddValue() { + StreamsMetricsImpl.addValueMetricToSensor(sensor, group, tags, metricNamePrefix, description1); + + final KafkaMetric ratioMetric = metrics.metric(new MetricName(metricNamePrefix, group, description1, tags)); + assertThat(ratioMetric, is(notNullValue())); + final MetricConfig metricConfig = new MetricConfig(); + final double value1 = 42.0; + sensor.record(value1); + 
assertThat(ratioMetric.measurable().measure(metricConfig, time.milliseconds()), equalTo(42.0)); + final double value2 = 18.0; + sensor.record(value2); + assertThat(ratioMetric.measurable().measure(metricConfig, time.milliseconds()), equalTo(18.0)); + assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics + } + + @Test + public void shouldAddAvgAndTotalMetricsToSensor() { + StreamsMetricsImpl + .addAvgAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2); + + final double valueToRecord1 = 18.0; + final double valueToRecord2 = 42.0; + final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2; + verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue); + final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification + verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue); + assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics + } + + private void verifyMetric(final String name, + final String description, + final double valueToRecord1, + final double valueToRecord2, + final double expectedMetricValue) { + final KafkaMetric metric = metrics + .metric(new MetricName(name, group, description, tags)); + assertThat(metric, is(notNullValue())); + assertThat(metric.metricName().description(), equalTo(description)); + sensor.record(valueToRecord1, time.milliseconds()); + sensor.record(valueToRecord2, time.milliseconds()); + assertThat( + metric.measurable().measure(new MetricConfig(), time.milliseconds()), + equalTo(expectedMetricValue) + ); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java new file mode 100644 index 0000000..6c8b9d8 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verify;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+/**
+ * Tests for the sensor factory methods of {@link RocksDBMetrics}.
+ *
+ * <p>Every test follows the same recipe: record the expected calls on a strict
+ * {@link StreamsMetricsImpl} mock and on the statically mocked {@code StreamsMetricsImpl} class,
+ * invoke one factory method of {@link RocksDBMetrics}, then verify the recorded calls and check
+ * that the factory returned the sensor handed out by the mock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StreamsMetricsImpl.class)
+public class RocksDBMetricsTest {
+
+    private static final String STATE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String STORE_TYPE_PREFIX = "rocksdb-";
+
+    // Identifiers used to build the metric context passed to every factory method.
+    private final String taskName = "task";
+    private final String storeType = "storeType";
+    private final String storeName = "store";
+
+    // A real sensor and tag map that the mock returns; the tests only compare them by identity/equality.
+    private final Metrics metrics = new Metrics();
+    private final Sensor sensor = metrics.sensor("dummy");
+    private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class);
+    private final Map<String, String> tags = Collections.singletonMap("hello", "world");
+
+    /**
+     * A factory method of {@link RocksDBMetrics} under test, usually supplied as a method reference.
+     */
+    @FunctionalInterface
+    private interface SensorCreator {
+        Sensor create(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext);
+    }
+
+    /** Expects a rate-and-total sensor named {@code bytes-written}. */
+    @Test
+    public void shouldGetBytesWrittenSensor() {
+        final String totalDescription = "Total number of bytes written to the RocksDB state store";
+        final String rateDescription = "Average number of bytes written per second to the RocksDB state store";
+        verifyRateAndTotalSensor(
+            "bytes-written",
+            totalDescription,
+            rateDescription,
+            RocksDBMetrics::bytesWrittenToDatabaseSensor
+        );
+    }
+
+    /** Expects a rate-and-total sensor named {@code bytes-read}. */
+    @Test
+    public void shouldGetBytesReadSensor() {
+        final String totalDescription = "Total number of bytes read from the RocksDB state store";
+        final String rateDescription = "Average number of bytes read per second from the RocksDB state store";
+        verifyRateAndTotalSensor(
+            "bytes-read",
+            totalDescription,
+            rateDescription,
+            RocksDBMetrics::bytesReadFromDatabaseSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code memtable-hit-ratio}. */
+    @Test
+    public void shouldGetMemtableHitRatioSensor() {
+        verifyValueSensor(
+            "memtable-hit-ratio",
+            "Ratio of memtable hits relative to all lookups to the memtable",
+            RocksDBMetrics::memtableHitRatioSensor
+        );
+    }
+
+    /** Expects a rate-and-total sensor named {@code memtable-bytes-flushed}. */
+    @Test
+    public void shouldGetMemtableBytesFlushedSensor() {
+        final String totalDescription = "Total number of bytes flushed from the memtable to disk";
+        final String rateDescription = "Average number of bytes flushed per second from the memtable to disk";
+        verifyRateAndTotalSensor(
+            "memtable-bytes-flushed",
+            totalDescription,
+            rateDescription,
+            RocksDBMetrics::memtableBytesFlushedSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code memtable-flush-time-avg}. */
+    @Test
+    public void shouldGetMemtableAvgFlushTimeSensor() {
+        verifyValueSensor(
+            "memtable-flush-time-avg",
+            "Average time spent on flushing the memtable to disk in ms",
+            RocksDBMetrics::memtableAvgFlushTimeSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code memtable-flush-time-min}. */
+    @Test
+    public void shouldGetMemtableMinFlushTimeSensor() {
+        verifyValueSensor(
+            "memtable-flush-time-min",
+            "Minimum time spent on flushing the memtable to disk in ms",
+            RocksDBMetrics::memtableMinFlushTimeSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code memtable-flush-time-max}. */
+    @Test
+    public void shouldGetMemtableMaxFlushTimeSensor() {
+        verifyValueSensor(
+            "memtable-flush-time-max",
+            "Maximum time spent on flushing the memtable to disk in ms",
+            RocksDBMetrics::memtableMaxFlushTimeSensor
+        );
+    }
+
+    /** Expects an average-and-sum sensor named {@code write-stall-duration}. */
+    @Test
+    public void shouldGetWriteStallDurationSensor() {
+        final String name = "write-stall-duration";
+        final String avgDescription = "Average duration of write stalls in ms";
+        final String totalDescription = "Total duration of write stalls in ms";
+
+        setupStreamsMetricsMock(name);
+        // Recorded as an expectation on the statically mocked class.
+        StreamsMetricsImpl.addAvgAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            name,
+            avgDescription,
+            totalDescription
+        );
+
+        replayCallAndVerify(RocksDBMetrics::writeStallDurationSensor);
+    }
+
+    /** Expects a value sensor named {@code block-cache-data-hit-ratio}. */
+    @Test
+    public void shouldGetBlockCacheDataHitRatioSensor() {
+        verifyValueSensor(
+            "block-cache-data-hit-ratio",
+            "Ratio of block cache hits for data relative to all lookups for data to the block cache",
+            RocksDBMetrics::blockCacheDataHitRatioSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code block-cache-index-hit-ratio}. */
+    @Test
+    public void shouldGetBlockCacheIndexHitRatioSensor() {
+        verifyValueSensor(
+            "block-cache-index-hit-ratio",
+            "Ratio of block cache hits for indexes relative to all lookups for indexes to the block cache",
+            RocksDBMetrics::blockCacheIndexHitRatioSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code block-cache-filter-hit-ratio}. */
+    @Test
+    public void shouldGetBlockCacheFilterHitRatioSensor() {
+        verifyValueSensor(
+            "block-cache-filter-hit-ratio",
+            "Ratio of block cache hits for filters relative to all lookups for filters to the block cache",
+            RocksDBMetrics::blockCacheFilterHitRatioSensor
+        );
+    }
+
+    /** Expects a rate sensor named {@code bytes-read-compaction}. */
+    @Test
+    public void shouldGetBytesReadDuringCompactionSensor() {
+        verifyRateSensor(
+            "bytes-read-compaction",
+            "Average number of bytes read per second during compaction",
+            RocksDBMetrics::bytesReadDuringCompactionSensor
+        );
+    }
+
+    /** Expects a rate sensor named {@code bytes-written-compaction}. */
+    @Test
+    public void shouldGetBytesWrittenDuringCompactionSensor() {
+        verifyRateSensor(
+            "bytes-written-compaction",
+            "Average number of bytes written per second during compaction",
+            RocksDBMetrics::bytesWrittenDuringCompactionSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code compaction-time-avg}. */
+    @Test
+    public void shouldGetCompactionTimeAvgSensor() {
+        verifyValueSensor(
+            "compaction-time-avg",
+            "Average time spent on compaction in ms",
+            RocksDBMetrics::compactionTimeAvgSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code compaction-time-min}. */
+    @Test
+    public void shouldGetCompactionTimeMinSensor() {
+        verifyValueSensor(
+            "compaction-time-min",
+            "Minimum time spent on compaction in ms",
+            RocksDBMetrics::compactionTimeMinSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code compaction-time-max}. */
+    @Test
+    public void shouldGetCompactionTimeMaxSensor() {
+        verifyValueSensor(
+            "compaction-time-max",
+            "Maximum time spent on compaction in ms",
+            RocksDBMetrics::compactionTimeMaxSensor
+        );
+    }
+
+    /** Expects a value sensor named {@code number-open-files}. */
+    @Test
+    public void shouldGetNumberOfOpenFilesSensor() {
+        verifyValueSensor(
+            "number-open-files",
+            "Number of currently open files",
+            RocksDBMetrics::numberOfOpenFilesSensor
+        );
+    }
+
+    /** Expects a sum sensor named {@code number-file-errors}. */
+    @Test
+    public void shouldGetNumberOfFilesErrors() {
+        final String name = "number-file-errors";
+
+        setupStreamsMetricsMock(name);
+        // Recorded as an expectation on the statically mocked class.
+        StreamsMetricsImpl.addSumMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            name,
+            "Total number of file errors occurred"
+        );
+
+        replayCallAndVerify(RocksDBMetrics::numberOfFileErrorsSensor);
+    }
+
+    /**
+     * Records the expectations for a sensor that gets a rate-of-sum and a sum metric, then runs the
+     * given factory method and verifies the result.
+     */
+    private void verifyRateAndTotalSensor(final String metricNamePrefix,
+                                          final String descriptionOfTotal,
+                                          final String descriptionOfRate,
+                                          final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        // Note the argument order of the static method: rate description first, total description second.
+        StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            metricNamePrefix,
+            descriptionOfRate,
+            descriptionOfTotal
+        );
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    /**
+     * Records the expectations for a sensor that gets a single rate-of-sum metric, then runs the given
+     * factory method and verifies the result.
+     */
+    private void verifyRateSensor(final String metricNamePrefix,
+                                  final String description,
+                                  final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addRateOfSumMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            metricNamePrefix,
+            description
+        );
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    /**
+     * Records the expectations for a sensor that gets a single value metric, then runs the given factory
+     * method and verifies the result.
+     */
+    private void verifyValueSensor(final String metricNamePrefix,
+                                   final String description,
+                                   final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            metricNamePrefix,
+            description
+        );
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    /**
+     * Enables static mocking of {@code StreamsMetricsImpl} and records the two instance calls every
+     * factory method is expected to make: a DEBUG-level store-level sensor lookup followed by the
+     * store-level tag map lookup with the store type prefixed by {@code rocksdb-}.
+     */
+    private void setupStreamsMetricsMock(final String metricNamePrefix) {
+        mockStatic(StreamsMetricsImpl.class);
+        // The instance mock is strict, so the order of these two expectations is part of the test.
+        expect(streamsMetrics.storeLevelSensor(taskName, storeName, metricNamePrefix, RecordingLevel.DEBUG))
+            .andReturn(sensor);
+        expect(streamsMetrics.storeLevelTagMap(taskName, STORE_TYPE_PREFIX + storeType, storeName))
+            .andReturn(tags);
+    }
+
+    /**
+     * Switches all mocks to replay mode, calls the factory method with the mocked metrics and a fresh
+     * metric context, verifies the recorded expectations, and asserts that the returned sensor is the
+     * one provided by the mock.
+     */
+    private void replayCallAndVerify(final SensorCreator sensorCreator) {
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskName, storeType, storeName);
+        final Sensor actualSensor = sensorCreator.create(streamsMetrics, metricContext);
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(actualSensor, is(sensor));
+    }
+}