lucasbru commented on code in PR #15579:
URL: https://github.com/apache/kafka/pull/15579#discussion_r1535558299


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
-                        " after revocation. " + reason));
+                        " after revocation. " + interruptedReconciliationReason()));
                     return res;
                 }
             });
 
         reconciliationResult.whenComplete((result, error) -> {
-            markReconciliationCompleted();
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. The member
                 // won't send the ack, and the expectation is that the broker will kick the
                 // member out of the group after the rebalance timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
             } else {
-                if (state == MemberState.RECONCILING) {
+                if (!maybeAbortReconciliation()) {
                     currentAssignment = resolvedAssignment;
 
                     // Reschedule the auto commit starting from now that the member has a new assignment.
                     commitRequestManager.resetAutoCommitTimer();
 
                     // Make assignment effective on the broker by transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
-                } else {
-                    String reason = interruptedReconciliationErrorMessage();
-                    log.error("Interrupting reconciliation after partitions assigned callback " +
-                        "completed. " + reason);
                 }
             }
+            markReconciliationCompleted();
         });
     }
 
+    /**
+     * @return True if the reconciliation in progress should not continue. This could be because
+     * the member is not in RECONCILING state anymore (member failed or is leaving the group), or
+     * if it has rejoined the group (note that after rejoining the member could be RECONCILING
+     * again, so checking the state is not enough)
+     */
+    boolean maybeAbortReconciliation() {
+        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+        if (shouldAbort) {
+            String reason = interruptedReconciliationReason();
+            log.error("Interrupting reconciliation because " + reason);

Review Comment:
   I'd say this is at most a `warn` log, possibly `info`, right? Entering this 
code path is completely fine and expected, and the consumer should handle it 
correctly.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +

Review Comment:
   Do we want to actually fail the future here? We will repeat the exception 
with `Reconciliation failed` at ERROR level 5 lines below. It would be good to 
separate the "error" path from the "aborted, but that's fine" path.
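One way to read this suggestion, as a minimal sketch rather than the actual change made in the PR: complete the composed future normally with a marker value when the reconciliation is interrupted, and reserve exceptional completion for real failures. The `Outcome` enum, the `reconcile` and `describe` helpers, and the `shouldAbort` parameter are all hypothetical names for illustration; only `CompletableFuture` is real API.

```java
import java.util.concurrent.CompletableFuture;

class ReconciliationOutcomeSketch {

    /** Hypothetical outcome type; it does not exist in the Kafka code base. */
    enum Outcome { ASSIGNED, ABORTED }

    /** Stand-in for the thenCompose step in revokeAndAssign. */
    static CompletableFuture<Outcome> reconcile(CompletableFuture<Void> revocationResult,
                                                boolean shouldAbort) {
        return revocationResult.thenCompose(ignored -> {
            if (shouldAbort) {
                // Expected interruption: complete normally with a marker
                // instead of calling completeExceptionally(...).
                return CompletableFuture.completedFuture(Outcome.ABORTED);
            }
            // Stand-in for assignPartitions(...), assumed to return a future.
            return CompletableFuture.completedFuture(Outcome.ASSIGNED);
        });
    }

    /** Returns the log line each path would produce, so the paths are easy to compare. */
    static String describe(CompletableFuture<Outcome> reconciliationResult) {
        StringBuilder line = new StringBuilder();
        // The futures used here are already complete, so the callback runs synchronously.
        reconciliationResult.whenComplete((outcome, error) -> {
            if (error != null) {
                line.append("ERROR Reconciliation failed.");
            } else if (outcome == Outcome.ABORTED) {
                line.append("INFO Reconciliation interrupted.");
            } else {
                line.append("DEBUG Reconciliation completed.");
            }
        });
        return line.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> revoked = CompletableFuture.completedFuture(null);
        System.out.println(describe(reconcile(revoked, true)));
        System.out.println(describe(reconcile(revoked, false)));

        CompletableFuture<Void> callbackFailed = new CompletableFuture<>();
        callbackFailed.completeExceptionally(new IllegalStateException("callback threw"));
        System.out.println(describe(reconcile(callbackFailed, false)));
    }
}
```

With this shape, only a failed revocation or assignment callback reaches the `error != null` branch, so the `Reconciliation failed` ERROR log is no longer emitted for an expected interruption.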



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -228,11 +228,10 @@ public class MembershipManagerImpl implements MembershipManager {
     private boolean reconciliationInProgress;
 
     /**
-     * Epoch the member had when the reconciliation in progress started. This is used to identify if
-     * the member has rejoined while it was reconciling an assignment (in which case the result
-     * of the reconciliation is not applied.)
+     * True if a reconciliation is in progress and the member rejoins the group. Used to know

Review Comment:
   ```suggestion
        * True if a reconciliation is in progress and the member rejoined the group since the start of the reconciliation. Used to know
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1070,29 +1081,27 @@ private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition
     /**
      * @return Reason for interrupting a reconciliation progress when callbacks complete.
      */
-    private String interruptedReconciliationErrorMessage() {
-        String reason;
-        if (state != MemberState.RECONCILING) {
-            reason = "The member already transitioned out of the reconciling state into " + state;
-        } else {
-            reason = "The member has re-joined the group.";
+    private String interruptedReconciliationReason() {

Review Comment:
   nit: if we get rid of the exception (see comment above), we may want to just 
inline this function.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
-                        " after revocation. " + reason));
+                        " after revocation. " + interruptedReconciliationReason()));
                     return res;
                 }
             });
 
         reconciliationResult.whenComplete((result, error) -> {
-            markReconciliationCompleted();
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. The member
                 // won't send the ack, and the expectation is that the broker will kick the
                 // member out of the group after the rebalance timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
             } else {
-                if (state == MemberState.RECONCILING) {
+                if (!maybeAbortReconciliation()) {
                     currentAssignment = resolvedAssignment;
 
                     // Reschedule the auto commit starting from now that the member has a new assignment.
                     commitRequestManager.resetAutoCommitTimer();
 
                     // Make assignment effective on the broker by transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
-                } else {
-                    String reason = interruptedReconciliationErrorMessage();
-                    log.error("Interrupting reconciliation after partitions assigned callback " +
-                        "completed. " + reason);
                 }
             }
+            markReconciliationCompleted();
         });
     }
 
+    /**
+     * @return True if the reconciliation in progress should not continue. This could be because
+     * the member is not in RECONCILING state anymore (member failed or is leaving the group), or
+     * if it has rejoined the group (note that after rejoining the member could be RECONCILING
+     * again, so checking the state is not enough)
+     */
+    boolean maybeAbortReconciliation() {
+        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+        if (shouldAbort) {
+            String reason = interruptedReconciliationReason();
+            log.error("Interrupting reconciliation because " + reason);
+            markReconciliationCompleted();

Review Comment:
   In the code path where reconciliation is aborted after assignment is 
completed, `markReconciliationCompleted` will be called twice. Can we avoid it?
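A possible way to avoid the double call, sketched under assumptions and not taken from the PR: keep the abort check free of side effects and leave the single `markReconciliationCompleted()` call in `whenComplete`, which runs on success, interruption, and failure alike. The class below is a simplified stand-in for `MembershipManagerImpl`; `shouldAbortReconciliation`, `onAssignmentDone`, and the `completionCalls` counter are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class SingleCompletionSketch {

    enum MemberState { RECONCILING, ACKNOWLEDGING, FENCED }

    MemberState state = MemberState.RECONCILING;
    boolean rejoinedWhileReconciliationInProgress = false;
    boolean reconciliationInProgress = true;
    int completionCalls = 0; // instrumentation for this sketch only

    /** Hypothetical pure check: it answers the question and changes nothing. */
    boolean shouldAbortReconciliation() {
        return state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
    }

    /** Simplified stand-in for the real method; the real body may reset more state. */
    void markReconciliationCompleted() {
        reconciliationInProgress = false;
        completionCalls++;
    }

    /** Stand-in for the whenComplete step at the end of revokeAndAssign. */
    void onAssignmentDone(CompletableFuture<Void> reconciliationResult) {
        reconciliationResult.whenComplete((result, error) -> {
            if (error == null && !shouldAbortReconciliation()) {
                // Stand-in for applying the assignment and transitioning to ACKNOWLEDGING.
                state = MemberState.ACKNOWLEDGING;
            }
            // The only call site, reached on success, interruption, and failure.
            markReconciliationCompleted();
        });
    }

    public static void main(String[] args) {
        SingleCompletionSketch member = new SingleCompletionSketch();
        member.state = MemberState.FENCED; // member left RECONCILING during the callbacks
        member.onAssignmentDone(CompletableFuture.completedFuture(null));
        System.out.println(member.state + ", completion calls: " + member.completionCalls);
    }
}
```

The trade-off is that the interruption log line would then have to live at the call sites, or in a small wrapper around the check, rather than inside the check itself.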



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

