lucasbru commented on code in PR #21495:
URL: https://github.com/apache/kafka/pull/21495#discussion_r2821297885
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java:
##########
@@ -462,6 +462,92 @@ public void
testRe2JPatternSubscriptionAndTopicSubscription() throws Interrupted
}
+ /**
+ * Verifies that with async reconciliation, the assignment does not change
+ * in the background without a call to poll(). Validating fix in
KAFKA-20106.
+ *
+ * When a new topic matching the subscribed pattern is created, the
reconciliation may
+ * complete in the background, but the assignment should only be updated
on the next poll().
+ * Without the fix, this test will fail because the assignment will change
without polling.
+ *
+ * Note: For this bug to manifest, two conditions must be met:
+ * 1. Auto-commit must be disabled: if auto-commit is enabled the full
reconciliation
+ * that updates assignment is only triggered from consumer.poll
+ * 2. No ConsumerRebalanceListener callbacks - if callbacks are provided
they run within
+ * a call to consumer.poll, so the assignment update requires a call to
consumer.poll
+ */
+ @ClusterTest
+ public void testAsyncConsumerAssignmentDoesNotChangeWithoutPoll() throws
InterruptedException {
Review Comment:
If this is difficult to test, I think it would also be okay to omit the
integration test for this one.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
* - There are topics that haven't been added to the current assignment
yet, but all their topic IDs
* are missing from the target assignment.
*
- * @param canCommit Controls whether reconciliation can proceed when
auto-commit is enabled.
- * Set to true only when the current offset positions are
safe to commit.
- * If false and auto-commit enabled, the reconciliation
will be skipped.
+ * @param invokedByPoll True if this reconciliation attempt is triggered
by the application thread on consumer.poll().
+ * False if this is triggered by the background
thread on regular manager poll.
+ * In both cases we want to resolve metadata to
unresolved assignments,
+ * but the actual reconciliation (commit, callbacks,
assignment updates)
+ * will only proceed if this is triggered from the
application thread on consumer.poll
*/
- public void maybeReconcile(boolean canCommit) {
+ public void maybeReconcile(boolean invokedByPoll) {
Review Comment:
Can we port the same change to StreamsMembershipManager?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -817,11 +817,13 @@ private void transitionToStale() {
* - There are topics that haven't been added to the current assignment
yet, but all their topic IDs
* are missing from the target assignment.
*
- * @param canCommit Controls whether reconciliation can proceed when
auto-commit is enabled.
- * Set to true only when the current offset positions are
safe to commit.
- * If false and auto-commit enabled, the reconciliation
will be skipped.
+ * @param invokedByPoll True if this reconciliation attempt is triggered
by the application thread on consumer.poll().
+ * False if this is triggered by the background
thread on regular manager poll.
+ * In both cases we want to resolve metadata to
unresolved assignments,
+ * but the actual reconciliation (commit, callbacks,
assignment updates)
+ * will only proceed if this is triggered from the
application thread on consumer.poll
*/
- public void maybeReconcile(boolean canCommit) {
+ public void maybeReconcile(boolean invokedByPoll) {
Review Comment:
If we apply this fix in KIP-1071 as well, I think we need to make sure that
`streamsGroupMembershipRequestManager.maybeReconcile(true)` is called in the
handling of `AsyncPollEvent`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]