lianetm commented on code in PR #18737: URL: https://github.com/apache/kafka/pull/18737#discussion_r1952918176
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ########## @@ -818,6 +826,8 @@ void maybeReconcile() { return; } + if (autoCommitEnabled && !canCommit) return; Review Comment: I think we can improve this a bit more: if there are no partitions to revoke, we could carry on with this reconciliation really, meaning no delay reconciling newly added partitions (reconciled from the background poll as before, no need to wait for the app poll). So, I expect we just need to move this check (along with the markReconciliationInProg) to right before the log.info("Reconciling assignment with local epoch...")? and then we could check: ``` if (autoCommitEnabled && !revokedPartitions.isEmpty() && !canCommit) return; ``` would that work? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1301,7 +1299,7 @@ public void testUnresolvedTargetAssignmentIsReconciledWhenMetadataReceived() { when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -881,7 +879,7 @@ public void testDelayedMetadataUsedToCompleteAssignment() { ); when(metadata.topicNames()).thenReturn(fullTopicMetadata); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -819,8 +818,7 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithNewAssign assertEquals(MemberState.RECONCILING, membershipManager.state()); clearInvocations(membershipManager, commitRequestManager); - // Next poll should trigger final reconciliation - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: same here, if this test has no revocations (seems so?) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1249,7 +1247,7 @@ public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() { verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -2226,7 +2223,7 @@ public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() { receiveEmptyAssignment(membershipManager); verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -2655,7 +2652,7 @@ private ConsumerMembershipManager createMemberInStableState(String groupInstance when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty()); membershipManager.onHeartbeatSuccess(heartbeatResponse); assertEquals(MemberState.RECONCILING, membershipManager.state()); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1403,7 +1401,7 @@ public void testReconciliationSkippedWhenSameAssignmentReceived() { receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto? (not sure) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -663,7 +663,7 @@ public void testSameAssignmentReconciledAgainWithMissingTopic() { // stay in RECONCILING state, since an unresolved topic is assigned membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1, membershipManager.memberId())); assertEquals(MemberState.RECONCILING, membershipManager.state()); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: helpful to see this passing with `maybeReconcile(false)` here, since this test doesn't seem to have any revocations right? (would be good test coverage to ensure we still reconcile in the same way, called from background poll, when it's only about adding partitions) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1594,7 +1591,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: I expect this one should be reconciled asap (param false), and the next one below, ln 1618, would need to wait to commit (param true) ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1363,7 +1361,7 @@ public void testReconcileNewPartitionsAssignedWhenNoPartitionOwned() { receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager); verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1207,7 +1205,7 @@ public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() { receiveAssignment(topicId, Collections.singletonList(0), membershipManager); verifyReconciliationNotTriggered(membershipManager); - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java: ########## @@ -1546,8 +1544,7 @@ public void testMetadataUpdatesReconcilesUnresolvedAssignments() { String topicName = "topic1"; mockTopicNameInMetadataCache(Collections.singletonMap(topicId, topicName), true); - // When the next poll is run, the member should re-trigger reconciliation - membershipManager.poll(time.milliseconds()); + membershipManager.maybeReconcile(true); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org