apoorvmittal10 commented on code in PR #21262:
URL: https://github.com/apache/kafka/pull/21262#discussion_r2691618844


##########
server/src/main/java/org/apache/kafka/server/share/fetch/acquire/BatchCreationContext.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch.acquire;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
+
+/**
+ * Context object containing all parameters needed for batch creation.
+ */
+public record BatchCreationContext(
+    String memberId,
+    Iterable<? extends RecordBatch> batches,
+    long firstAcquiredOffset,
+    long lastAcquiredOffset,
+    int batchSize,
+    int maxFetchRecords,
+    BatchCacheOperations cacheOperations // Callback interface for operations that require SharePartition state
+) {
+    /**
+     * Interface for operations that interact with SharePartition's cached state.
+     * This allows the strategy to perform necessary state updates without
+     * direct access to SharePartition internals.
+     */
+    public interface BatchCacheOperations {
+        /**
+         * Adds a new in-flight batch to the cache with acquisition lock scheduled.
+         */
+        void addBatchToCache(
+            String memberId,
+            long firstOffset,
+            long lastOffset,
+            AcquisitionLockTimerTask timerTask
+        );
+
+        /**
+         * Adds a new in-flight batch with offset state initialized for partial acquisition.
+         * Used in record_limit mode when batch exceeds max fetch records.
+         */
+        void addBatchWithOffsetState(
+            String memberId,
+            long firstOffset,
+            long lastOffset,
+            long acquiredLastOffset,
+            int delayMs
+        );
+
+        /**
+         * Schedules an acquisition lock timeout for the given offset range.
+         */
+        AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(
+            String memberId,
+            long firstOffset,
+            long lastOffset
+        );

Review Comment:
   Why is `scheduleAcquisitionLockTimeout` a cache operation? The same goes for
others like `recordLockDurationMs`, `updateFindNextFetchOffset` and
`recordInFlightBatchMessageCount`. It seems you needed callbacks, but calling
them cache operations is incorrect.
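
   One possible direction, as a sketch only: name the interface after its role
(callbacks into SharePartition) rather than after the cache. The names
`SharePartitionCallbacks`, `RecordingCallbacks` and `CallbackNamingSketch` are
hypothetical, and `Object` stands in for `AcquisitionLockTimerTask` so the
example compiles without Kafka on the classpath. Only the two methods whose
signatures appear in the diff above are included.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical name: describes the role (callbacks into SharePartition)
// instead of claiming that every method operates on the cache.
interface SharePartitionCallbacks {
    // A genuine cache operation: adds an in-flight batch to the cache.
    void addBatchToCache(String memberId, long firstOffset, long lastOffset, Object timerTask);

    // Timer scheduling, not a cache operation.
    // Object is a placeholder for AcquisitionLockTimerTask.
    Object scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset);
}

// Simple recording implementation that notes which callbacks ran, in order.
class RecordingCallbacks implements SharePartitionCallbacks {
    final List<String> calls = new ArrayList<>();

    @Override
    public void addBatchToCache(String memberId, long firstOffset, long lastOffset, Object timerTask) {
        calls.add("addBatchToCache");
    }

    @Override
    public Object scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) {
        calls.add("scheduleAcquisitionLockTimeout");
        return new Object(); // placeholder timer task
    }
}

class CallbackNamingSketch {
    // Mimics a strategy that schedules the lock timeout, then caches the batch.
    static void acquire(SharePartitionCallbacks callbacks, String memberId, long first, long last) {
        Object timerTask = callbacks.scheduleAcquisitionLockTimeout(memberId, first, last);
        callbacks.addBatchToCache(memberId, first, last, timerTask);
    }

    public static void main(String[] args) {
        RecordingCallbacks callbacks = new RecordingCallbacks();
        acquire(callbacks, "member-1", 0L, 9L);
        System.out.println(callbacks.calls);
    }
}
```

   With a neutral name, the metric and offset callbacks mentioned above could
sit in the same interface without the name implying they touch the cache.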



##########
server/src/main/java/org/apache/kafka/server/share/fetch/acquire/RecordLimitStrategy.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch.acquire;
+
+import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
+import org.apache.kafka.server.share.fetch.InFlightBatch;
+
+import java.util.List;
+
+/**
+ * Strategy for record-limit acquisition mode.
+ * <p>
+ * This mode provides precise control over the number of records acquired by:
+ * <ul>
+ *   <li>Creating a single batch containing all records</li>
+ *   <li>Acquiring only up to maxFetchRecords, leaving remaining offsets AVAILABLE</li>
+ *   <li>Initializing offset state when batch exceeds the limit</li>
+ * </ul>
+ */
+public class RecordLimitStrategy implements AcquireStrategy {
+
+    public static final RecordLimitStrategy INSTANCE = new RecordLimitStrategy();

Review Comment:
   Will the instance always be created even if Share Fetches are not happening
on the cluster? If so, it's cheap, but why create an instance that might never
be used?
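
   For context, a static field like `INSTANCE` is initialized when the JVM
initializes its class, and that happens on first active use of the class (for
example, the first read of `INSTANCE`), not at process startup. Whether some
other code path touches the class early in the broker is not visible from this
diff. A minimal sketch of the behaviour, with a hypothetical `LazyStrategy`
class standing in for `RecordLimitStrategy`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Counts constructor calls. Kept in a separate class so that reading the
// counter does not itself trigger initialization of LazyStrategy.
class InitCounter {
    static final AtomicInteger CREATED = new AtomicInteger();
}

// Hypothetical stand-in for RecordLimitStrategy.
class LazyStrategy {
    static final LazyStrategy INSTANCE = new LazyStrategy();

    private LazyStrategy() {
        InitCounter.CREATED.incrementAndGet();
    }
}

class LazyInitSketch {
    public static void main(String[] args) {
        // LazyStrategy has not been used yet, so its static initializer has not run.
        System.out.println("created before first use: " + InitCounter.CREATED.get());

        // The first read of INSTANCE triggers class initialization.
        LazyStrategy first = LazyStrategy.INSTANCE;
        LazyStrategy second = LazyStrategy.INSTANCE;

        System.out.println("created after first use: " + InitCounter.CREATED.get());
        System.out.println("same instance: " + (first == second));
    }
}
```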


