lucasbru commented on code in PR #19908:
URL: https://github.com/apache/kafka/pull/19908#discussion_r2131868905


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1047,6 +1051,21 @@ private KafkaStreams(final TopologyMetadata 
topologyMetadata,
         rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
     }
 
+    private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() 
{
+        if 
(applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
 {

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1525,10 +1525,27 @@ protected StreamsConfig(final Map<?, ?> props,
         }
         
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
         verifyClientTelemetryConfigs();
+        verifyStreamsProtocolCompatibility(doLog);
+    }
 
+    private void verifyStreamsProtocolCompatibility(final boolean doLog) {
         if (doLog && 
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
 {

Review Comment:
   Good point. I actually added a little protected method to check if streams 
protocol is enabled, to eliminate this source of bugs.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1616,6 +1616,67 @@ public void shouldSetGroupProtocolToStreams() {
         
assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
     }
 
+    @Test
+    public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("The streams rebalance protocol 
is still in development and should " +
+                    "not be used in production. Please set 
group.protocol=classic (default) in all production use cases.")));
+        }
+    }
+
+    @Test
+    public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+            props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("Warmup replicas are not 
supported yet with the streams protocol and " +
+                    "will be ignored. If you want to use warmup replicas, 
please set group.protocol=classic.")));
+        }
+    }
+
+    @Test
+    public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("Standby replicas are configured 
broker-side in the streams group " +
+                    "protocol and will be ignored. Please use the admin client 
or kafka-configs.sh to set the streams " +
+                    "groups's standby replicas.")));
+        }
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership() {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1");

Review Comment:
   Good point. Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to