ahuang98 commented on code in PR #17146:
URL: https://github.com/apache/kafka/pull/17146#discussion_r1776070346
##########
tests/kafkatest/tests/core/quorum_reconfiguration_test.py:
##########
@@ -46,54 +50,67 @@ def setUp(self):
self.num_producers = 1
self.num_consumers = 1
- def perform_reconfig(self, active_controller_id, inactive_controller_id,
inactive_controller, broker_ids):
+ def perform_reconfig(self,
+ active_controller_id: int,
+ inactive_controller_id: int,
+ inactive_controller: ClusterNode,
+ broker_only_ids: List[int]):
+ """
+ Tests quorum reconfiguration by adding a second controller and then
removing the active controller.
+
+ :param active_controller_id: id of the active controller
+ :param inactive_controller_id: id of the inactive controller
+ :param inactive_controller: node object of the inactive controller
+ :param broker_only_ids: broker ids of nodes which have no controller
process
+ """
# Check describe quorum output shows the controller (first node) is
the leader and the only voter
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids),
timeout_sec=5)
# Start second controller
self.kafka.controller_quorum.add_broker(inactive_controller)
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids +
[inactive_controller_id])
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids +
[inactive_controller_id]), timeout_sec=5)
# Add controller to quorum
self.kafka.controller_quorum.add_controller(inactive_controller_id,
inactive_controller)
# Check describe quorum output shows both controllers are voters
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id, inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id,
inactive_controller_id],
+ broker_only_ids),
timeout_sec=5)
# Remove leader from quorum
- voters = json_from_line(r"CurrentVoters:.*", output)
+ voters = json_from_line(r"CurrentVoters:.*",
self.kafka.describe_quorum())
directory_id = next(voter["directoryId"] for voter in voters if
voter["id"] == active_controller_id)
self.kafka.controller_quorum.remove_controller(active_controller_id,
directory_id)
-
- # Check describe quorum output to show second_controller is now the
leader
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ # Describe quorum output shows the second controller is now leader,
old controller is an observer
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ inactive_controller_id,
+
[inactive_controller_id],
+ broker_only_ids +
[active_controller_id]), timeout_sec=30)
Review Comment:
timeout here can be reduced to 5, will change with the next review
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]