dajac commented on code in PR #16845:
URL: https://github.com/apache/kafka/pull/16845#discussion_r1713707305
##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger):
             logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname))
             self.assignment.extend(assignment)

+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event, node, logger):

Review Comment:
I wonder if we should add a comment explaining why we don't transition to `ConsumerState.Rebalancing` in this event handler.

##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -416,8 +416,15 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
         consumer.start()
         self.await_all_members(consumer)

-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
+        partition_owner_container = {partition: None}
+        def check_partition_owner(partition_owner_container):
+            partition_owner_container[partition] = consumer.owner(partition)
+            return partition_owner_container[partition] is not None
+
+        wait_until(lambda: check_partition_owner(partition_owner_container),
+                   timeout_sec=self.session_timeout_sec*2+5,
+                   err_msg="Timed out waiting for partition to be assigned.")
+        partition_owner = partition_owner_container[partition]

Review Comment:
I wonder if it would be better to have a helper method, similar to `await_all_members`, that waits for all partitions to be assigned. This is also what I suggested in my previous comment, so we could reuse the same helper here. Then we could keep the previous code, which is simpler.

##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -123,9 +123,10 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina
         # nodes have time to expire
         self.rolling_bounce_brokers(consumer, clean_shutdown=True)

-        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
-        assert unexpected_rebalances == 0, \
-            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
+        if group_protocol == consumer_group.classic_group_protocol:
+            unexpected_rebalances = consumer.num_rebalances() - num_rebalances
+            assert unexpected_rebalances == 0, \
+                "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances

Review Comment:
> For the new protocol consumer, if reconciliation happens while the brokers are being bounced, num_rebalances will also be incremented. Currently there is no way to tell whether a new rebalance was triggered or a reconciliation happened, so the assertion is skipped here.

This is also true for the classic protocol. I suppose the real issue is that we don't know whether the consumer group is fully reconciled before `consumer.num_rebalances()` is captured. Is that correct?

One alternative we could consider is to check that all partitions are assigned in the group before capturing `consumer.num_rebalances()`. That implies the group is stable, so the assertion should work for both protocols. Have you considered this? Another alternative would be to check the group state via the command line tool, but I suppose that is harder.
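To make this more concrete, here is the kind of helper I have in mind for both this test and `test_consumer_failure`. It is only a rough, untested sketch: the name `await_all_partitions_assigned` and the default timeout are made up, and it simply reuses `wait_until`, `consumer.current_assignment()` and the `valid_assignment` check that `test_valid_assignment` already relies on.

```python
from ducktape.utils.util import wait_until

# Hypothetical helper on the consumer test class (name and default timeout are guesses).
def await_all_partitions_assigned(self, consumer, timeout_sec=60):
    """Block until every partition of self.TOPIC is owned by some member of the group."""
    wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS,
                                             consumer.current_assignment()),
               timeout_sec=timeout_sec,
               err_msg="Timed out waiting for all %d partitions of %s to be assigned"
                       % (self.NUM_PARTITIONS, self.TOPIC))
```

With something like this, `test_consumer_failure` could keep the original `partition_owner = consumer.owner(partition)` / `assert partition_owner is not None` pair, and `test_broker_rolling_bounce` could call the helper just before capturing `consumer.num_rebalances()`, so the group is known to be stable and the zero-rebalance assertion could be kept for both protocols.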
##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -176,6 +176,25 @@ def handle_partitions_assigned(self, event, node, logger):
             logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname))
             self.assignment.extend(assignment)

+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event, node, logger):
+        self.revoked_count += 1
+        self.position = {}
+        revoked = []
+
+        for topic_partition in event["partitions"]:
+            tp = _create_partition_from_dict(topic_partition)
+            # tp existing in self.assignment is not guaranteed in the new consumer
+            # if it shuts down when revoking partitions for reconciliation.

Review Comment:
I am not sure I follow this comment. A partition cannot be revoked if it was not assigned to start with. Have you seen such failures? Could you elaborate a bit more on it, please?

##########
tests/kafkatest/services/verifiable_consumer.py:
##########
@@ -245,13 +264,22 @@ def __init__(self, context, num_nodes, kafka, topic, group_id,
     def java_class_name(self):
         return "VerifiableConsumer"

+    def create_event_handler(self, idx, node):
+        if self.is_consumer_group_protocol_enabled():
+            return ConsumerProtocolConsumerEventHandler(node, self.verify_offsets, idx)
+        elif self.is_eager():
+            return ConsumerEventHandler(node, self.verify_offsets, idx)
+        else:
+            return IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+
     def _worker(self, idx, node):
         with self.lock:
             if node not in self.event_handlers:
-                if self.is_eager():
-                    self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
-                else:
-                    self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+                self.event_handlers[node] = self.create_event_handler(idx, node)
+            else:
+                new_event_handler = self.create_event_handler(idx, node)
+                if self.event_handlers[node].__class__.__name__ != new_event_handler.__class__.__name__:
+                    self.event_handlers[node] = new_event_handler

Review Comment:
I am still debating this change because the semantics are inconsistent: we keep the state (e.g. consumed messages) when the consumer is restarted, except if the protocol changed, in which case we reset the state. This may be a bit surprising for users of this class. I wonder if we should keep the state in all cases in order to be consistent. One way would be to always recreate the handler while copying over the state from the previous handler, if one exists. Have you considered something like this? (There is a rough sketch of what I mean at the end of this message.)

##########
tests/kafkatest/tests/client/consumer_test.py:
##########
@@ -608,7 +617,7 @@ def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum
             consumer.start_node(node)
             self.await_members(consumer, num_started)
             wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
-                       timeout_sec=15,
+                       timeout_sec=self.session_timeout_sec*2,

Review Comment:
What is the reasoning for using `self.session_timeout_sec*2`? Would using 30s also work?
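Coming back to my comment on `create_event_handler` / `_worker` above, this is roughly the "always recreate the handler and copy the state over" idea, again only a rough, untested sketch. The list of copied attributes is a guess based on what is visible in this diff and would need to match whatever `ConsumerEventHandler` actually tracks.

```python
# Sketch of the beginning of VerifiableConsumer._worker with state carry-over.
def _worker(self, idx, node):
    with self.lock:
        # Always build a handler that matches the currently configured protocol.
        new_handler = self.create_event_handler(idx, node)
        previous = self.event_handlers.get(node)
        if previous is not None:
            # Carry over the accumulated state so a restart never silently resets
            # it, whether or not the protocol changed. The attribute names below
            # are assumptions.
            for attr in ("assignment", "position", "revoked_count",
                         "assigned_count", "total_consumed"):
                if hasattr(previous, attr):
                    setattr(new_handler, attr, getattr(previous, attr))
        self.event_handlers[node] = new_handler
    # ... the rest of _worker (starting the consumer and dispatching events)
    # would stay unchanged.
```

That would keep the restart semantics identical whether or not the group protocol changed between runs.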