dajac commented on code in PR #16845:
URL: https://github.com/apache/kafka/pull/16845#discussion_r1713707305


##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger):
         logger.debug("Partitions %s assigned to %s" % (assignment, 
node.account.hostname))
         self.assignment.extend(assignment)
 
+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event, node, logger):

Review Comment:
   I wonder if we should add a comment explaining why we don't transition to 
`ConsumerState.Rebalancing` in this event handler.



##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -416,8 +416,15 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
         consumer.start()
         self.await_all_members(consumer)
 
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
+        partition_owner_container = {partition: None}
+        def check_partition_owner(partition_owner_container):
+            partition_owner_container[partition] = consumer.owner(partition)
+            return partition_owner_container[partition] is not None
+
+        wait_until(lambda: check_partition_owner(partition_owner_container),
+                   timeout_sec=self.session_timeout_sec*2+5,
+                   err_msg="Timed out waiting for partition to be assigned.")
+        partition_owner = partition_owner_container[partition]

Review Comment:
   I wonder if it would be better to have a helper method, similar to `await_all_members`, that waits for all partitions to be assigned. This is also what I suggested in my previous comment, so we could reuse the same one. Then we could keep the previous code, which is simpler.
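   Roughly something like this (untested sketch; it assumes the `valid_assignment` and `current_assignment` helpers used elsewhere in this file are accessible here, and the name and timeout are only a suggestion):
   ```python
   def await_all_partitions_assigned(self, consumer, timeout_sec=None):
       # Hypothetical helper mirroring await_all_members: block until every
       # partition of the test topic is owned by some member of the group.
       if timeout_sec is None:
           timeout_sec = self.session_timeout_sec * 2 + 5
       wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
                  timeout_sec=timeout_sec,
                  err_msg="Timed out waiting for all partitions to be assigned.")
   ```
   Then `test_consumer_failure` could call `self.await_all_partitions_assigned(consumer)` and keep the original `partition_owner = consumer.owner(partition)` / `assert partition_owner is not None` lines.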



##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -123,9 +123,10 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina
         #       nodes have time to expire
         self.rolling_bounce_brokers(consumer, clean_shutdown=True)
 
-        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
-        assert unexpected_rebalances == 0, \
-            "Broker rolling bounce caused %d unexpected group rebalances" % 
unexpected_rebalances
+        if group_protocol == consumer_group.classic_group_protocol:
+            unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+            assert unexpected_rebalances == 0, \
+                "Broker rolling bounce caused %d unexpected group rebalances" 
% unexpected_rebalances

Review Comment:
   > For new protocol consumer, if reconciliation happens during bouncing the 
brokers, num_rebalances will also be incremented. Currently there's no way to 
figure out whether a new rebalance is triggered or a reconciliation happens, so 
the assertion is skipped here.
   
   This is also true for the classic protocol. I suppose the issue is that we don't know whether the consumer group is fully reconciled before `consumer.num_rebalances()` is captured. Is that correct?
   
   One alternative we could consider is to check that all partitions are assigned in the group before capturing `consumer.num_rebalances()`. That implies the group is stable, so the assertion should work for both protocols. Have you considered this?
   
   Another alternative would be to check the group state via the command line tool, though I suppose that is harder.
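   To illustrate the first alternative (rough sketch, untested; it reuses the `valid_assignment` helper from the other tests in this file):
   ```python
   # Wait until every partition is assigned, i.e. the group is stable, before
   # snapshotting the rebalance count, so that late reconciliations from the
   # initial join are not counted as "unexpected" rebalances.
   wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
              timeout_sec=self.session_timeout_sec * 2,
              err_msg="Timed out waiting for a stable assignment before bouncing the brokers.")
   num_rebalances = consumer.num_rebalances()
   self.rolling_bounce_brokers(consumer, clean_shutdown=True)
   # The existing assertion on consumer.num_rebalances() - num_rebalances could
   # then stay unchanged for both protocols.
   ```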



##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger):
         logger.debug("Partitions %s assigned to %s" % (assignment, 
node.account.hostname))
         self.assignment.extend(assignment)
 
+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event, node, logger):
+        self.revoked_count += 1
+        self.position = {}
+        revoked = []
+
+        for topic_partition in event["partitions"]:
+            tp = _create_partition_from_dict(topic_partition)
+            # tp existing in self.assignment is not guaranteed in the new consumer
+            # if it shuts down when revoking partitions for reconciliation.

Review Comment:
   I am not sure I follow this comment. A partition cannot be revoked if it was not assigned to start with. Have you seen such failures? Could you elaborate a bit more on it, please?



##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -245,13 +264,22 @@ def __init__(self, context, num_nodes, kafka, topic, group_id,
     def java_class_name(self):
         return "VerifiableConsumer"
 
+    def create_event_handler(self, idx, node):
+        if self.is_consumer_group_protocol_enabled():
+            return ConsumerProtocolConsumerEventHandler(node, self.verify_offsets, idx)
+        elif self.is_eager():
+            return ConsumerEventHandler(node, self.verify_offsets, idx)
+        else:
+            return IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+
     def _worker(self, idx, node):
         with self.lock:
             if node not in self.event_handlers:
-                if self.is_eager():
-                    self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
-                else:
-                    self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+                self.event_handlers[node] = self.create_event_handler(idx, node)
+            else:
+                new_event_handler = self.create_event_handler(idx, node)
+                if self.event_handlers[node].__class__.__name__ != new_event_handler.__class__.__name__:
+                    self.event_handlers[node] = new_event_handler

Review Comment:
   I am still debating this change because I find the semantics inconsistent. We keep the state (e.g. consumed messages) when the consumer is restarted, except if the protocol is changed; in that case, we reset the state. This may be a bit surprising for users of this class. I wonder if we should keep the state in all cases in order to be consistent. One way would be to always recreate the handler while copying over the state from the previous handler if one exists. Have you considered something like this?
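   For example (hypothetical sketch; I only copied the fields visible in this diff, the real list would have to match the handlers' attributes):
   ```python
   # Inside _worker, replacing the current if/else on self.event_handlers:
   with self.lock:
       prev_handler = self.event_handlers.get(node)
       new_handler = self.create_event_handler(idx, node)
       if prev_handler is not None:
           # Carry over the accumulated state so a restart behaves the same
           # whether or not the protocol changed.
           new_handler.assignment = prev_handler.assignment
           new_handler.position = prev_handler.position
           new_handler.revoked_count = prev_handler.revoked_count
           # ... plus whatever other counters the handlers track (e.g. consumed totals).
       self.event_handlers[node] = new_handler
   ```
   That way the accumulated state survives a restart consistently, regardless of whether the protocol changed.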



##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -608,7 +617,7 @@ def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum
             consumer.start_node(node)
             self.await_members(consumer, num_started)
             wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
-                timeout_sec=15,
+                timeout_sec=self.session_timeout_sec*2,

Review Comment:
   What is the reasoning for using `self.session_timeout_sec*2`? Would using 30s also work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
