apoorvmittal10 commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2194717592


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;
+        // The lock prevents concurrent state transitions possibly during acknowledgements etc.
+        private final ReadWriteLock stateTransitionLock = new ReentrantReadWriteLock();

Review Comment:
   I don't think a `ReadWriteLock` is required here; we can just use `synchronized`, since the state of the same offset/batch should not be accessed concurrently on multiple read paths.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3116,46 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
-        private void archive(String newMemberId) {
-            state = RecordState.ARCHIVED;
-            memberId = newMemberId;
+        // Visible for testing.
+        void archive(String newMemberId) {
+            stateTransitionLock.writeLock().lock();
+            try {
+                if (rollbackState != null) {
+                    isMarkedArchived = true;
+                }
+                state = RecordState.ARCHIVED;
+                memberId = newMemberId;
+            } finally {
+                stateTransitionLock.writeLock().unlock();
+            }
         }
-        private InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        // Visible for testing
+        InFlightState startStateTransition(RecordState newState, DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            stateTransitionLock.writeLock().lock();
+            try {
+                rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
+                return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);

Review Comment:
   Should it happen as below instead?
   ```suggestion
                   InFlightState currentState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask);
                   InFlightState updatedState = tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
                   if (updatedState != null) {
                       rollbackState = currentState;
                   }
                   return updatedState;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;
+        // The lock prevents concurrent state transitions possibly during acknowledgements etc.
+        private final ReadWriteLock stateTransitionLock = new ReentrantReadWriteLock();

Review Comment:
   You should synchronize the `state()` method as well.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3091,6 +3102,8 @@ private InFlightState tryUpdateState(RecordState newState, DeliveryCountOps ops,
                 log.error("Failed to update state of the records", e);
                 rollbackState = null;

Review Comment:
   Should we remove this here now?
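
To make the suggestions above concrete, here is a minimal sketch of how a `synchronized`-based variant might look. It is not the code in this PR: `InFlightStateSketch`, its reduced field set, the two-argument `startStateTransition`, the `hasOngoingStateTransition()` helper, the placeholder validation in `tryUpdateState`, and the rollback handling in `completeStateTransition` are all assumptions made only for illustration.

```java
// Hypothetical, simplified stand-in for SharePartition.InFlightState (illustration only).
final class InFlightStateSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private RecordState state;
    private int deliveryCount;
    private String memberId;
    // Snapshot to restore if the in-progress transition is rolled back.
    private InFlightStateSketch rollbackState;
    // Set when archive() runs during an in-progress transition, so a later rollback cannot undo it.
    private boolean isMarkedArchived = false;

    InFlightStateSketch(RecordState state, int deliveryCount, String memberId) {
        this.state = state;
        this.deliveryCount = deliveryCount;
        this.memberId = memberId;
    }

    // Reads take the same monitor as writes, so they never observe a half-applied transition.
    synchronized RecordState state() {
        return state;
    }

    // Hypothetical helper, added here only so the sketch can be inspected.
    synchronized boolean hasOngoingStateTransition() {
        return rollbackState != null;
    }

    synchronized void archive(String newMemberId) {
        if (rollbackState != null) {
            isMarkedArchived = true;
        }
        state = RecordState.ARCHIVED;
        memberId = newMemberId;
    }

    synchronized InFlightStateSketch startStateTransition(RecordState newState, String newMemberId) {
        // Take the snapshot first, but publish it only if the update succeeds.
        InFlightStateSketch currentState = new InFlightStateSketch(state, deliveryCount, memberId);
        InFlightStateSketch updatedState = tryUpdateState(newState, newMemberId);
        if (updatedState != null) {
            rollbackState = currentState;
        }
        return updatedState;
    }

    // Assumed behaviour: a rollback restores the snapshot unless the record was archived meanwhile.
    synchronized void completeStateTransition(boolean commit) {
        if (rollbackState == null) {
            return;
        }
        if (!commit && !isMarkedArchived) {
            state = rollbackState.state;
            deliveryCount = rollbackState.deliveryCount;
            memberId = rollbackState.memberId;
        }
        rollbackState = null;
        isMarkedArchived = false;
    }

    // Placeholder validation: this sketch only treats ARCHIVED as terminal.
    private InFlightStateSketch tryUpdateState(RecordState newState, String newMemberId) {
        if (state == RecordState.ARCHIVED) {
            return null;
        }
        state = newState;
        memberId = newMemberId;
        return this;
    }
}
```

Because `rollbackState` is assigned only after a successful update, the failure path in this sketch never has a stale snapshot to clear. Whether the same holds for the real `tryUpdateState` depends on its error handling, which the sketch does not reproduce.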