lianetm commented on code in PR #15579:
URL: https://github.com/apache/kafka/pull/15579#discussion_r1536040639


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
-                        " after revocation. " + reason));
+                        " after revocation. " + interruptedReconciliationReason()));
                     return res;
                 }
             });
 
         reconciliationResult.whenComplete((result, error) -> {
-            markReconciliationCompleted();
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. The member
                 // won't send the ack, and the expectation is that the broker will kick the
                 // member out of the group after the rebalance timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
             } else {
-                if (state == MemberState.RECONCILING) {
+                if (!maybeAbortReconciliation()) {
                     currentAssignment = resolvedAssignment;
 
                     // Reschedule the auto commit starting from now that the member has a new assignment.
                     commitRequestManager.resetAutoCommitTimer();
 
                     // Make assignment effective on the broker by transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
-                } else {
-                    String reason = interruptedReconciliationErrorMessage();
-                    log.error("Interrupting reconciliation after partitions assigned callback " +
-                        "completed. " + reason);
                 }
             }
+            markReconciliationCompleted();
         });
     }
 
+    /**
+     * @return True if the reconciliation in progress should not continue. This could be because
+     * the member is not in RECONCILING state anymore (member failed or is leaving the group), or
+     * if it has rejoined the group (note that after rejoining the member could be RECONCILING
+     * again, so checking the state is not enough)
+     */
+    boolean maybeAbortReconciliation() {
+        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+        if (shouldAbort) {
+            String reason = interruptedReconciliationReason();
+            log.error("Interrupting reconciliation because " + reason);
+            markReconciliationCompleted();

Review Comment:
   We can't end up with "a new reconciliation triggered while another one is executing callbacks". If a reconciliation goes into the user space to run callbacks, the background thread still has the flag `reconciliationInProgress` set to true, so even if the member gets a new assignment from the broker, all it does is update the `currentTargetAssignment`. On every poll, the attempt to trigger a new reconciliation for that newly received target is ignored ([here](https://github.com/apache/kafka/blob/2e8d69b78ca52196decd851c8520798aa856c073/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L902)) until the ongoing reconciliation completes. Makes sense?
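A minimal sketch of the guard described above, under simplifying assumptions: the class `ReconciliationGuardSketch`, its methods, and the string partition names are hypothetical and not Kafka's `MembershipManagerImpl` API, and "callbacks completed" is simulated by an explicit call. It only illustrates the idea that a new target merely overwrites the stored target while a reconciliation is in progress, and the next poll after completion reconciles the latest target.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the "one reconciliation at a time" guard.
// Not Kafka code: names and types are assumptions made for illustration.
class ReconciliationGuardSketch {
    private boolean reconciliationInProgress = false;
    private Set<String> currentTargetAssignment = Collections.emptySet();
    private Set<String> currentAssignment = Collections.emptySet();
    private Set<String> assignmentBeingReconciled = Collections.emptySet();
    private int reconciliationsStarted = 0;

    // A new target from the broker only replaces the stored target;
    // it never starts a reconciliation by itself.
    void onTargetReceived(Set<String> target) {
        currentTargetAssignment = new HashSet<>(target);
    }

    // Called on every poll. Returns true only if a new reconciliation started.
    boolean maybeReconcile() {
        if (reconciliationInProgress) {
            // Callbacks of the ongoing reconciliation are still running: ignore.
            return false;
        }
        if (currentTargetAssignment.equals(currentAssignment)) {
            return false; // already reconciled, nothing to do
        }
        reconciliationInProgress = true;
        assignmentBeingReconciled = currentTargetAssignment;
        reconciliationsStarted++;
        return true;
    }

    // Simulates the user callbacks finishing for the reconciliation in progress.
    void completeReconciliation() {
        currentAssignment = assignmentBeingReconciled;
        reconciliationInProgress = false;
    }

    Set<String> currentAssignment() { return currentAssignment; }

    int reconciliationsStarted() { return reconciliationsStarted; }

    public static void main(String[] args) {
        ReconciliationGuardSketch m = new ReconciliationGuardSketch();
        m.onTargetReceived(Set.of("t-0"));
        System.out.println(m.maybeReconcile());    // true: first reconciliation starts
        m.onTargetReceived(Set.of("t-0", "t-1")); // new target arrives mid-reconciliation
        System.out.println(m.maybeReconcile());    // false: ignored while in progress
        m.completeReconciliation();                // callbacks done, t-0 applied
        System.out.println(m.maybeReconcile());    // true: latest target picked up now
        m.completeReconciliation();
        System.out.println(m.currentAssignment()); // both partitions (set order unspecified)
    }
}
```

Keeping only the latest target (rather than queueing every intermediate assignment) means intermediate targets that were superseded while callbacks ran are simply never reconciled.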

   [This test](https://github.com/apache/kafka/blob/2e8d69b78ca52196decd851c8520798aa856c073/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java#L664) shows this path (using commit, though): only one reconciliation is triggered at a time, the target is kept updated as it's received/discovered, and it is reconciled once the initial reconciliation completes.
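The Javadoc of `maybeAbortReconciliation` in the diff notes that checking the state alone is not enough, because a member that rejoins can be RECONCILING again. A minimal sketch of that case, assuming a heavily simplified model: `AbortCheckSketch`, `fenceAndRejoin`, and `onCallbacksCompleted` are hypothetical, the fence/rejoin sequence is collapsed into one step, and it is an assumption that `markReconciliationCompleted` clears both flags. Only the abort condition mirrors the diff.

```java
// Hypothetical, simplified model of the abort check; only the condition in
// maybeAbortReconciliation mirrors the diff, everything else is assumed.
class AbortCheckSketch {
    enum MemberState { RECONCILING, ACKNOWLEDGING, FENCED }

    MemberState state = MemberState.RECONCILING;
    boolean reconciliationInProgress = false;
    boolean rejoinedWhileReconciliationInProgress = false;

    void startReconciliation() {
        state = MemberState.RECONCILING;
        reconciliationInProgress = true;
    }

    // Member is fenced while callbacks run, rejoins, and gets a new assignment,
    // so it is RECONCILING again (FENCED -> ... -> RECONCILING collapsed here).
    void fenceAndRejoin() {
        state = MemberState.FENCED;
        if (reconciliationInProgress) {
            rejoinedWhileReconciliationInProgress = true;
        }
        state = MemberState.RECONCILING;
    }

    // Assumption: clears both flags and is safe to call more than once.
    void markReconciliationCompleted() {
        reconciliationInProgress = false;
        rejoinedWhileReconciliationInProgress = false;
    }

    // Same condition as the diff: a state check alone misses the rejoin case.
    boolean maybeAbortReconciliation() {
        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
        if (shouldAbort) {
            markReconciliationCompleted();
        }
        return shouldAbort;
    }

    // Called when the user callbacks complete successfully.
    void onCallbacksCompleted() {
        if (!maybeAbortReconciliation()) {
            state = MemberState.ACKNOWLEDGING; // stands in for "apply assignment and ack"
        }
        markReconciliationCompleted();
    }

    public static void main(String[] args) {
        AbortCheckSketch m = new AbortCheckSketch();
        m.startReconciliation();
        m.fenceAndRejoin();
        // A state-only check would let the stale reconciliation continue here.
        System.out.println(m.state == MemberState.RECONCILING); // true
        m.onCallbacksCompleted();
        System.out.println(m.state);                    // RECONCILING: stale result discarded
        System.out.println(m.reconciliationInProgress); // false: a new reconciliation can start
    }
}
```

Because calling `markReconciliationCompleted` on the abort path only clears the flag of the single reconciliation that can be in progress, it cannot interfere with another reconciliation, which is the point made in the comment above.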



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
