apoorvmittal10 commented on code in PR #20286:
URL: https://github.com/apache/kafka/pull/20286#discussion_r2248199447
##########
server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java:
##########
@@ -190,13 +224,22 @@ public InFlightState startStateTransition(RecordState
newState, DeliveryCountOps
* @param commit If true, commits the state transition, otherwise rolls
back.
*/
public void completeStateTransition(boolean commit) {
- if (commit) {
+ if (commit || isTerminalState()) {
+ // Cancel the acquisition lock timeout task for the state since it
is acknowledged/released successfully.
+ cancelAndClearAcquisitionLockTimeoutTask();
rollbackState = null;
return;
}
- state = rollbackState.state;
- deliveryCount = rollbackState.deliveryCount;
- memberId = rollbackState.memberId;
+ // Check is acquisition lock timeout task is expired then mark the
message as Available.
+ if (acquisitionLockTimeoutTask != null &&
acquisitionLockTimeoutTask.hasExpired()) {
+ state = RecordState.AVAILABLE;
Review Comment:
So this is not required as per the current implementation. Currently the
acquisition lock timeout do not bump the delivery count which means the records
should not be marked as archived when lock timeout occurs. Hence this change is
not required.
Having said that, I think we have an incorrect behaviour today i.e. we bump
the delivery count on acquisition rather we should do that on acquisition lock
timeout and release. Currently, say max delivery is 5 and the record is
released all times then 5th time record will be acquired to send back in
response but the state will be marked directly as archived. Which is incorrect.
Then acknowledgement for the respective offset/batch will fail, if timeout
occurs then it will result in "offset not found". I am plannign to correct the
whole behaviour in a separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]