Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066577956 Hey @lucasbru, answering your questions: the new static membership behaviour for a member that joins with a duplicate group instance ID is documented in [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345)) section of the KIP. We've also discussed it with @dajac and other client teams (librd), in light of the improvement the new protocol brings in this area (mainly for conflicting members, which with the classic protocol continuously kick each other out). Your question does make me notice that, even though the KIP describes how conflicting static members behave in the new protocol, it would probably be helpful to extend that explanation to point out the fundamental difference from the legacy protocol and why it is an improved approach. (I can't find that explained in the KIP. If I'm not missing it, it would probably be a good update to the KIP [static membership section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-StaticMembership(KIP-345)) @dajac ?) Thanks @lucasbru! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
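To make the contrast between the two protocols concrete, here is a minimal toy model in Python, the language of the system tests. It is only a sketch under stated assumptions: `ToyGroup`, `conflict_scenario`, and the `"classic"`/`"consumer"` labels are hypothetical names, and the logic simply encodes what the test change in this PR asserts (classic protocol: a newcomer with a duplicate `group.instance.id` takes over and the earlier holder is fenced; consumer protocol: the newcomer cannot join while the existing member is active). It is not the group coordinator's implementation and ignores assignments, epochs, and error codes.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyGroup:
    """Hypothetical toy model of one group; protocol is "classic" or "consumer"."""
    protocol: str
    owners: Dict[str, str] = field(default_factory=dict)  # instance id -> member holding it
    fenced: List[str] = field(default_factory=list)       # members kicked out by a newcomer

    def join(self, instance_id: str, member: str) -> bool:
        """Return True if `member` holds `instance_id` after the join attempt."""
        holder = self.owners.get(instance_id)
        if holder is None or holder == member:
            self.owners[instance_id] = member
            return True
        if self.protocol == "classic":
            # Classic (as asserted by the test): the newcomer takes over the id
            # and the previous holder is fenced.
            self.fenced.append(holder)
            self.owners[instance_id] = member
            return True
        # Consumer protocol (as asserted by the test): the newcomer is rejected
        # while the id is still held by an active member.
        return False

    def leave(self, instance_id: str) -> None:
        """The current holder shuts down and releases its instance id."""
        self.owners.pop(instance_id, None)


def conflict_scenario(protocol: str) -> ToyGroup:
    group = ToyGroup(protocol)
    group.join("instance-0", "original")
    group.join("instance-0", "conflicting")  # duplicate group.instance.id
    return group


classic = conflict_scenario("classic")
print(classic.owners, classic.fenced)  # {'instance-0': 'conflicting'} ['original']

new_proto = conflict_scenario("consumer")
print(new_proto.owners, new_proto.fenced)  # {'instance-0': 'original'} []
new_proto.leave("instance-0")  # stop the original member first...
print(new_proto.join("instance-0", "conflicting"))  # True: ...then the conflicting one can join
```

Under the classic rules, if the fenced process is restarted and rejoins, it takes the ID back and fences the other member in turn, which is the back-and-forth mentioned above; under the new rules the original member keeps running and nothing changes until it leaves.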
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2066195894 LGTM, thanks! Merging this. However:
* Have we discussed the behavioral difference with the broker team / David?
* Have we documented the behavioral difference anywhere?
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru merged PR #15738: URL: https://github.com/apache/kafka/pull/15738
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r1570773742
## tests/kafkatest/tests/client/consumer_test.py: ##
@@ -348,26 +348,45 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
             consumer.start()
             self.await_members(consumer, len(consumer.nodes))
+            num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
-            self.await_members(conflict_consumer, num_conflict_consumers)
-            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+            if group_protocol == consumer_group.classic_group_protocol:
+                # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
+                self.await_members(conflict_consumer, num_conflict_consumers)
+                self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
-            wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
+                wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
                        timeout_sec=10,
                        err_msg="Timed out waiting for the fenced consumers to stop")
+            else:
+                # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
+                self.await_consumed_messages(consumer)
+                assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
+                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                assert len(conflict_consumer.joined_nodes()) == 0
+
+                # Stop existing nodes, so conflicting ones should be able to join.
+                consumer.stop_all()
+                wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
+                           timeout_sec=self.session_timeout_sec+5,
+                           err_msg="Timed out waiting for the consumer to shutdown")
+                conflict_consumer.start()
+                self.await_members(conflict_consumer, num_conflict_consumers)
+
+
         else:
             consumer.start()
             conflict_consumer.start()
             wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
-                       timeout_sec=self.session_timeout_sec,
-                       err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from"
+                       timeout_sec=self.session_timeout_sec*2,
Review Comment: I added this to help a bit with the flaky behaviour, and it also makes this wait consistent with how we wait for members in all the other tests that rely on [await_members](https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/tests/kafkatest/tests/verifiable_consumer_test.py#L84).
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2063881962 Hey @lucasbru, yes, I had closed it only to investigate some failures I noticed, but they turned out to be flaky behaviour unrelated to the changes in this PR, so I'm reopening it now for review. This PR fixes the test for a known change in static membership behaviour between the two protocols (the test was failing consistently, as expected). I still see some flakiness in the test, but not in the path this PR fixes, and it happens with both the legacy and the new protocol, so I'd track that separately; this fix is already a good improvement to the current state of the test.
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lucasbru commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060943337 @lianetm did you mean to close the PR?
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm closed pull request #15738: KAFKA-16566: Fix consumer static membership system test with new protocol URL: https://github.com/apache/kafka/pull/15738
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060248860 FYI, I've been getting successful runs with this change:
running: `TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" bash tests/docker/run_tests.sh`
results:
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id: 2024-04-16--016
> run time: 27 minutes 15.117 seconds
> tests run: 16
> passed: 16
> flaky: 0
> failed: 0
> ignored: 0

However, after several more runs I still got occasional failures, so I'm afraid something else is going on (maybe sessions remaining open between test executions that happen too close to each other?). If that's the case, I'll probably add another change here. I'll dig into it and post an update here.
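Since the remaining failures only show up intermittently, one way to gauge how often they occur is to rerun the same command several times and tally the outcomes. A minimal sketch, assuming (unverified) that `tests/docker/run_tests.sh` exits with a non-zero status when a test fails; `run_repeatedly` is a hypothetical helper, not part of the Kafka test tooling.

```python
import os
import subprocess
from typing import Dict, Sequence


def run_repeatedly(cmd: Sequence[str], runs: int, extra_env: Dict[str, str]) -> Dict[str, int]:
    """Run `cmd` `runs` times in sequence and tally exit statuses.

    Assumption: exit status 0 means every selected test passed;
    any other status is counted as a failed run.
    """
    env = {**os.environ, **extra_env}  # e.g. TC_PATHS selects which test to run
    tally = {"passed": 0, "failed": 0}
    for _ in range(runs):
        result = subprocess.run(list(cmd), env=env, check=False)
        tally["passed" if result.returncode == 0 else "failed"] += 1
    return tally
```

For example, `run_repeatedly(["bash", "tests/docker/run_tests.sh"], runs=10, extra_env={"TC_PATHS": "tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer"})` would rerun the command from this comment ten times. Because the runs execute back to back, this also reproduces the closely spaced executions suspected above.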
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r1568061403
## tests/kafkatest/tests/client/consumer_test.py: ##
@@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
             consumer.start()
             self.await_members(consumer, len(consumer.nodes))
+            num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
-            self.await_members(conflict_consumer, num_conflict_consumers)
-            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+            if group_protocol == consumer_group.classic_group_protocol:
+                # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
+                self.await_members(conflict_consumer, num_conflict_consumers)
+                self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
-            wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
+                wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
                        timeout_sec=10,
                        err_msg="Timed out waiting for the fenced consumers to stop")
+            else:
+                # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
+                self.await_consumed_messages(consumer)
+                assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
+                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                assert len(conflict_consumer.joined_nodes()) == 0
Review Comment: As I see it, there should be no timing issues. For `consumer.joined_nodes()`, there is an earlier `self.await_members` call that waits as long as needed for all the nodes to join. As for `conflict_consumer.joined_nodes()`, it covers nodes that never joined: we're only asserting that, once the non-conflicting members have kept consuming (which shows they are active) without a rebalance, the conflicting ones have not joined. Makes sense?
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
kirktrue commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r156796
## tests/kafkatest/tests/client/consumer_test.py: ##
@@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
             consumer.start()
             self.await_members(consumer, len(consumer.nodes))
+            num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
-            self.await_members(conflict_consumer, num_conflict_consumers)
-            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+            if group_protocol == consumer_group.classic_group_protocol:
+                # Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
+                self.await_members(conflict_consumer, num_conflict_consumers)
+                self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
-            wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
+                wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
                        timeout_sec=10,
                        err_msg="Timed out waiting for the fenced consumers to stop")
+            else:
+                # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
+                self.await_consumed_messages(consumer)
+                assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
+                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                assert len(conflict_consumer.joined_nodes()) == 0
Review Comment: Do we anticipate any timing issues here? That is, are `num_rebalances()` and `joined_nodes()` "guaranteed" to return the correct values immediately after `await_consumed_messages()` returns? Or should we wrap those assertions in `wait_until()` calls to give them a few seconds to converge on the correct values?
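On the question of wrapping assertions: ducktape's `wait_until` helper polls a condition until it holds or a timeout expires. The sketch below uses a small local stand-in with a similar shape so that it runs without ducktape (it raises the built-in `TimeoutError`); `FakeConsumerService` is a hypothetical object whose joined-node count settles after a couple of polls, standing in for the test's consumer service.

```python
import time
from typing import Callable, List


def wait_until(condition: Callable[[], bool], timeout_sec: float,
               backoff_sec: float = 0.1, err_msg: str = "") -> None:
    """Local stand-in for a ducktape-style wait_until: poll until true or time out."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)


class FakeConsumerService:
    """Hypothetical consumer service whose joined count settles after a few reads."""

    def __init__(self, nodes: int, polls_until_settled: int):
        self.nodes: List[int] = list(range(nodes))
        self._polls_left = polls_until_settled

    def joined_nodes(self) -> List[int]:
        if self._polls_left > 0:
            self._polls_left -= 1
            return self.nodes[:-1]  # one node still reported as not joined
        return list(self.nodes)


consumer = FakeConsumerService(nodes=3, polls_until_settled=2)
# A bare `assert len(consumer.joined_nodes()) == len(consumer.nodes)` would fail on the
# first read here; wrapping it in wait_until tolerates a short settling period.
wait_until(lambda: len(consumer.joined_nodes()) == len(consumer.nodes),
           timeout_sec=5, backoff_sec=0.01,
           err_msg="joined node count did not converge")
print("converged")
```

Note that `wait_until` only helps for conditions expected to become true eventually, such as a joined-node count converging. It does not strengthen a "nothing changed" check like the unchanged rebalance count, where a single check after the consuming phase, as in the PR, fits better.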
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2059888686 Hey @lucasbru, could you take a look at this test fix? Thanks!