apoorvmittal10 commented on code in PR #20286:
URL: https://github.com/apache/kafka/pull/20286#discussion_r2248199447


##########
server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java:
##########
@@ -190,13 +224,22 @@ public InFlightState startStateTransition(RecordState 
newState, DeliveryCountOps
      * @param commit If true, commits the state transition, otherwise rolls 
back.
      */
     public void completeStateTransition(boolean commit) {
-        if (commit) {
+        if (commit || isTerminalState()) {
+            // Cancel the acquisition lock timeout task for the state since it 
is acknowledged/released successfully.
+            cancelAndClearAcquisitionLockTimeoutTask();
             rollbackState = null;
             return;
         }
-        state = rollbackState.state;
-        deliveryCount = rollbackState.deliveryCount;
-        memberId = rollbackState.memberId;
+        // Check is acquisition lock timeout task is expired then mark the 
message as Available.
+        if (acquisitionLockTimeoutTask != null && 
acquisitionLockTimeoutTask.hasExpired()) {
+            state = RecordState.AVAILABLE;

Review Comment:
   This is not required under the current implementation. Currently the 
acquisition lock timeout does not bump the delivery count, which means records 
should not be marked as archived when a lock timeout occurs. Hence this change 
is not required.
   
   Having said that, I think the behaviour today is incorrect: we bump the 
delivery count on acquisition, whereas we should do that on acquisition lock 
timeout and on release. Currently, say max delivery is 5 and the record is 
released every time: on the 5th attempt the record is acquired and sent back in 
the response, but its state is marked directly as archived, which is incorrect. 
The acknowledgement for the respective offset/batch will then fail, and if a 
timeout occurs it will result in "offset not found". I am planning to correct 
the whole behaviour in a separate PR.
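
   To make the scenario above concrete, here is a minimal toy model of the two 
counting policies. It is only a sketch of the reasoning in this comment, not the 
`InFlightState` code: the class `DeliveryCountSketch`, the `BumpPolicy` enum and 
the method names are all hypothetical, and the rule "archive as soon as the 
count reaches the maximum" is an assumption taken from the description above.

```java
// Toy model only. All names here are hypothetical and do not exist in Kafka.
final class DeliveryCountSketch {
    enum State { AVAILABLE, ACQUIRED, ARCHIVED }
    enum BumpPolicy { ON_ACQUIRE, ON_RELEASE }

    private final BumpPolicy policy;
    private final int maxDeliveryCount;
    private State state = State.AVAILABLE;
    private int deliveryCount = 0;

    DeliveryCountSketch(BumpPolicy policy, int maxDeliveryCount) {
        this.policy = policy;
        this.maxDeliveryCount = maxDeliveryCount;
    }

    /** Hands the record out to a consumer and returns the resulting state. */
    State acquire() {
        if (state != State.AVAILABLE) {
            throw new IllegalStateException("record is not available: " + state);
        }
        if (policy == BumpPolicy.ON_ACQUIRE) {
            deliveryCount++;
            // Assumed behaviour from the comment: hitting the limit while the
            // record is being handed out archives it immediately.
            state = deliveryCount >= maxDeliveryCount ? State.ARCHIVED : State.ACQUIRED;
        } else {
            state = State.ACQUIRED;
        }
        return state;
    }

    /**
     * Models an explicit release or an acquisition lock timeout.
     * Returns false when the record is no longer ACQUIRED, i.e. the
     * acknowledgement for it cannot be applied.
     */
    boolean release() {
        if (state != State.ACQUIRED) {
            return false;
        }
        if (policy == BumpPolicy.ON_RELEASE) {
            deliveryCount++;
            state = deliveryCount >= maxDeliveryCount ? State.ARCHIVED : State.AVAILABLE;
        } else {
            state = State.AVAILABLE;
        }
        return true;
    }

    /** Acquires and releases the record maxDeliveryCount times; reports the last release. */
    static boolean finalAckAccepted(BumpPolicy policy, int maxDeliveryCount) {
        DeliveryCountSketch record = new DeliveryCountSketch(policy, maxDeliveryCount);
        boolean accepted = false;
        for (int i = 0; i < maxDeliveryCount; i++) {
            record.acquire();
            accepted = record.release();
        }
        return accepted;
    }

    public static void main(String[] args) {
        for (BumpPolicy policy : BumpPolicy.values()) {
            System.out.println(policy + ": final ack accepted = " + finalAckAccepted(policy, 5));
        }
    }
}
```

   In this model, with max delivery 5, the fifth release is rejected under 
`ON_ACQUIRE` because the record was already archived when it was handed out, 
while under `ON_RELEASE` the fifth release is accepted and only then archives 
the record.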


