apoorvmittal10 commented on code in PR #20246: URL: https://github.com/apache/kafka/pull/20246#discussion_r2461831082
########## server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquireMode.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.fetch; + +public enum ShareAcquireMode { Review Comment: Why do we need another server side class and can't use same client ShareAcquireMode class? ########## clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.Arrays; +import java.util.Locale; +import java.util.stream.Collectors; + +public enum ShareAcquireMode { + BATCH_OPTIMIZED("batch_optimized", (byte) 0), + RECORD_LIMIT("record_limit", (byte) 1); + + public final String name; + + public final byte id; + + ShareAcquireMode(final String name, byte id) { Review Comment: Why do we have final modifier only for name, have the modifier for both or none. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1614,9 +1649,55 @@ private ShareAcquiredRecords acquireNewBatchRecords( } } + private AcquiredRecords filterShareAcquiredRecordsInRecordLimitMode(int maxFetchRecords, List<AcquiredRecords> acquiredRecords) { + // Only acquire one single batch in record limit mode. + AcquiredRecords records = acquiredRecords.get(0); + lock.writeLock().lock(); + try { + InFlightBatch inFlightBatch = cachedState.get(records.firstOffset()); + if (inFlightBatch != null) { + long lastOffset = records.firstOffset() + maxFetchRecords - 1; + // Initialize the offset state map if not already initialized. + inFlightBatch.maybeInitializeOffsetStateUpdate(); + List<PersisterBatch> persisterBatches = new ArrayList<>(); + CompletableFuture<Void> future = new CompletableFuture<>(); + NavigableMap<Long, InFlightState> offsetStateMap = inFlightBatch.offsetState(); + for (Map.Entry<Long, InFlightState> offsetState : offsetStateMap.tailMap(lastOffset, false).entrySet()) { + if (offsetState.getValue().state() == RecordState.ACQUIRED) { + // These records were not actually acquired so update the offset status back to available. 
+ InFlightState updateResult = offsetState.getValue().startStateTransition( + RecordState.AVAILABLE, + DeliveryCountOps.DECREASE, + this.maxDeliveryCount, + EMPTY_MEMBER_ID + ); + if (updateResult == null) { + log.error("Unable to update records status for the offset: {} in batch: {} for the share partition: {}-{}", offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + continue; + } + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(), + offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount()))); + } else { + log.error("Unexpected record state {} for offset: {} in batch: {} in share partition: {}-{}", + offsetState.getValue().state(), offsetState.getKey(), inFlightBatch, groupId, topicIdPartition); + } + } + rollbackOrProcessStateUpdates(future, null, persisterBatches); + records.setLastOffset(lastOffset); Review Comment: This doesn't seem right. Why do you want to make a call to the persister while acquiring records? ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -748,8 +752,13 @@ public ShareAcquiredRecords acquire( groupId, topicIdPartition); // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no // overlap hence the lastOffsetToAcquire is same as lastBatch.lastOffset() or before that. - ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), + ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), isRecordLimitMode, firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire); + if (isRecordLimitMode && shareAcquiredRecords.count() > maxRecordsToAcquire) { + AcquiredRecords records = filterShareAcquiredRecordsInRecordLimitMode(maxRecordsToAcquire, shareAcquiredRecords.acquiredRecords()); + // Return only single batch of filtered records.
+ shareAcquiredRecords = new ShareAcquiredRecords(List.of(records), maxRecordsToAcquire); + } Review Comment: `acquireNewBatchRecords` will update the cache and create acquisition lock timeouts. Then later we call `filterShareAcquiredRecordsInRecordLimitMode` and update the states. Can't we do something like the below: 1. Move the below code to a separate method from `createBatches`: ``` result.forEach(acquiredRecords -> { // Schedule acquisition lock timeout for the batch. AcquisitionLockTimerTask timerTask = scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset()); // Add the new batch to the in-flight records along with the acquisition lock timeout task for the batch. cachedState.put(acquiredRecords.firstOffset(), new InFlightBatch( timer, time, memberId, acquiredRecords.firstOffset(), acquiredRecords.lastOffset(), RecordState.ACQUIRED, 1, timerTask, timeoutHandler, sharePartitionMetrics)); // Update the in-flight batch message count metrics for the share partition. sharePartitionMetrics.recordInFlightBatchMessageCount(acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1); }); ``` 2. The result from `acquireNewBatchRecords` will tell us which batches, at most, need to be created in the cache. 3. If the mode is `batch_optimized` then call the new method; else if the mode is `record_limit` then keep only the first batch in the result. 4. If the size of the first batch from `acquireNewBatchRecords` is greater than `max fetch records` then instantiate OffsetState accordingly. This approach will keep the cache clean, with only the required data in the cache. Otherwise you might also need to worry about other cache capacity issues. Also, creating and then removing acquisition lock timeouts again is not good. ########## server/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java: ########## @@ -63,6 +63,10 @@ public class ShareFetch { * The batch size of the fetch request.
*/ private final int batchSize; + /** + * The share acquire mode for the fetch request. + */ + private final byte shareAcquireMode; Review Comment: nit: Rather than byte we should have ShareAcquireMode here itself i.e. convert to enum here itself so we need no conversion again. ########## clients/src/main/java/org/apache/kafka/clients/consumer/ShareAcquireMode.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.Arrays; +import java.util.Locale; +import java.util.stream.Collectors; + +public enum ShareAcquireMode { + BATCH_OPTIMIZED("batch_optimized", (byte) 0), + RECORD_LIMIT("record_limit", (byte) 1); + + public final String name; + + public final byte id; + + ShareAcquireMode(final String name, byte id) { + this.name = name; + this.id = id; + } + + /** + * Case-insensitive acquire mode lookup by string name. 
+ */ + public static ShareAcquireMode of(final String name) { + try { + return ShareAcquireMode.valueOf(name.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid value :" + name + " for ShareAcquireMode. The value must either be 'batch_optimized' or 'record_limit'."); + } + } + + public byte id() { + return id; + } + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); Review Comment: In Kafka generally a format is used for toString: ``` return "ShareAcquireMode{" + "name=" + name + ", id=" + id + "}"; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
