apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2461831082


##########
server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquireMode.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+public enum ShareAcquireMode {

Review Comment:
   Why do we need another server-side class? Can't we use the same client-side `ShareAcquireMode` class?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+public enum ShareAcquireMode {
+    BATCH_OPTIMIZED("batch_optimized", (byte) 0),
+    RECORD_LIMIT("record_limit", (byte) 1);
+
+    public final String name;
+
+    public final byte id;
+
+    ShareAcquireMode(final String name, byte id) {

Review Comment:
   Why does only the `name` parameter have the `final` modifier? Have the modifier on both parameters or on neither.
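   A minimal sketch of one consistent variant (either direction works):
   ```
   ShareAcquireMode(final String name, final byte id) {
       this.name = name;
       this.id = id;
   }
   ```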



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1614,9 +1649,55 @@ private ShareAcquiredRecords acquireNewBatchRecords(
         }
     }
 
+    private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int maxFetchRecords, List<AcquiredRecords> acquiredRecords) {
+        // Only acquire one single batch in record limit mode.
+        AcquiredRecords records = acquiredRecords.get(0);
+        lock.writeLock().lock();
+        try {
+            InFlightBatch inFlightBatch = cachedState.get(records.firstOffset());
+            if (inFlightBatch != null) {
+                long lastOffset = records.firstOffset() + maxFetchRecords - 1;
+                // Initialize the offset state map if not already initialized.
+                inFlightBatch.maybeInitializeOffsetStateUpdate();
+                List<PersisterBatch> persisterBatches = new ArrayList<>();
+                CompletableFuture<Void> future = new CompletableFuture<>();
+                NavigableMap<Long, InFlightState> offsetStateMap = inFlightBatch.offsetState();
+                for (Map.Entry<Long, InFlightState> offsetState : offsetStateMap.tailMap(lastOffset, false).entrySet()) {
+                    if (offsetState.getValue().state() == RecordState.ACQUIRED) {
+                        // These records were not actually acquired so update the offset status back to available.
+                        InFlightState updateResult = offsetState.getValue().startStateTransition(
+                            RecordState.AVAILABLE,
+                            DeliveryCountOps.DECREASE,
+                            this.maxDeliveryCount,
+                            EMPTY_MEMBER_ID
+                        );
+                        if (updateResult == null) {
+                            log.error("Unable to update records status for the offset: {} in batch: {} for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
+                                groupId, topicIdPartition);
+                            continue;
+                        }
+                        persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
+                            offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
+                    } else {
+                        log.error("Unexpected record state {} for offset: {} in batch: {} in share partition: {}-{}",
+                            offsetState.getValue().state(), offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
+                    }
+                }
+                rollbackOrProcessStateUpdates(future, null, persisterBatches);
+                records.setLastOffset(lastOffset);

Review Comment:
   This doesn't seem right; why do you want to make a call to the persister while acquiring records?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -748,8 +752,13 @@ public ShareAcquiredRecords acquire(
                     groupId, topicIdPartition);
                 // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no
                 // overlap hence the lastOffsetToAcquire is same as lastBatch.lastOffset() or before that.
-                ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), isRecordLimitMode,
                     firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
+                if (isRecordLimitMode && shareAcquiredRecords.count() > maxRecordsToAcquire) {
+                    AcquiredRecords records = filterShareAcquiredRecordsInRecordLimitMode(maxRecordsToAcquire, shareAcquiredRecords.acquiredRecords());
+                    // Return only single batch of filtered records.
+                    shareAcquiredRecords = new ShareAcquiredRecords(List.of(records), maxRecordsToAcquire);
+                }

Review Comment:
   `acquireNewBatchRecords` will update the cache and create acquisition lock timeouts, and only later do we call `filterShareAcquiredRecordsInRecordLimitMode` and update the states again. Can't we do something like below:
   
   1. Move the code below from `createBatches` into a separate method:
   ```
   result.forEach(acquiredRecords -> {
       // Schedule acquisition lock timeout for the batch.
       AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
       // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch.
       cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
           timer,
           time,
           memberId,
           acquiredRecords.firstOffset(),
           acquiredRecords.lastOffset(),
           RecordState.ACQUIRED,
           1,
           timerTask,
           timeoutHandler,
           sharePartitionMetrics));
       // Update the in-flight batch message count metrics for the share partition.
       sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
   });
   ```
   2. The result from `acquireNewBatchRecords` tells us, at most, which batches should go into the cache.
   3. If the mode is `batch_optimized` then call the new method; if the mode is `record_limit` then keep only the first batch from the result.
   4. If the size of the first `acquireNewBatchRecords` batch is greater than `max fetch records` then instantiate the OffsetState accordingly.
   
   This approach keeps the cache clean, with only the required data in it; otherwise you might also need to worry about other cache capacity issues. Creating and then removing acquisition lock timeouts again is also not good. A rough sketch of the extracted helper follows this comment.
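   A minimal sketch of that extracted helper, assuming it sits in `SharePartition` next to `createBatches` (the method name `updateCachedStateAndScheduleTimeouts` and the caller snippet are illustrative, not taken from the PR):
   ```
   // Hypothetical helper extracted from createBatches(); only batches that should stay
   // in flight are passed in, so the cache never holds entries we immediately roll back.
   private void updateCachedStateAndScheduleTimeouts(String memberId, List<AcquiredRecords> result) {
       result.forEach(acquiredRecords -> {
           // Schedule the acquisition lock timeout for the batch.
           AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId,
               acquiredRecords.firstOffset(), acquiredRecords.lastOffset());
           // Add the new batch to the in-flight records along with its acquisition lock timeout task.
           cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch(
               timer, time, memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset(),
               RecordState.ACQUIRED, 1, timerTask, timeoutHandler, sharePartitionMetrics));
           // Update the in-flight batch message count metric for the share partition.
           sharePartitionMetrics.recordInFlightBatchMessageCount(
               acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
       });
   }

   // Caller sketch (steps 2 and 3): trim the computed batches before anything touches the cache;
   // step 4's per-offset state handling for an oversized first batch would follow here.
   if (isRecordLimitMode) {
       // record_limit: cache only the first computed batch.
       result = List.of(result.get(0));
   }
   updateCachedStateAndScheduleTimeouts(memberId, result);
   ```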



##########
server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java:
##########
@@ -63,6 +63,10 @@ public class ShareFetch {
      * The batch size of the fetch request.
      */
     private final int batchSize;
+    /**
+     * The share acquire mode for the fetch request.
+     */
+    private final byte shareAcquireMode;

Review Comment:
   nit: Rather than a byte, we should have `ShareAcquireMode` here itself, i.e. convert to the enum at this point so that we don't need another conversion later.
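   A minimal sketch of what that could look like in `ShareFetch` (the `fromId(byte)` lookup on the server-side enum and the constructor parameter name are assumptions here, not from the PR):
   ```
   /**
    * The share acquire mode for the fetch request.
    */
   private final ShareAcquireMode shareAcquireMode;

   // In the constructor: convert the wire byte to the enum exactly once.
   this.shareAcquireMode = ShareAcquireMode.fromId(shareAcquireModeId); // hypothetical fromId(byte) lookup
   ```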



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+public enum ShareAcquireMode {
+    BATCH_OPTIMIZED("batch_optimized", (byte) 0),
+    RECORD_LIMIT("record_limit", (byte) 1);
+
+    public final String name;
+
+    public final byte id;
+
+    ShareAcquireMode(final String name, byte id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    /**
+     * Case-insensitive acquire mode lookup by string name.
+     */
+    public static ShareAcquireMode of(final String name) {
+        try {
+            return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Invalid value :" + name + " for ShareAcquireMode. The value must either be 'batch_optimized' or 'record_limit'.");
+        }
+    }
+
+    public byte id() {
+        return id;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);

Review Comment:
   In Kafka, a standard format is generally used for toString:
   
   ```
   return "ShareAcquireMode{" +
               "name=" + name +
               ", id=" + id +
               "}";
   ```

