apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1637993977


##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -310,126 +485,361 @@ private void initialize() {
         // TODO: Provide implementation to initialize the share partition.
     }
 
+    private AcquiredRecords acquireNewBatchRecords(
+        String memberId,
+        long firstOffset,
+        long lastOffset
+    ) {
+        lock.writeLock().lock();
+        try {
+            // Schedule acquisition lock timeout for the batch.
+            AcquisitionLockTimerTask timerTask = 
scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset);
+            // Add the new batch to the in-flight records along with the 
acquisition lock timeout task for the batch.
+            cachedState.put(firstOffset, new InFlightBatch(
+                memberId,
+                firstOffset,
+                lastOffset,
+                RecordState.ACQUIRED,
+                1,
+                timerTask));
+            // if the cachedState was empty before acquiring the new batches 
then startOffset needs to be updated
+            if (cachedState.firstKey() == firstOffset)  {
+                startOffset = firstOffset;
+            }
+            endOffset = lastOffset;
+            return new AcquiredRecords()
+                .setFirstOffset(firstOffset)
+                .setLastOffset(lastOffset)
+                .setDeliveryCount((short) 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void acquireSubsetBatchRecords(
+        String memberId,
+        long requestFirstOffset,
+        long requestLastOffset,
+        InFlightBatch inFlightBatch,
+        List<AcquiredRecords> result
+    ) {
+        lock.writeLock().lock();
+        try {
+            for (Map.Entry<Long, InFlightState> offsetState : 
inFlightBatch.offsetState.entrySet()) {
+                // For the first batch which might have offsets prior to the 
request base
+                // offset i.e. cached batch of 10-14 offsets and request batch 
of 12-13.
+                if (offsetState.getKey() < requestFirstOffset) {
+                    continue;
+                }
+
+                if (offsetState.getKey() > requestLastOffset) {
+                    // No further offsets to process.
+                    break;
+                }
+
+                if (offsetState.getValue().state != RecordState.AVAILABLE) {
+                    log.trace("The offset is not available skipping, offset: 
{} batch: {}"
+                            + " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+                        groupId, topicIdPartition);
+                    continue;
+                }
+
+                InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, true, 
maxDeliveryCount,
+                    memberId);
+                if (updateResult == null) {
+                    log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
+                            + " for the share group: {}-{}", 
offsetState.getKey(), inFlightBatch,
+                        groupId, topicIdPartition);
+                    continue;
+                }
+                // Schedule acquisition lock timeout for the offset.
+                AcquisitionLockTimerTask acquisitionLockTimeoutTask = 
scheduleAcquisitionLockTimeout(memberId, offsetState.getKey(), 
offsetState.getKey());
+                // Update acquisition lock timeout task for the offset.
+                
offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
+
+                // TODO: Maybe we can club the continuous offsets here.
+                result.add(new AcquiredRecords()
+                    .setFirstOffset(offsetState.getKey())
+                    .setLastOffset(offsetState.getKey())
+                    .setDeliveryCount((short) 
offsetState.getValue().deliveryCount));
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     /**
-     * The InFlightBatch maintains the in-memory state of the fetched records 
i.e. in-flight records.
+     * Check if the in-flight batch is a full match with the request offsets. 
The full match represents
+     * complete overlap of the in-flight batch with the request offsets.
+     *
+     * @param inFlightBatch The in-flight batch to check for full match.
+     * @param firstOffsetToCompare The first offset of the request batch.
+     * @param lastOffsetToCompare The last offset of the request batch.
+     *
+     * @return True if the in-flight batch is a full match with the request 
offsets, false otherwise.
      */
-    private static class InFlightBatch {
-        /**
-         * The offset of the first record in the batch that is fetched from 
the log.
-         */
+    private boolean checkForFullMatch(InFlightBatch inFlightBatch, long 
firstOffsetToCompare, long lastOffsetToCompare) {
+        return inFlightBatch.firstOffset() >= firstOffsetToCompare && 
inFlightBatch.lastOffset() <= lastOffsetToCompare;
+    }
+
+    private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String 
memberId, long firstOffset, long lastOffset) {
+        return scheduleAcquisitionLockTimeout(memberId, firstOffset, 
lastOffset, recordLockDurationMs);
+    }
+
+    // TODO: maxDeliveryCount should be utilized here once it is implemented
+    /**
+     * Apply acquisition lock to acquired records.
+     *
+     * @param memberId The member id of the client that is putting the 
acquisition lock.
+     * @param firstOffset The first offset of the acquired records.
+     * @param lastOffset The last offset of the acquired records.
+     * @param delayMs The delay in milliseconds after which the acquisition 
lock will be released.
+     */
+    private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(
+        String memberId,
+        long firstOffset,
+        long lastOffset,
+        long delayMs
+    ) {
+        AcquisitionLockTimerTask acquistionLockTimerTask = 
acquisitionLockTimerTask(memberId, firstOffset, lastOffset, delayMs);
+        timer.add(acquistionLockTimerTask);
+        return acquistionLockTimerTask;
+    }
+
+    private AcquisitionLockTimerTask acquisitionLockTimerTask(
+        String memberId,
+        long firstOffset,
+        long lastOffset,
+        long delayMs
+    ) {
+        return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, 
lastOffset);
+    }
+
+    private void releaseAcquisitionLockOnTimeout(String memberId, long 
firstOffset, long lastOffset) {
+        // TODO: Implement the logic to release the acquisition lock on 
timeout.
+    }
+
+    // Visible for testing. Should only be used for testing purposes.
+    NavigableMap<Long, InFlightBatch> cachedState() {
+        return new ConcurrentSkipListMap<>(cachedState);
+    }
+
+    private final class AcquisitionLockTimerTask extends TimerTask {
+        private final long expirationMs;
+        private final String memberId;
         private final long firstOffset;
-        /**
-         * The last offset of the batch that is fetched from the log.
-         */
         private final long lastOffset;
-        /**
-         * The in-flight state of the fetched records. If the offset state map 
is empty then inflightState
-         * determines the state of the complete batch else individual offset 
determines the state of
-         * the respective records.
-         */
-        private InFlightState inFlightState;
 
-        InFlightBatch(String memberId, long firstOffset, long lastOffset, 
RecordState state, int deliveryCount) {
+        AcquisitionLockTimerTask(long delayMs, String memberId, long 
firstOffset, long lastOffset) {
+            super(delayMs);
+            this.expirationMs = time.hiResClockMs() + delayMs;
+            this.memberId = memberId;
             this.firstOffset = firstOffset;
             this.lastOffset = lastOffset;
-            this.inFlightState = new InFlightState(state, deliveryCount, 
memberId);
         }
 
+        long expirationMs() {
+            return expirationMs;
+        }
+
+        /**
+         * The task is executed when the acquisition lock timeout is reached. 
The task releases the acquired records.
+         */
         @Override
-        public String toString() {
-            return "InFlightBatch(" +
-                " firstOffset=" + firstOffset +
-                ", lastOffset=" + lastOffset +
-                ", inFlightState=" + inFlightState +
-                ")";
+        public void run() {
+            releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset);
         }
     }
 
     /**
-     * The InFlightState is used to track the state and delivery count of a 
record that has been
-     * fetched from the leader. The state of the record is used to determine 
if the record should
-     * be re-deliver or if it can be acknowledged or archived.
+     * The InFlightBatch maintains the in-memory state of the fetched records 
i.e. in-flight records.
      */
-    private static class InFlightState {
-        /**
-         * The state of the fetch batch records.
-         */
-        private RecordState state;
-        /**
-         * The number of times the records has been delivered to the client.
-         */
-        private int deliveryCount;
-        /**
-         * The member id of the client that is fetching/acknowledging the 
record.
-         */
-        private String memberId;
+    final class InFlightBatch {
+        // The offset of the first record in the batch that is fetched from 
the log.
+        private final long firstOffset;
+        // The last offset of the batch that is fetched from the log.
+        private final long lastOffset;
 
-        InFlightState(RecordState state, int deliveryCount, String memberId) {
-            this.state = state;
-            this.deliveryCount = deliveryCount;
-            this.memberId = memberId;
+        // The batch state of the fetched records. If the offset state map is 
empty then batchState
+        // determines the state of the complete batch else individual offset 
determines the state of
+        // the respective records.
+        private InFlightState batchState;
+
+        // The offset state map is used to track the state of the records per 
offset. However, the
+        // offset state map is only required when the state of the offsets 
within same batch are
+        // different. The states can be different when explicit offset 
acknowledgment is done which
+        // is different from the batch state.
+        private NavigableMap<Long, InFlightState> offsetState;
+
+        InFlightBatch(String memberId, long firstOffset, long lastOffset, 
RecordState state,
+            int deliveryCount, AcquisitionLockTimerTask 
acquisitionLockTimeoutTask
+        ) {
+            this.firstOffset = firstOffset;
+            this.lastOffset = lastOffset;
+            this.batchState = new InFlightState(state, deliveryCount, 
memberId, acquisitionLockTimeoutTask);
         }
 
-        @Override
-        public int hashCode() {
-            return Objects.hash(state, deliveryCount, memberId);
+        // Visible for testing.
+        long firstOffset() {
+            return firstOffset;
         }
 
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
+        // Visible for testing.
+        long lastOffset() {
+            return lastOffset;
+        }
+
+        // Visible for testing.
+        RecordState batchState() {
+            if (batchState == null) {
+                throw new IllegalStateException("The batch state is not 
available as the offset state is maintained");
+            }
+            return batchState.state;
+        }
+
+        // Visible for testing.
+        String batchMemberId() {
+            if (batchState == null) {
+                throw new IllegalStateException("The batch member id is not 
available as the offset state is maintained");
+            }
+            return batchState.memberId;
+        }
+
+        // Visible for testing.
+        int batchDeliveryCount() {
+            if (batchState == null) {
+                throw new IllegalStateException("The batch delivery count is 
not available as the offset state is maintained");
+            }
+            return batchState.deliveryCount;
+        }
+
+        // Visible for testing.
+        NavigableMap<Long, InFlightState> offsetState() {
+            return offsetState;
+        }
+
+        private InFlightState tryUpdateBatchState(RecordState newState, 
boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
+            if (batchState == null) {
+                throw new IllegalStateException("The batch state update is not 
available as the offset state is maintained");
+            }
+            return batchState.tryUpdateState(newState, incrementDeliveryCount, 
maxDeliveryCount, newMemberId);
+        }
+
+        private void maybeInitializeOffsetStateUpdate() {

Review Comment:
   Documentation is not necessarily needed for a private internal method, so I 
have skipped writing details for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to