[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648085#comment-16648085 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5742: KAFKA-7223: Add late-record metrics
URL: https://github.com/apache/kafka/pull/5742
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original
diff once a fork-based pull request is merged, the diff is reproduced below for
the sake of provenance.
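
At its core, the change teaches PartitionGroup to report, for every record it
dispenses, how far that record lags behind the task's current stream time, and
exposes the result as new task-level record-lateness-avg and record-lateness-max
metrics. A minimal sketch of that bookkeeping, paraphrased from the PartitionGroup
hunk in the diff below (not a drop-in replacement for it):

    // Stream time only moves forward; a record older than the current
    // stream time is "late" by exactly the gap between the two timestamps.
    if (record.timestamp > streamTime) {
        streamTime = record.timestamp;      // on-time record: advance stream time
        recordLatenessSensor.record(0);     // and report zero lateness
    } else {
        recordLatenessSensor.record(streamTime - record.timestamp);
    }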

diff --git a/build.gradle b/build.gradle
index 47fa18620fd..88d52342053 100644
--- a/build.gradle
+++ b/build.gradle
@@ -974,6 +974,7 @@ project(':streams') {
     testCompile libs.junit
     testCompile libs.easymock
     testCompile libs.bcpkix
+    testCompile libs.hamcrest
 
     testRuntimeOnly project(':streams:test-utils')
     testRuntime libs.slf4jlog4j
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 23fc68aaba5..5db105da2d4 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
   jetty: "9.4.12.v20180830",
   jersey: "2.27",
   jmh: "1.21",
+  hamcrest: "1.3",
   log4j: "1.2.17",
   scalaLogging: "3.9.0",
   jaxb: "2.3.0",
@@ -115,6 +116,7 @@ libs += [
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   junit: "junit:junit:$versions.junit",
+  hamcrest: "org.hamcrest:hamcrest-all:1.3",
   kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 04c7150b9f7..a85bbb8250d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.streams.kstream.internals.metrics;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
+import java.util.Map;
+
 public class Sensors {
     private Sensors() {}
 
@@ -39,4 +44,35 @@ public static Sensor lateRecordDropSensor(final InternalProcessorContext context
         );
         return sensor;
     }
+
+    public static Sensor recordLatenessSensor(final InternalProcessorContext context) {
+        final StreamsMetricsImpl metrics = context.metrics();
+
+        final Sensor sensor = metrics.taskLevelSensor(
+            context.taskId().toString(),
+            "record-lateness",
+            Sensor.RecordingLevel.DEBUG
+        );
+
+        final Map<String, String> tags = metrics.tagMap(
+            "task-id", context.taskId().toString()
+        );
+        sensor.add(
+            new MetricName(
+                "record-lateness-avg",
+                "stream-processor-node-metrics",
+                "The average observed lateness of records.",
+                tags),
+            new Avg()
+        );
+        sensor.add(
+            new MetricName(
+                "record-lateness-max",
+                "stream-processor-node-metrics",
+                "The max observed lateness of records.",
+                tags),
+            new Max()
+        );
+        return sensor;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 70202534d69..1fdd454ea54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,6 +39,7 @@
 public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
+    private final Sensor recordLatenessSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
 
     private long streamTime;
@@ -61,9 +63,10 @@ RecordQueue queue() {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
+    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
+        this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
@@ -95,7 +98,12 @@ StampedRecord nextRecord(final RecordInfo info) {
                 }
 
                 // always update the stream time to the record's timestamp yet to be processed if it is larger
-                streamTime = Math.max(streamTime, record.timestamp);
+                if (record.timestamp > streamTime) {
+                    streamTime = record.timestamp;
+                    recordLatenessSensor.record(0);
+                } else {
+                    recordLatenessSensor.record(streamTime - record.timestamp);
+                }
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2ad0acc89d4..247a156eb38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,6 +52,7 @@
 
 import static java.lang.String.format;
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor;
 
 /**
  * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
@@ -234,7 +235,7 @@ public StreamTask(final TaskId id,
         }
 
         recordInfo = new PartitionGroup.RecordInfo();
-        partitionGroup = new PartitionGroup(partitionQueues);
+        partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
         processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 419c861f118..1074f02ff45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
@@ -55,7 +54,9 @@
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -110,7 +111,7 @@ private void initStore(final boolean enableCaching) {
         final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
                                                                                                  Serdes.String(),
                                                                                                  Serdes.Long())
-            .withLoggingDisabled();
+                                                                             .withLoggingDisabled();
 
         if (enableCaching) {
             storeBuilder.withCachingEnabled();
@@ -335,9 +336,11 @@ public void shouldLogAndMeterWhenSkippingLateRecord() {
         context.setStreamTime(20);
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("A", "1");
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        processor.process("A", "1");
         LogCaptureAppender.unregister(appender);
 
-        final Metric dropMetric = metrics.metrics().get(new MetricName(
+        final MetricName dropMetric = new MetricName(
             "late-record-drop-total",
             "stream-processor-node-metrics",
             "The total number of occurrence of late-record-drop operations.",
@@ -346,8 +349,24 @@ public void shouldLogAndMeterWhenSkippingLateRecord() {
                 mkEntry("task-id", "0_0"),
                 mkEntry("processor-node-id", "TESTING_NODE")
             )
-        ));
-        assertEquals(1.0, dropMetric.metricValue());
+        );
+
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
+
         assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 8ae628472d3..236cd8c6e3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import java.util.List;
@@ -51,9 +53,10 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -70,7 +73,7 @@ public void testAggBasic() {
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
@@ -128,7 +131,7 @@ public void testJoin() {
 
         final KTable<Windowed<String>, String> table1 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
@@ -137,7 +140,7 @@ public void testJoin() {
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
 
@@ -231,8 +234,9 @@ public void shouldLogAndMeterWhenSkippingNullKey() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
 
-        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+        builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(
                 MockInitializer.STRING_INIT,
@@ -258,15 +262,15 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
         stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
-            .aggregate(
-                () -> "",
-                MockAggregator.toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
-            )
-            .toStream()
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("output");
+               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
+               .aggregate(
+                   () -> "",
+                   MockAggregator.toStringInstance("+"),
+                   Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+               )
+               .toStream()
+               .map((key, value) -> new KeyValue<>(key.toString(), value))
+               .to("output");
 
         LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@@ -281,17 +285,13 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            final MetricName metricName = new MetricName(
-                "late-record-drop-total",
-                "stream-processor-node-metrics",
-                "The total number of occurrence of late-record-drop operations.",
-                mkMap(
-                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
-                    mkEntry("task-id", "0_0"),
-                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
-                )
+            assertLatenessMetrics(
+                driver,
+                is(7.0), // how many events get dropped
+                is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100.
+                is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8
             );
-            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+
             assertThat(appender.getMessages(), hasItems(
                 "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
                 "Skipping record for expired window. key=[k] topic=[topic] 
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
@@ -316,59 +316,101 @@ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
         final String topic = "topic";
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L)))
-            .aggregate(
-                () -> "",
-                MockAggregator.toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
-            )
-            .toStream()
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("output");
+        stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
+               .aggregate(
+                   () -> "",
+                   MockAggregator.toStringInstance("+"),
+                   Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+               )
+               .toStream()
+               .map((key, value) -> new KeyValue<>(key.toString(), value))
+               .to("output");
 
         LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
-            driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
-            driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
-            driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
-            driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
-            driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
-            driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
-            driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic, "k", "100", 200L));
+            driver.pipeInput(recordFactory.create(topic, "k", "0", 100L));
+            driver.pipeInput(recordFactory.create(topic, "k", "1", 101L));
+            driver.pipeInput(recordFactory.create(topic, "k", "2", 102L));
+            driver.pipeInput(recordFactory.create(topic, "k", "3", 103L));
+            driver.pipeInput(recordFactory.create(topic, "k", "4", 104L));
+            driver.pipeInput(recordFactory.create(topic, "k", "5", 105L));
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            final MetricName metricName = new MetricName(
-                "late-record-drop-total",
-                "stream-processor-node-metrics",
-                "The total number of occurrence of late-record-drop operations.",
-                mkMap(
-                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
-                    mkEntry("task-id", "0_0"),
-                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
-                )
-            );
-            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+            assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
+
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
             ));
 
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
             assertThat(driver.readOutput("output"), nullValue());
         }
     }
 
+    private void assertLatenessMetrics(final TopologyTestDriver driver,
+                                       final Matcher<Object> dropTotal,
+                                       final Matcher<Object> maxLateness,
+                                       final Matcher<Object> avgLateness) {
+        final MetricName dropMetric = new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+            )
+        );
+
+        assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal);
+
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+            )
+        );
+
+        assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0));
+
+        final MetricName latenessMaxMetric = new MetricName(
+            "record-lateness-max",
+            "stream-processor-node-metrics",
+            "The max observed lateness of records.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0")
+            )
+        );
+        assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
+
+        final MetricName latenessAvgMetric = new MetricName(
+            "record-lateness-avg",
+            "stream-processor-node-metrics",
+            "The average observed lateness of records.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0")
+            )
+        );
+        assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
+    }
+
     private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
         return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 2df4f66cb85..c84bbc200a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -17,7 +17,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -65,7 +69,19 @@
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
-    private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)));
+    private final Metrics metrics = new Metrics();
+    private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
+
+    private final PartitionGroup group = new PartitionGroup(
+        mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)),
+        getValueSensor(metrics, lastLatenessValue)
+    );
+
+    private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
+        final Sensor lastRecordedValue = metrics.sensor(metricName.name());
+        lastRecordedValue.add(metricName, new Value());
+        return lastRecordedValue;
+    }
 
     @Test
     public void testTimeTracking() {
@@ -90,10 +106,9 @@ public void testTimeTracking() {
         // 2:[2, 4, 6]
         // st: -1 since no records was being processed yet
 
-        assertEquals(6, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
+        verifyBuffered(6, 3, 3);
         assertEquals(-1L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
         final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -104,11 +119,9 @@ public void testTimeTracking() {
         // 2:[2, 4, 6]
         // st: 2
         assertEquals(partition1, info.partition());
-        assertEquals(1L, record.timestamp);
-        assertEquals(5, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
-        assertEquals(1L, group.timestamp());
+        verifyTimes(record, 1L, 1L);
+        verifyBuffered(5, 2, 3);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -116,11 +129,9 @@ public void testTimeTracking() {
         // 2:[4, 6]
         // st: 3
         assertEquals(partition2, info.partition());
-        assertEquals(2L, record.timestamp);
-        assertEquals(4, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(2L, group.timestamp());
+        verifyTimes(record, 2L, 2L);
+        verifyBuffered(4, 2, 2);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -131,10 +142,9 @@ public void testTimeTracking() {
         // 1:[3, 5, 2, 4]
         // 2:[4, 6]
         // st: 3 (non-decreasing, so adding 2 doesn't change it)
-        assertEquals(6, group.numBuffered());
-        assertEquals(4, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
+        verifyBuffered(6, 4, 2);
         assertEquals(2L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -142,11 +152,9 @@ public void testTimeTracking() {
         // 2:[4, 6]
         // st: 4 as partition st is now {5, 4}
         assertEquals(partition1, info.partition());
-        assertEquals(3L, record.timestamp);
-        assertEquals(5, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        verifyTimes(record, 3L, 3L);
+        verifyBuffered(5, 3, 2);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -154,11 +162,9 @@ public void testTimeTracking() {
         // 2:[6]
         // st: 5 as partition st is now {5, 6}
         assertEquals(partition2, info.partition());
-        assertEquals(4L, record.timestamp);
-        assertEquals(4, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        verifyTimes(record, 4L, 4L);
+        verifyBuffered(4, 3, 1);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
@@ -166,11 +172,9 @@ public void testTimeTracking() {
         // 2:[6]
         // st: 5
         assertEquals(partition1, info.partition());
-        assertEquals(5L, record.timestamp);
-        assertEquals(3, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 5L, 5L);
+        verifyBuffered(3, 2, 1);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -178,11 +182,9 @@ public void testTimeTracking() {
         // 2:[6]
         // st: 5
         assertEquals(partition1, info.partition());
-        assertEquals(2L, record.timestamp);
-        assertEquals(2, group.numBuffered());
-        assertEquals(1, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 2L, 5L);
+        verifyBuffered(2, 1, 1);
+        assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -190,11 +192,9 @@ public void testTimeTracking() {
         // 2:[6]
         // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
         assertEquals(partition1, info.partition());
-        assertEquals(4L, record.timestamp);
-        assertEquals(1, group.numBuffered());
-        assertEquals(0, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 4L, 5L);
+        verifyBuffered(1, 0, 1);
+        assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -202,11 +202,20 @@ public void testTimeTracking() {
         // 2:[]
         // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
         assertEquals(partition2, info.partition());
-        assertEquals(6L, record.timestamp);
-        assertEquals(0, group.numBuffered());
-        assertEquals(0, group.numBuffered(partition1));
-        assertEquals(0, group.numBuffered(partition2));
-        assertEquals(6L, group.timestamp());
+        verifyTimes(record, 6L, 6L);
+        verifyBuffered(0, 0, 0);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+
+    }
+
+    private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
+        assertEquals(recordTime, record.timestamp);
+        assertEquals(streamTime, group.timestamp());
+    }
 
+    private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
+        assertEquals(totalBuffered, group.numBuffered());
+        assertEquals(partitionOneBuffered, group.numBuffered(partition1));
+        assertEquals(partitionTwoBuffered, group.numBuffered(partition2));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1d64316cb3c..1dcebb517a7 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -32,6 +32,7 @@
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -50,6 +51,7 @@ public static Properties getStreamsConfig(final String applicationId,
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name);
         props.putAll(additional);
         return props;
     }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index d10a45c00bf..2abfd6354be 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,7 +29,9 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -255,7 +257,13 @@ private TopologyTestDriver(final InternalTopologyBuilder builder,
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
-        metrics = new Metrics();
+
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+
+        metrics = new Metrics(metricConfig, mockWallClockTime);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             "topology-test-driver-virtual-thread"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
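
For readers unfamiliar with the KIP, the suppression feature tracked by this ticket
surfaces in the Streams DSL as KTable#suppress(Suppressed). A minimal sketch of the
in-memory-buffer case; the topic names, serdes, and windowing parameters are
placeholders chosen for illustration, not taken from this ticket:

    // Emit only the final result per window, buffering intermediate updates in memory.
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
           .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
           .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
           .count()
           .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
           .toStream()
           .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count.toString()))
           .to("output", Produced.with(Serdes.String(), Serdes.String()));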



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
