mjsax commented on code in PR #17422:
URL: https://github.com/apache/kafka/pull/17422#discussion_r1792596154


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
         }
     }
 
+    private void verifyClientTelemetryConfigs() {
+        final String mainConsumerMetricsPushKey = 
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final String adminClientMetricsPushKey = 
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final boolean streamTelemetryEnabled =

Review Comment:
   Can we just use `getBoolean(ENABLE_METRICS_PUSH_CONFIG)` ?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
         }
     }
 
+    private void verifyClientTelemetryConfigs() {
+        final String mainConsumerMetricsPushKey = 
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final String adminClientMetricsPushKey = 
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final boolean streamTelemetryEnabled =
+                !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) || 
(boolean) Objects.requireNonNull(
+                        parseType(ENABLE_METRICS_PUSH_CONFIG, 
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+                        "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + " 
because it's null");
+
+        final boolean mainConsumerMetricsDisabled =
+                !originals().containsKey(mainConsumerMetricsPushKey) || 
(boolean) Objects.requireNonNull(

Review Comment:
   Why `!originals().containsKey(mainConsumerMetricsPushKey) ||` ? If there is 
no `main.consumer.enable.metric.push` that it should default to `true` what 
mean enabled?
   
   Also, if there is no `main.consumer.enable.metric.push` there could still be 
`consumer.enable.metric.push` and I think we need to check this case if 
`main.consumer.xxx` is not set.
   
   Would it simplify the code, if we use `originalsWithPrefix(...)` helper 
method?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1323,6 +1327,51 @@ public void 
shouldDisableMetricCollectionOnMainConsumerOnly() {
         );
     }
 
+    @Test
+    public void 
shouldThrowConfigExceptionWhenAdminClientMetricsDisabledStreamsMetricsPushEnabled()
 {
+        
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
 false);
+
+        final Exception exception = assertThrows(ConfigException.class, () -> 
new StreamsConfig(props));
+
+        assertThat(
+                exception.getMessage(),
+                containsString("KafkaStreams has metrics push enabled" +
+                        " but the admin client metrics push is disabled. 
Enable " +
+                        " metrics push for the admin client")
+        );
+        assertNull(

Review Comment:
   Seems this in unrelated to this test? If we want to verify this, might be 
best to add new test case?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
         }
     }
 
+    private void verifyClientTelemetryConfigs() {
+        final String mainConsumerMetricsPushKey = 
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final String adminClientMetricsPushKey = 
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final boolean streamTelemetryEnabled =
+                !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) || 
(boolean) Objects.requireNonNull(
+                        parseType(ENABLE_METRICS_PUSH_CONFIG, 
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+                        "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + " 
because it's null");
+
+        final boolean mainConsumerMetricsDisabled =
+                !originals().containsKey(mainConsumerMetricsPushKey) || 
(boolean) Objects.requireNonNull(
+                parseType(mainConsumerMetricsPushKey, 
originals().get(mainConsumerMetricsPushKey), Type.BOOLEAN),
+                "Can't parse " + mainConsumerMetricsPushKey + " because it's 
null");
+
+        final boolean adminMetricsEnabled =
+                !originals().containsKey(adminClientMetricsPushKey) || 
(boolean) Objects.requireNonNull(
+                        parseType(adminClientMetricsPushKey, 
originals().get(adminClientMetricsPushKey), Type.BOOLEAN),

Review Comment:
   Would it simplify the code, if we use `originalsWithPrefix(...)` helper 
method?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1459,6 +1467,43 @@ private void verifyEOSTransactionTimeoutCompatibility() {
         }
     }
 
+    private void verifyClientTelemetryConfigs() {
+        final String mainConsumerMetricsPushKey = 
mainConsumerPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final String adminClientMetricsPushKey = 
adminClientPrefix(ENABLE_METRICS_PUSH_CONFIG);
+        final boolean streamTelemetryEnabled =
+                !originals().containsKey(ENABLE_METRICS_PUSH_CONFIG) || 
(boolean) Objects.requireNonNull(
+                        parseType(ENABLE_METRICS_PUSH_CONFIG, 
originals().get(ENABLE_METRICS_PUSH_CONFIG), Type.BOOLEAN),
+                        "Can't parse " + ENABLE_METRICS_PUSH_CONFIG + " 
because it's null");
+
+        final boolean mainConsumerMetricsDisabled =
+                !originals().containsKey(mainConsumerMetricsPushKey) || 
(boolean) Objects.requireNonNull(
+                parseType(mainConsumerMetricsPushKey, 
originals().get(mainConsumerMetricsPushKey), Type.BOOLEAN),
+                "Can't parse " + mainConsumerMetricsPushKey + " because it's 
null");
+
+        final boolean adminMetricsEnabled =
+                !originals().containsKey(adminClientMetricsPushKey) || 
(boolean) Objects.requireNonNull(
+                        parseType(adminClientMetricsPushKey, 
originals().get(adminClientMetricsPushKey), Type.BOOLEAN),
+                        "Can't parse " + adminClientMetricsPushKey + " because 
it's null");
+
+        final String baseMetricsMisconfigurationMessage = "KafkaStreams has 
metrics push enabled" +
+                " but the %s metrics push is disabled. Enable " +
+                " metrics push for the %s";
+
+        if (streamTelemetryEnabled) {
+            if (!mainConsumerMetricsDisabled && !adminMetricsEnabled) {

Review Comment:
   Given that you use `!...Disabled` and `!...Enabled` would it make sense to 
flip the variables from "disabled -> enable" and "enable -> disabled" to avoid 
the negation here (and below)?
   
   Also, I think we should throw if `streamTelemetryEnabled && 
mainConsumerMetricsDisabled` not if "main consumer enable" -- it seem the code 
works correctly though, as we compute `mainConsumerMetricsDisabled` incorrectly 
above, and thus there is two error which cancel out each other?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1323,6 +1327,51 @@ public void 
shouldDisableMetricCollectionOnMainConsumerOnly() {
         );
     }
 
+    @Test
+    public void 
shouldThrowConfigExceptionWhenAdminClientMetricsDisabledStreamsMetricsPushEnabled()
 {
+        
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
 false);
+
+        final Exception exception = assertThrows(ConfigException.class, () -> 
new StreamsConfig(props));
+
+        assertThat(
+                exception.getMessage(),
+                containsString("KafkaStreams has metrics push enabled" +
+                        " but the admin client metrics push is disabled. 
Enable " +
+                        " metrics push for the admin client")
+        );
+        assertNull(
+                streamsConfig.getRestoreConsumerConfigs("clientId")
+                        .get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
+        );
+        assertNull(
+                streamsConfig.getGlobalConsumerConfigs("clientId")
+                        .get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
+        );
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenAdminClientAndMainConsumerMetricsDisabledStreamsMetricsPushEnabled()
 {
+        
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG),
 false);
+        
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG),
 false);
+
+        final Exception exception = assertThrows(ConfigException.class, () -> 
new StreamsConfig(props));
+
+        assertThat(
+                exception.getMessage(),
+                containsString("KafkaStreams has metrics push enabled" +
+                        " but the main consumer and admin client metrics push 
is disabled. Enable " +
+                        " metrics push for the main consumer and the admin 
client")
+        );
+        assertNull(

Review Comment:
   as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to