mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r428930276



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -420,6 +422,50 @@ public void shouldRecordProcessRatio() {
         assertThat(metric.metricValue(), equalTo(1.0d));
     }
 
+    @Test
+    public void shouldRecordE2ELatency() {
+        time = new MockTime(0L, 0L, 0L);
+        metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), time);
+
+        task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
+
+        final KafkaMetric maxMetric = getMetric("record-e2e-latency", 
"%s-max", task.id().toString(), StreamsConfig.METRICS_LATEST);
+        final KafkaMetric minMetric = getMetric("record-e2e-latency", 
"%s-min", task.id().toString(), StreamsConfig.METRICS_LATEST);
+
+        assertThat(maxMetric.metricValue(), equalTo(Double.NaN));
+
+        task.addRecords(partition1, asList(
+            getConsumerRecord(partition1, 0L),
+            getConsumerRecord(partition1, 10L),
+            getConsumerRecord(partition1, 5L),
+            getConsumerRecord(partition1, 20L)

Review comment:
       We we increase this ts to 35? This would allow to test min in the last 
step better

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##########
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;

Review comment:
       Nice one!

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##########
@@ -86,6 +87,14 @@ private TaskMetrics() {}
     private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
         "from consumer and not yet processed for this active task";
 
+    private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+    static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+        "The maximum end-to-end latency of a record, measuring by comparing 
the record timestamp with the "
+            + "system time when it has been fully processed by the task";

Review comment:
       Assuming that a task might have a cache, is this correct, ie, `has been 
fully processed by the task`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to