lucasbru commented on code in PR #19700:
URL: https://github.com/apache/kafka/pull/19700#discussion_r2086136979


##########
tests/kafkatest/tests/streams/streams_eos_test.py:
##########
@@ -149,15 +160,26 @@ def stop_streams2(self, keep_alive_processor, 
processor_to_be_stopped):
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_stopped):
         with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+            with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
 as monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_startup_in_classic(monitor, 
keep_alive_processor2)
+            self.wait_for_startup_in_classic(monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_aborted):
         with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor1:
             with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
 as monitor2:
                 processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
+            self.wait_for_startup_in_classic(monitor2, keep_alive_processor2)
+        self.wait_for_startup_in_classic(monitor1, keep_alive_processor1)
+
+    def wait_for_startup_in_classic(self, monitor, processor):
+        if self.group_protocol == "classic":
+            self.wait_for(monitor, processor, "StateChange: REBALANCING -> 
RUNNING")
+        else:
+            # Above check is not valid for streams group protocol, since not 
all nodes take part in rebalances

Review Comment:
   Some of the validation inside the test does not make sense for KIP-1071.
   This is because in KIP-1071, if a member leaves or joins the group, not
   all members may enter a REBALANCING state. For streams groups, we only
   include a minor sleep. However, this simplification should not break
   the test - it can just lead to going too quickly through the various
   test states.
   
   Once [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271) is
   implemented, we may change the test app to print a log line whenever
   the member epoch is bumped, which is the only way a member can
   "indirectly" observe that other members are rebalancing.
   
   This only affects the operations above where two members are being kept 
alive, one is being started / shut down. One member may not see a rebalance in 
that case. I checked, but it is not easily possible to express something like 
"one of the two should see a rebalance".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to