lucasbru commented on code in PR #19908: URL: https://github.com/apache/kafka/pull/19908#discussion_r2131868905
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1047,6 +1051,21 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); } + private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() { + if (applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { Review Comment: Done ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1525,10 +1525,27 @@ protected StreamsConfig(final Map<?, ?> props, } verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG)); verifyClientTelemetryConfigs(); + verifyStreamsProtocolCompatibility(doLog); + } + private void verifyStreamsProtocolCompatibility(final boolean doLog) { if (doLog && getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT))) { Review Comment: Good point. I actually added a little protected method to check if streams protocol is enabled, to eliminate this source of bugs. ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1616,6 +1616,67 @@ public void shouldSetGroupProtocolToStreams() { assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG))); } + @Test + public void shouldLogWarningWhenStreamsProtocolIsUsed() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("The streams rebalance protocol is still in development and should " + + "not be used in production. Please set group.protocol=classic (default) in all production use cases."))); + } + } + + @Test + public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Warmup replicas are not supported yet with the streams protocol and " + + "will be ignored. If you want to use warmup replicas, please set group.protocol=classic."))); + } + } + + @Test + public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Standby replicas are configured broker-side in the streams group " + + "protocol and will be ignored. Please use the admin client or kafka-configs.sh to set the streams " + + "groups's standby replicas."))); + } + } + + @Test + public void shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership() { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1"); Review Comment: Good point. Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org