apoorvmittal10 commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2194717592


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of 
ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;
+        // The lock prevents concurrent state transitions possibly during 
acknowledgements etc.
+        private final ReadWriteLock stateTransitionLock = new 
ReentrantReadWriteLock();

Review Comment:
   I don't think it's required here, we can just use `synchronized` for locks 
as same offset/batch state access on multiple read paths should not happen.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3103,25 +3116,46 @@ private int updatedDeliveryCount(DeliveryCountOps ops) {
             };
         }
 
-        private void archive(String newMemberId) {
-            state = RecordState.ARCHIVED;
-            memberId = newMemberId;
+        // Visible for testing.
+        void archive(String newMemberId) {
+            stateTransitionLock.writeLock().lock();
+            try {
+                if (rollbackState != null) {
+                    isMarkedArchived = true;
+                }
+                state = RecordState.ARCHIVED;
+                memberId = newMemberId;
+            } finally {
+                stateTransitionLock.writeLock().unlock();
+            }
         }
 
-        private InFlightState startStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-            rollbackState = new InFlightState(state, deliveryCount, memberId, 
acquisitionLockTimeoutTask);
-            return tryUpdateState(newState, ops, maxDeliveryCount, 
newMemberId);
+        // Visible for testing
+        InFlightState startStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+            stateTransitionLock.writeLock().lock();
+            try {
+                rollbackState = new InFlightState(state, deliveryCount, 
memberId, acquisitionLockTimeoutTask);
+                return tryUpdateState(newState, ops, maxDeliveryCount, 
newMemberId);

Review Comment:
   Should it happen as below:
   ```suggestion
                   InFlightState currentState = new InFlightState(state, 
deliveryCount, memberId, acquisitionLockTimeoutTask);
                   InflightState updatedState = tryUpdateState(newState, ops, 
maxDeliveryCount, newMemberId);
                   if (updatedState != null) {
                         rollbackState = currentState;
                   }
                   return updatedState;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3016,6 +3016,11 @@ static final class InFlightState {
         private InFlightState rollbackState;
         // The timer task for the acquisition lock timeout.
         private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+        // The boolean determines if the record has achieved a final state of 
ARCHIVED from which it cannot transition
+        // to any other state. This could happen because of LSO movement etc.
+        private boolean isMarkedArchived = false;
+        // The lock prevents concurrent state transitions possibly during 
acknowledgements etc.
+        private final ReadWriteLock stateTransitionLock = new 
ReentrantReadWriteLock();

Review Comment:
   You should synscronize the `state()` method as well.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -3091,6 +3102,8 @@ private InFlightState tryUpdateState(RecordState 
newState, DeliveryCountOps ops,
                 log.error("Failed to update state of the records", e);
                 rollbackState = null;

Review Comment:
   Should we remove this here now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to