lucasbru commented on code in PR #15330:
URL: https://github.com/apache/kafka/pull/15330#discussion_r1507381845


##########
tests/kafkatest/tests/verifiable_consumer_test.py:
##########
@@ -72,16 +86,59 @@ def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10):
                    err_msg="Timeout awaiting messages to be produced and acked")
 
     def await_consumed_messages(self, consumer, min_messages=1):
+        timeout_sec = self.consumption_timeout_sec
         current_total = consumer.total_consumed()
-        wait_until(lambda: consumer.total_consumed() >= current_total + min_messages,
-                   timeout_sec=self.consumption_timeout_sec,
-                   err_msg="Timed out waiting for consumption")
+        expected = current_total + min_messages
+
+        def _condition():
+            return consumer.total_consumed() >= expected
+
+        def _err_msg():
+            actual = consumer.total_consumed()
+            return "%d messages received within the timeout of %d seconds, expected %d" % (actual, timeout_sec, expected)
+
+        wait_until(lambda: _condition(), timeout_sec=timeout_sec, err_msg=_err_msg())
+
+    def await_members_stopped(self, consumer, num_consumers, timeout_sec):
+        self._await_members_in_state(consumer, num_consumers, "stopped", [ConsumerState.Dead], timeout_sec)
 
     def await_members(self, consumer, num_consumers):
         # Wait until all members have joined the group
-        wait_until(lambda: len(consumer.joined_nodes()) == num_consumers,
-                   timeout_sec=self.session_timeout_sec*2,
-                   err_msg="Consumers failed to join in a reasonable amount of time")
-        
+        states = [ConsumerState.Joined]
+
+        if consumer_group.is_consumer_group_protocol_enabled(consumer.group_protocol):
+            states.extend([ConsumerState.Started, ConsumerState.Rebalancing])

Review Comment:
   Why do we have to do this? Are we still checking the same thing if we 
consider "started" as "joined"?



##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -201,7 +200,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor
         bounce_mode=["all", "rolling"],
         num_bounces=[5],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True]

Review Comment:
   Why is there no group protocol parameter here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to