[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648085#comment-16648085 ]
ASF GitHub Bot commented on KAFKA-7223: --------------------------------------- guozhangwang closed pull request #5742: KAFKA-7223: Add late-record metrics URL: https://github.com/apache/kafka/pull/5742 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 47fa18620fd..88d52342053 100644 --- a/build.gradle +++ b/build.gradle @@ -974,6 +974,7 @@ project(':streams') { testCompile libs.junit testCompile libs.easymock testCompile libs.bcpkix + testCompile libs.hamcrest testRuntimeOnly project(':streams:test-utils') testRuntime libs.slf4jlog4j diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 23fc68aaba5..5db105da2d4 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -57,6 +57,7 @@ versions += [ jetty: "9.4.12.v20180830", jersey: "2.27", jmh: "1.21", + hamcrest: "1.3", log4j: "1.2.17", scalaLogging: "3.9.0", jaxb: "2.3.0", @@ -115,6 +116,7 @@ libs += [ jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", junit: "junit:junit:$versions.junit", + hamcrest: "org.hamcrest:hamcrest-all:1.3", kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100", kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101", kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102", diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 04c7150b9f7..a85bbb8250d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -16,10 +16,15 @@ */ package org.apache.kafka.streams.kstream.internals.metrics; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import java.util.Map; + public class Sensors { private Sensors() {} @@ -39,4 +44,35 @@ public static Sensor lateRecordDropSensor(final InternalProcessorContext context ); return sensor; } + + public static Sensor recordLatenessSensor(final InternalProcessorContext context) { + final StreamsMetricsImpl metrics = context.metrics(); + + final Sensor sensor = metrics.taskLevelSensor( + context.taskId().toString(), + "record-lateness", + Sensor.RecordingLevel.DEBUG + ); + + final Map<String, String> tags = metrics.tagMap( + "task-id", context.taskId().toString() + ); + sensor.add( + new MetricName( + "record-lateness-avg", + "stream-processor-node-metrics", + "The average observed lateness of records.", + tags), + new Avg() + ); + sensor.add( + new MetricName( + "record-lateness-max", + "stream-processor-node-metrics", + "The max observed lateness of records.", + tags), + new Max() + ); + return sensor; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 70202534d69..1fdd454ea54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import java.util.Collections; import java.util.Comparator; @@ -38,6 +39,7 @@ public class PartitionGroup { private final Map<TopicPartition, RecordQueue> partitionQueues; + private final Sensor recordLatenessSensor; private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime; private long streamTime; @@ -61,9 +63,10 @@ RecordQueue queue() { } } - PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) { + PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) { nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); this.partitionQueues = partitionQueues; + this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; allBuffered = false; streamTime = RecordQueue.UNKNOWN; @@ -95,7 +98,12 @@ StampedRecord nextRecord(final RecordInfo info) { } // always update the stream time to the record's timestamp yet to be processed if it is larger - streamTime = Math.max(streamTime, record.timestamp); + if (record.timestamp > streamTime) { + streamTime = record.timestamp; + recordLatenessSensor.record(0); + } else { + recordLatenessSensor.record(streamTime - record.timestamp); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 2ad0acc89d4..247a156eb38 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -52,6 +52,7 @@ import static java.lang.String.format; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor; /** * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. 
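
For context on the PartitionGroup.nextRecord() change above: stream time only ever moves forward (it is the max record timestamp seen so far), and a record that does not advance it is observed as late by the size of the gap, which is what the new record-lateness sensor records. Below is a minimal standalone sketch of that bookkeeping; the class and field names are illustrative only (not the real PartitionGroup/RecordQueue code), and the initial value -1 mirrors the "st: -1" assertion in PartitionGroupTest later in this diff:

    public class LatenessSketch {
        private long streamTime = -1L;   // unknown until the first record is observed
        private long lastLateness = 0L;

        void observe(final long recordTimestamp) {
            if (recordTimestamp > streamTime) {
                streamTime = recordTimestamp; // record advances stream time, so it is not late
                lastLateness = 0L;
            } else {
                lastLateness = streamTime - recordTimestamp; // out-of-order record: gap behind stream time
            }
        }

        public static void main(final String[] args) {
            final LatenessSketch sketch = new LatenessSketch();
            // timestamps in the order PartitionGroupTest dequeues them: 1, 2, 3, 4, 5, 2, 4, 6
            for (final long ts : new long[] {1, 2, 3, 4, 5, 2, 4, 6}) {
                sketch.observe(ts);
                System.out.println("ts=" + ts + " streamTime=" + sketch.streamTime + " lateness=" + sketch.lastLateness);
            }
            // prints lateness 0 for the in-order records, then 3, 1, and 0 for the last three,
            // matching the last-value assertions added to PartitionGroupTest in this diff.
        }
    }
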
@@ -234,7 +235,7 @@ public StreamTask(final TaskId id, } recordInfo = new PartitionGroup.RecordInfo(); - partitionGroup = new PartitionGroup(partitionQueues); + partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl)); processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp); stateMgr.registerGlobalStateStores(topology.globalStateStores()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 419c861f118..1074f02ff45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; @@ -55,7 +54,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -110,7 +111,7 @@ private void initStore(final boolean enableCaching) { final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)), Serdes.String(), Serdes.Long()) - .withLoggingDisabled(); + .withLoggingDisabled(); if (enableCaching) { storeBuilder.withCachingEnabled(); @@ -335,9 +336,11 @@ public void shouldLogAndMeterWhenSkippingLateRecord() { context.setStreamTime(20); context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null)); processor.process("A", "1"); + context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null)); + processor.process("A", "1"); LogCaptureAppender.unregister(appender); - final Metric dropMetric = metrics.metrics().get(new MetricName( + final MetricName dropMetric = new MetricName( "late-record-drop-total", "stream-processor-node-metrics", "The total number of occurrence of late-record-drop operations.", @@ -346,8 +349,24 @@ public void shouldLogAndMeterWhenSkippingLateRecord() { mkEntry("task-id", "0_0"), mkEntry("processor-node-id", "TESTING_NODE") ) - )); - assertEquals(1.0, dropMetric.metricValue()); + ); + + assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0)); + + final MetricName dropRate = new MetricName( + "late-record-drop-rate", + "stream-processor-node-metrics", + "The average number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "test"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "TESTING_NODE") + ) + ); + + assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0)); + assertThat(appender.getMessages(), hasItem("Skipping record for expired window. 
key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]")); + assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]")); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 8ae628472d3..236cd8c6e3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -42,6 +43,7 @@ import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.StreamsTestUtils; +import org.hamcrest.Matcher; import org.junit.Test; import java.util.List; @@ -51,9 +53,10 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -70,7 +73,7 @@ public void testAggBasic() { final KTable<Windowed<String>, String> table2 = builder .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())); @@ -128,7 +131,7 @@ public void testJoin() { final KTable<Windowed<String>, String> table1 = builder .stream(topic1, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String())); @@ -137,7 +140,7 @@ public void testJoin() { final KTable<Windowed<String>, String> table2 = builder .stream(topic2, Consumed.with(Serdes.String(), Serdes.String())) - .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String())); @@ -231,8 +234,9 @@ public void shouldLogAndMeterWhenSkippingNullKey() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; - final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) + builder + .stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5))) .aggregate( MockInitializer.STRING_INIT, @@ -258,15 +262,15 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() { final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100)) - .aggregate( - () -> "", - MockAggregator.toStringInstance("+"), - Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() - ) - .toStream() - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("output"); + .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100)) + .aggregate( + () -> "", + MockAggregator.toStringInstance("+"), + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() + ) + .toStream() + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("output"); LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); @@ -281,17 +285,13 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() { driver.pipeInput(recordFactory.create(topic, "k", "6", 6L)); LogCaptureAppender.unregister(appender); - final MetricName metricName = new MetricName( - "late-record-drop-total", - "stream-processor-node-metrics", - "The total number of occurrence of late-record-drop operations.", - mkMap( - mkEntry("client-id", "topology-test-driver-virtual-thread"), - mkEntry("task-id", "0_0"), - mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") - ) + assertLatenessMetrics( + driver, + is(7.0), // how many events get dropped + is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100. + is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8 ); - assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0)); + assertThat(appender.getMessages(), hasItems( "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]", "Skipping record for expired window. 
key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]", @@ -316,59 +316,101 @@ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() { final String topic = "topic"; final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())); - stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L))) - .aggregate( - () -> "", - MockAggregator.toStringInstance("+"), - Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() - ) - .toStream() - .map((key, value) -> new KeyValue<>(key.toString(), value)) - .to("output"); + stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L))) + .aggregate( + () -> "", + MockAggregator.toStringInstance("+"), + Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled() + ) + .toStream() + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("output"); LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { - driver.pipeInput(recordFactory.create(topic, "k", "100", 100L)); - driver.pipeInput(recordFactory.create(topic, "k", "0", 0L)); - driver.pipeInput(recordFactory.create(topic, "k", "1", 1L)); - driver.pipeInput(recordFactory.create(topic, "k", "2", 2L)); - driver.pipeInput(recordFactory.create(topic, "k", "3", 3L)); - driver.pipeInput(recordFactory.create(topic, "k", "4", 4L)); - driver.pipeInput(recordFactory.create(topic, "k", "5", 5L)); + driver.pipeInput(recordFactory.create(topic, "k", "100", 200L)); + driver.pipeInput(recordFactory.create(topic, "k", "0", 100L)); + driver.pipeInput(recordFactory.create(topic, "k", "1", 101L)); + driver.pipeInput(recordFactory.create(topic, "k", "2", 102L)); + driver.pipeInput(recordFactory.create(topic, "k", "3", 103L)); + driver.pipeInput(recordFactory.create(topic, "k", "4", 104L)); + driver.pipeInput(recordFactory.create(topic, "k", "5", 105L)); driver.pipeInput(recordFactory.create(topic, "k", "6", 6L)); LogCaptureAppender.unregister(appender); - final MetricName metricName = new MetricName( - "late-record-drop-total", - "stream-processor-node-metrics", - "The total number of occurrence of late-record-drop operations.", - mkMap( - mkEntry("client-id", "topology-test-driver-virtual-thread"), - mkEntry("task-id", "0_0"), - mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") - ) - ); - assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0)); + assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375)); + assertThat(appender.getMessages(), hasItems( - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]", - "Skipping record for expired window. 
key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]", - "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]" + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]", + "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]" )); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5); - OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6); + OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200); assertThat(driver.readOutput("output"), nullValue()); } } + private void assertLatenessMetrics(final TopologyTestDriver driver, + final Matcher<Object> dropTotal, + final Matcher<Object> maxLateness, + final Matcher<Object> avgLateness) { + final MetricName dropMetric = new MetricName( + "late-record-drop-total", + "stream-processor-node-metrics", + "The total number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") + ) + ); + + assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal); + + + final MetricName dropRate = new MetricName( + "late-record-drop-rate", + "stream-processor-node-metrics", + "The average number of occurrence of late-record-drop operations.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0"), + mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001") + ) + ); + + assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0)); + + final MetricName latenessMaxMetric = new MetricName( + "record-lateness-max", + "stream-processor-node-metrics", + "The max observed lateness of records.", + mkMap( + mkEntry("client-id", "topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0") + ) + ); + assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness); + + final MetricName latenessAvgMetric = new MetricName( + "record-lateness-avg", + "stream-processor-node-metrics", + "The average observed lateness of records.", + mkMap( + mkEntry("client-id", 
"topology-test-driver-virtual-thread"), + mkEntry("task-id", "0_0") + ) + ); + assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness); + } + private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) { return driver.readOutput("output", new StringDeserializer(), new StringDeserializer()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 2df4f66cb85..c84bbc200a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -17,7 +17,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -65,7 +69,19 @@ private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); - private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2))); + private final Metrics metrics = new Metrics(); + private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap()); + + private final PartitionGroup group = new PartitionGroup( + mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)), + getValueSensor(metrics, lastLatenessValue) + ); + + private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) { + final Sensor lastRecordedValue = metrics.sensor(metricName.name()); + lastRecordedValue.add(metricName, new Value()); + return lastRecordedValue; + } @Test public void testTimeTracking() { @@ -90,10 +106,9 @@ public void testTimeTracking() { // 2:[2, 4, 6] // st: -1 since no records was being processed yet - assertEquals(6, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(3, group.numBuffered(partition2)); + verifyBuffered(6, 3, 3); assertEquals(-1L, group.timestamp()); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); @@ -104,11 +119,9 @@ public void testTimeTracking() { // 2:[2, 4, 6] // st: 2 assertEquals(partition1, info.partition()); - assertEquals(1L, record.timestamp); - assertEquals(5, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(3, group.numBuffered(partition2)); - assertEquals(1L, group.timestamp()); + verifyTimes(record, 1L, 1L); + verifyBuffered(5, 2, 3); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, now the time should be advanced record = group.nextRecord(info); @@ -116,11 +129,9 @@ public void testTimeTracking() { // 2:[4, 6] // st: 3 assertEquals(partition2, info.partition()); - assertEquals(2L, record.timestamp); - assertEquals(4, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(2, 
group.numBuffered(partition2)); - assertEquals(2L, group.timestamp()); + verifyTimes(record, 2L, 2L); + verifyBuffered(4, 2, 2); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // add 2 more records with timestamp 2, 4 to partition-1 final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( @@ -131,10 +142,9 @@ public void testTimeTracking() { // 1:[3, 5, 2, 4] // 2:[4, 6] // st: 3 (non-decreasing, so adding 2 doesn't change it) - assertEquals(6, group.numBuffered()); - assertEquals(4, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); + verifyBuffered(6, 4, 2); assertEquals(2L, group.timestamp()); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -142,11 +152,9 @@ public void testTimeTracking() { // 2:[4, 6] // st: 4 as partition st is now {5, 4} assertEquals(partition1, info.partition()); - assertEquals(3L, record.timestamp); - assertEquals(5, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(2, group.numBuffered(partition2)); - assertEquals(3L, group.timestamp()); + verifyTimes(record, 3L, 3L); + verifyBuffered(5, 3, 2); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should not be advanced record = group.nextRecord(info); @@ -154,11 +162,9 @@ public void testTimeTracking() { // 2:[6] // st: 5 as partition st is now {5, 6} assertEquals(partition2, info.partition()); - assertEquals(4L, record.timestamp); - assertEquals(4, group.numBuffered()); - assertEquals(3, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(4L, group.timestamp()); + verifyTimes(record, 4L, 4L); + verifyBuffered(4, 3, 1); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, now time should be advanced record = group.nextRecord(info); @@ -166,11 +172,9 @@ public void testTimeTracking() { // 2:[6] // st: 5 assertEquals(partition1, info.partition()); - assertEquals(5L, record.timestamp); - assertEquals(3, group.numBuffered()); - assertEquals(2, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 5L, 5L); + verifyBuffered(3, 2, 1); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -178,11 +182,9 @@ public void testTimeTracking() { // 2:[6] // st: 5 assertEquals(partition1, info.partition()); - assertEquals(2L, record.timestamp); - assertEquals(2, group.numBuffered()); - assertEquals(1, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 2L, 5L); + verifyBuffered(2, 1, 1); + assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -190,11 +192,9 @@ public void testTimeTracking() { // 2:[6] // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4) assertEquals(partition1, info.partition()); - assertEquals(4L, record.timestamp); - assertEquals(1, group.numBuffered()); - assertEquals(0, group.numBuffered(partition1)); - assertEquals(1, group.numBuffered(partition2)); - assertEquals(5L, group.timestamp()); + verifyTimes(record, 4L, 5L); + verifyBuffered(1, 0, 1); + 
assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue()); // get one more record, time should not be advanced record = group.nextRecord(info); @@ -202,11 +202,20 @@ public void testTimeTracking() { // 2:[] // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.) assertEquals(partition2, info.partition()); - assertEquals(6L, record.timestamp); - assertEquals(0, group.numBuffered()); - assertEquals(0, group.numBuffered(partition1)); - assertEquals(0, group.numBuffered(partition2)); - assertEquals(6L, group.timestamp()); + verifyTimes(record, 6L, 6L); + verifyBuffered(0, 0, 0); + assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); + + } + + private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) { + assertEquals(recordTime, record.timestamp); + assertEquals(streamTime, group.timestamp()); + } + private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) { + assertEquals(totalBuffered, group.numBuffered()); + assertEquals(partitionOneBuffered, group.numBuffered(partition1)); + assertEquals(partitionTwoBuffered, group.numBuffered(partition2)); } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 1d64316cb3c..1dcebb517a7 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -32,6 +32,7 @@ import java.util.Properties; import java.util.UUID; +import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,6 +51,7 @@ public static Properties getStreamsConfig(final String applicationId, props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name); props.putAll(additional); return props; } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index d10a45c00bf..2abfd6354be 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -29,7 +29,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; @@ -255,7 +257,13 @@ private TopologyTestDriver(final InternalTopologyBuilder builder, final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime); - metrics = new Metrics(); + + final MetricConfig metricConfig = new MetricConfig() + 
+            .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+
+        metrics = new Metrics(metricConfig, mockWallClockTime);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             "topology-test-driver-virtual-thread"


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>         Key: KAFKA-7223
>         URL: https://issues.apache.org/jira/browse/KAFKA-7223
>     Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: John Roesler
>    Assignee: John Roesler
>    Priority: Major
>
> As described in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
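
A note on the lateness values asserted in KStreamWindowAggregateTest above: they follow directly from the max-timestamp stream-time rule. In shouldLogAndMeterWhenSkippingExpiredWindow the records arrive with timestamps 100, 0, 1, 2, 3, 4, 5, 6, so the observed latenesses are 0, 100, 99, 98, 97, 96, 95, 94, giving a max of 100 and an average of 679 / 8 = 84.875; in the grace-period test the timestamps 200, 100, 101, 102, 103, 104, 105, 6 give latenesses 0, 100, 99, 98, 97, 96, 95, 194, i.e. max 194 and average 779 / 8 = 97.375. A tiny standalone check of that arithmetic (illustrative only, not part of the PR; names are made up):

    public class ExpectedLatenessCheck {
        public static void main(final String[] args) {
            printStats(new long[] {100, 0, 1, 2, 3, 4, 5, 6});             // -> max=100 avg=84.875
            printStats(new long[] {200, 100, 101, 102, 103, 104, 105, 6}); // -> max=194 avg=97.375
        }

        private static void printStats(final long[] timestamps) {
            long streamTime = -1L;
            long max = 0L;
            long sum = 0L;
            for (final long ts : timestamps) {
                // lateness is the gap behind stream time; records that advance stream time are not late
                final long lateness = ts > streamTime ? 0L : streamTime - ts;
                streamTime = Math.max(streamTime, ts);
                max = Math.max(max, lateness);
                sum += lateness;
            }
            System.out.println("max=" + max + " avg=" + (sum / (double) timestamps.length));
        }
    }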