lucasbru commented on code in PR #19700: URL: https://github.com/apache/kafka/pull/19700#discussion_r2086136979
########## tests/kafkatest/tests/streams/streams_eos_test.py: ########## @@ -149,15 +160,26 @@ def stop_streams2(self, keep_alive_processor, processor_to_be_stopped): def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped): with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor: - self.stop_streams2(keep_alive_processor2, processor_to_be_stopped) - self.wait_for_startup(monitor, keep_alive_processor1) + with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor: + self.stop_streams(processor_to_be_stopped) + self.wait_for_startup_in_classic(monitor, keep_alive_processor2) + self.wait_for_startup_in_classic(monitor, keep_alive_processor1) def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted): with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1: with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2: processor_to_be_aborted.stop_nodes(False) - self.wait_for_startup(monitor2, keep_alive_processor2) - self.wait_for_startup(monitor1, keep_alive_processor1) + self.wait_for_startup_in_classic(monitor2, keep_alive_processor2) + self.wait_for_startup_in_classic(monitor1, keep_alive_processor1) + + def wait_for_startup_in_classic(self, monitor, processor): + if self.group_protocol == "classic": + self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") + else: + # Above check is not valid for streams group protocol, since not all nodes take part in rebalances Review Comment: Some of the validation inside the test does not make sense for KIP-1071. This is because in KIP-1071, if a member leaves or joins the group, not all members may enter a REBALANCING state. For streams groups, we only include a minor sleep. However, this simplification should not break the test - it can just lead to going too quickly through the various test states. Once [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271) is implemented, we may change the test app to print a log line whenever the member epoch is bumped, which is the only way a member can "indirectly" observe that other members are rebalancing. This only affects the operations above where two members are being kept alive, one is being started / shut down. One member may not see a rebalance in that case. I checked, but it is not easily possible to express something like "one of the two should see a rebalance". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org