cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526880412


##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1070,7 +1070,9 @@ private Thread shutdownHelper(final boolean error) {
             adminClient.close();
             streamsMetrics.removeAllClientLevelMetrics();
+            streamsMetrics.removeAllClientLevelSensors();

Review comment:
Could you please add a public method to `StreamsMetricsImpl` named `removeAllClientLevelSensorsAndMetrics()` that calls `removeAllClientLevelMetrics()` and `removeAllClientLevelSensors()` and make the latter two methods `private`?
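A minimal sketch of what the suggested consolidation could look like inside `StreamsMetricsImpl`, assuming the underlying `org.apache.kafka.common.metrics.Metrics` registry is available as a field named `metrics` and that `clientLevelSensors` becomes a plain `Deque<String>` as proposed further down; the method bodies are illustrative, not the actual implementation:

```java
// Sketch only: assumes `clientLevelMetrics` is a Deque<MetricName> and
// `clientLevelSensors` is a Deque<String> of full sensor names.
public void removeAllClientLevelSensorsAndMetrics() {
    removeAllClientLevelSensors();
    removeAllClientLevelMetrics();
}

private void removeAllClientLevelSensors() {
    synchronized (clientLevelSensors) {
        while (!clientLevelSensors.isEmpty()) {
            metrics.removeSensor(clientLevelSensors.pop());
        }
    }
}

private void removeAllClientLevelMetrics() {
    synchronized (clientLevelMetrics) {
        while (!clientLevelMetrics.isEmpty()) {
            metrics.removeMetric(clientLevelMetrics.pop());
        }
    }
}
```

With that in place, `shutdownHelper()` in `KafkaStreams` would need only the single call `streamsMetrics.removeAllClientLevelSensorsAndMetrics();`.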
##########
File path: streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
##########
@@ -60,6 +65,7 @@ private ClientMetrics() {}
         "The description of the topology executed in the Kafka Streams client";
     private static final String STATE_DESCRIPTION = "The state of the Kafka Streams client";
     private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The current number of alive stream threads that are running or participating in rebalance";
+    private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads so far for a given Kafka Streams client";

Review comment:
```suggestion
    private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads since the start of the Kafka Streams client";
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -93,6 +93,7 @@ public int hashCode() {
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
+    private final Map<String, Deque<String>> clientLevelSensors = new HashMap<>();

Review comment:
Here you should just need a queue, as for `clientLevelMetrics`. We need a map for the other levels because there can be multiple objects for each level, e.g., there might be multiple stream threads and each one manages its sensors under a key in the map. However, there is only one client at the client level.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -253,6 +268,16 @@ public final void removeAllClientLevelMetrics() {
         }
     }
+    public final void removeAllClientLevelSensors() {

Review comment:
Unit tests for this method are missing. Please also consider my comment in class `KafkaStreams` for these unit tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -221,6 +221,9 @@ State setState(final State newState) {
             throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
         } else {
             log.info("State transition from {} to {}", oldState, newState);
+            if (newState == State.DEAD) {
+                failedStreamThreadSensor.record();
+            }

Review comment:
Not every dead stream thread is a failed stream thread. You should record this metric where the uncaught exception handler is called, because there we know that a stream thread died unexpectedly.
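To illustrate the point in the `StreamThread.java` comment above: a hedged sketch of where the recording could move instead of `setState()`. The handler wiring and the logging shown here are assumptions rather than the code in this PR; the key idea is that reaching an uncaught exception handler means the stream thread died unexpectedly, so only genuine failures get counted.

```java
// Sketch only: how the handler could be attached is an assumption.
streamThread.setUncaughtExceptionHandler((thread, throwable) -> {
    // Reaching this handler means the stream thread died unexpectedly,
    // so it is safe to count it as a failed stream thread here,
    // unlike a normal shutdown that also transitions to DEAD.
    failedStreamThreadSensor.record();
    log.error("Stream thread {} died unexpectedly", thread.getName(), throwable);
});
```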
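Regarding the comments in this review that unit tests for the new `StreamsMetricsImpl` methods are missing: a rough sketch of a test that exercises `clientLevelSensor()` together with the consolidated removal method, assuming a `streamsMetrics` fixture as in `StreamsMetricsImplTest`; the sensor name and the assertions are illustrative only.

```java
// Sketch only: fixture setup (the `streamsMetrics` field) is assumed to
// follow the existing conventions in StreamsMetricsImplTest.
@Test
public void shouldAddAndRemoveClientLevelSensors() {
    final Sensor sensor = streamsMetrics.clientLevelSensor("my-sensor", RecordingLevel.INFO);

    // Asking for the same sensor name again should return the same sensor instance.
    assertThat(streamsMetrics.clientLevelSensor("my-sensor", RecordingLevel.INFO), is(sameInstance(sensor)));

    streamsMetrics.removeAllClientLevelSensorsAndMetrics();

    // After removal, the same name should yield a freshly created sensor.
    assertThat(streamsMetrics.clientLevelSensor("my-sensor", RecordingLevel.INFO), is(not(sameInstance(sensor))));
}
```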
##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr
             builtInMetricsVersion
         );
         checkCacheMetrics(builtInMetricsVersion);
-
+        verifyFailedStreamThreadsSensor(0.0);

Review comment:
I would put the test of whether the metric is recorded correctly in `StreamThreadTest`. An example for such a test is `shouldLogAndRecordSkippedRecordsForInvalidTimestamps()`. I do not think an integration test is needed. The test regarding the existence of the metric, i.e., `checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);`, should stay here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##########
@@ -99,6 +121,27 @@ public void shouldAddAliveStreamThreadsMetric() {
         );
     }
+    @Test
+    public void shouldGetFailedStreamThreadsSensor() {
+        final String name = "failed-stream-threads";
+        final String description = "The number of failed stream threads so far for a given Kafka Streams client";
+        expect(streamsMetrics.clientLevelSensor(name, RecordingLevel.INFO)).andReturn(expectedSensor);
+        expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+        StreamsMetricsImpl.addSumMetricToSensor(
+            expectedSensor,
+            CLIENT_LEVEL_GROUP,
+            tagMap,
+            name,
+            false,
+            description
+        );
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics);

Review comment:
nit: I like to insert a blank line after the call under test to visually separate setup, call, and verification.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
+    public final Sensor clientLevelSensor(final String sensorName,

Review comment:
Unit tests for this method are missing.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org