cadonna commented on code in PR #17820:
URL: https://github.com/apache/kafka/pull/17820#discussion_r1843502960


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java:
##########
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                "threadId",

Review Comment:
   There is a constant for a test thread ID:
   
   ```suggestion
                   THREAD_ID,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java:
##########
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                "threadId",
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "thread-state",
+                "The current state of the thread",
+                "threadId",

Review Comment:
   ```suggestion
                   THREAD_ID,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java:
##########
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                "threadId",
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "thread-state",
+                "The current state of the thread",
+                "threadId",
+                threadStateProvider
+        );
+    }
+
+    @Test
+    public void shouldAddThreadStateJMXMetric() {
+        final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault());
+        ThreadMetrics.addThreadStateMetric(
+                "threadId",
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "state",
+                "The current state of the thread",
+                "threadId",

Review Comment:
   ```suggestion
                   THREAD_ID,
   ```



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1067,7 +1070,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
     private static Metrics createMetrics(final StreamsConfig config, final Time time, final String clientId) {
         final MetricConfig metricConfig = new MetricConfig()
             .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)))

Review Comment:
   Why did you remove the `StreamsConfig.` prefix? For all other config names, we use the prefix.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -614,6 +615,12 @@ public StreamThread(final Time time,
             streamsMetrics,
             time.milliseconds()
         );
+        ThreadMetrics.addThreadStateTelemetryMetric(threadId,

Review Comment:
   nit: we usually use 4-space indentation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java:
##########
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                "threadId",
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "thread-state",
+                "The current state of the thread",
+                "threadId",
+                threadStateProvider
+        );
+    }
+
+    @Test
+    public void shouldAddThreadStateJMXMetric() {

Review Comment:
   nit:
   ```suggestion
       public void shouldAddThreadStateJmxMetric() {
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java:
##########
@@ -413,6 +415,39 @@ public void shouldAddThreadStartTimeMetric() {
         );
     }
 
+    @Test
+    public void shouldAddThreadStateTelemetryMetric() {
+        final Gauge<Integer> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.ordinal();
+        ThreadMetrics.addThreadStateTelemetryMetric(
+                "threadId",
+                streamsMetrics,
+                threadStateProvider
+        );
+        verify(streamsMetrics).addThreadLevelMutableMetric(
+                "thread-state",
+                "The current state of the thread",
+                "threadId",
+                threadStateProvider
+        );
+    }
+
+    @Test
+    public void shouldAddThreadStateJMXMetric() {
+        final Gauge<String> threadStateProvider = (streamsMetrics, startTime) -> StreamThread.State.RUNNING.name().toLowerCase(Locale.getDefault());
+        ThreadMetrics.addThreadStateMetric(
+                "threadId",

Review Comment:
   ```suggestion
                   THREAD_ID,
   ```



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -180,15 +180,16 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
                         final String name = mn.name().replace('-', '.');
                         final String group = mn.group().replace("-metrics", "").replace('-', '.');
                         return "org.apache.kafka." + group + "." + name;
-                    }).sorted().collect(Collectors.toList());
+                    }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
+                    .sorted().collect(Collectors.toList());

Review Comment:
   I do not understand this. According to the KIP and the code in `ThreadMetrics`, the type of the metric is numeric.
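
To make the question concrete, here is a minimal standalone sketch of the name mapping and filter from the hunk above. It uses no Kafka classes. The class and method names are hypothetical, and the group name `stream-thread-metrics` is an assumption about the thread-level metric group; the metric names `state` and `thread-state` are the ones verified in `ThreadMetricsTest`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration only; not part of the PR.
class MetricNameFilterSketch {

    // Same string transformation as in the quoted test hunk.
    static String toTelemetryName(final String group, final String name) {
        final String dottedName = name.replace('-', '.');
        final String dottedGroup = group.replace("-metrics", "").replace('-', '.');
        return "org.apache.kafka." + dottedGroup + "." + dottedName;
    }

    // Maps the metric names of one group and applies the filter added in the PR.
    static List<String> expectedNames(final String group, final List<String> names) {
        return names.stream()
            .map(name -> toTelemetryName(group, name))
            .filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // Assumption: "stream-thread-metrics" is the group of both thread-level metrics.
        System.out.println(expectedNames("stream-thread-metrics", Arrays.asList("state", "thread-state")));
    }
}
```

Under that group-name assumption, the filtered string matches the name derived from `state`, while `thread-state` maps to `org.apache.kafka.stream.thread.thread.state` and passes through. Whether that is the intent of the filter is for the PR author to confirm.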


