jsancio commented on code in PR #17146:
URL: https://github.com/apache/kafka/pull/17146#discussion_r1757400655
##########
tests/kafkatest/tests/core/quorum_reconfiguration_test.py:
##########
@@ -46,54 +50,67 @@ def setUp(self):
self.num_producers = 1
self.num_consumers = 1
- def perform_reconfig(self, active_controller_id, inactive_controller_id,
inactive_controller, broker_ids):
+ def perform_reconfig(self,
+ active_controller_id: int,
+ inactive_controller_id: int,
+ inactive_controller: ClusterNode,
+ broker_only_ids: List[int]):
+ """
+ Tests quorum reconfiguration by adding a second controller and then
removing the active controller.
+
+ :param active_controller_id: id of the active controller
+ :param inactive_controller_id: id of the inactive controller
+ :param inactive_controller: node object of the inactive controller
+ :param broker_only_ids: broker ids of nodes which have no controller
process
+ """
# Check describe quorum output shows the controller (first node) is
the leader and the only voter
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids),
timeout_sec=5)
# Start second controller
self.kafka.controller_quorum.add_broker(inactive_controller)
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids +
[inactive_controller_id])
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids +
[inactive_controller_id]), timeout_sec=5)
# Add controller to quorum
self.kafka.controller_quorum.add_controller(inactive_controller_id,
inactive_controller)
# Check describe quorum output shows both controllers are voters
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id, inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id,
inactive_controller_id],
+ broker_only_ids),
timeout_sec=5)
# Remove leader from quorum
- voters = json_from_line(r"CurrentVoters:.*", output)
+ voters = json_from_line(r"CurrentVoters:.*",
self.kafka.describe_quorum())
directory_id = next(voter["directoryId"] for voter in voters if
voter["id"] == active_controller_id)
self.kafka.controller_quorum.remove_controller(active_controller_id,
directory_id)
-
- # Check describe quorum output to show second_controller is now the
leader
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output,
inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ # Check describe quorum output to show second_controller is now the
leader. Observers contain the old controller
+ wait_until(lambda:
check_describe_quorum_output(self.kafka.describe_quorum(),
+ inactive_controller_id,
+
[inactive_controller_id],
+ broker_only_ids),
timeout_sec=5)
Review Comment:
The reason why the leader is not in the observer is related to my other
comment. `check_describe_quorum_output` always returns true.
##########
tests/kafkatest/tests/core/quorum_reconfiguration_test.py:
##########
@@ -147,17 +170,31 @@ def test_isolated_mode_reconfig(self, metadata_quorum):
inactive_controller,
[self.kafka.idx(node) for node in self.kafka.nodes]))
-def assert_nodes_in_output(pattern, output, *node_ids):
+def assert_nodes_in_output(pattern: str, output: str, *node_ids: int):
nodes = json_from_line(pattern, output)
assert len(nodes) == len(node_ids)
for node in nodes:
assert node["id"] in node_ids
-def json_from_line(pattern, output):
+def check_describe_quorum_output(output: str, leader_id: int, voter_ids:
List[int], observer_ids: List[int]):
+ """
+ Check that the describe quorum output contains the expected leader,
voters, and observers
+ :param output: Describe quorum output
+ :param leader_id: Expected leader id
+ :param voter_ids: Expected voter ids
+ :param observer_ids: Expected observer ids
+ :return:
+ """
+ assert re.search(r"LeaderId:\s*" + str(leader_id), output)
+ assert_nodes_in_output(r"CurrentVoters:.*", output, *voter_ids)
+ assert_nodes_in_output(r"CurrentObservers:.*", output, *observer_ids)
Review Comment:
Do you want to change `assert_nodes_in_output` to `check_nodes_in_output`
which instead of `assert` returns a boolean. With that you can can change the
`return` statement to:
```python
return check_nodes_in_output(r"CurrentVoters:.*", output, *voter_ids) &&
check_nodes_in_output(r"CurrentObservers:.*", output,
*observer_ids)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]