junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1830199934


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -51,21 +55,26 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private static final Logger log = 
LoggerFactory.getLogger(DelayedShareFetch.class);
 
-    private final ShareFetchData shareFetchData;
+    private final ShareFetchData partitionsToComplete;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
 
     DelayedShareFetch(
-            ShareFetchData shareFetchData,
+            ShareFetchData partitionsToComplete,

Review Comment:
   This is actually a super set of the partitions to complete. So, it's better 
to just keep the name shareFetchData. It would be useful to add a comment that 
the partitions to be completed are given by sharePartitions and is a subset of 
shareFetchData.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -51,21 +55,26 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private static final Logger log = 
LoggerFactory.getLogger(DelayedShareFetch.class);
 
-    private final ShareFetchData shareFetchData;
+    private final ShareFetchData partitionsToComplete;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
 
     DelayedShareFetch(
-            ShareFetchData shareFetchData,
+            ShareFetchData partitionsToComplete,
             ReplicaManager replicaManager,
-            SharePartitionManager sharePartitionManager) {
-        super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
-        this.shareFetchData = shareFetchData;
+            SharePartitionManager sharePartitionManager,
+            Map<TopicIdPartition, SharePartition> sharePartitions) {
+        super(partitionsToComplete.fetchParams().maxWaitMs, Option.empty());
+        this.partitionsToComplete = partitionsToComplete;
         this.replicaManager = replicaManager;
-        this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+        this.partitionsAcquired = new LinkedHashMap<>();

Review Comment:
   Why is this a LinkedHashMap instead of just a HashMap?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +147,49 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsAcquired = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsAcquired.clear();

Review Comment:
   We can skip clearing these two maps here since the operation is completed at 
this point. In `onComplete()`, we don't clear these two maps. So, this will 
make the behavior more consistent.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
             }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private void maybeUpdateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != 
Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} errored out for 
topic partition {}",
+                    replicaManagerLogReadResult, topicIdPartition);
+                continue;
+            }
+            
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = 
sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || 
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = 
optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > 
endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group 
{}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", partitionsToComplete.groupId(), 
partitionsToComplete.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < 
endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling 
behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for 
group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", 
partitionsToComplete.groupId(), partitionsToComplete.memberId(), 
topicIdPartition);
+                    return true;
+                } else if 
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when 
accumulating the bytes.
+                    long bytesAvailable = 
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), 
partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= partitionsToComplete.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+        // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
+        // extend it to support other FetchIsolation types.
+        FetchIsolation isolationType = 
partitionsToComplete.fetchParams().isolation;
+        if (isolationType == FetchIsolation.LOG_END)
+            return offsetSnapshot.logEndOffset;
+        else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+            return offsetSnapshot.highWatermark;
+        else
+            return offsetSnapshot.lastStableOffset;
+
+    }
+
+    private Map<TopicIdPartition, LogReadResult> 
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
+            partitionsToComplete.fetchParams(),
+            CollectionConverters.asScala(
+                topicPartitionData.entrySet().stream().map(entry ->
+                    new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
+            ),
+            QuotaFactory.UNBOUNDED_QUOTA,
+            true);
+
+        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        responseLogResult.foreach(tpLogResult -> {
+            responseData.put(tpLogResult._1(), tpLogResult._2());
+            return BoxedUnit.UNIT;
+        });
+
+        log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+        return responseData;
+    }
+
+    private boolean anyTopicIdPartitionHasLogReadError(Map<TopicIdPartition, 
LogReadResult> replicaManagerReadResponse) {
+        for (LogReadResult logReadResult : 
replicaManagerReadResponse.values()) {
+            if (logReadResult.error().code()  != Errors.NONE.code()) {
+                return true;
+            }
+        }
+        return false;

Review Comment:
   This can be a bit simpler.
   
   ```
   return replicaManagerReadResponse.values().stream()
       .anyMatch(logReadResult -> logReadResult.error().code() != 
Errors.NONE.code());
   ```



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
             }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private void maybeUpdateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != 
Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} errored out for 
topic partition {}",
+                    replicaManagerLogReadResult, topicIdPartition);
+                continue;
+            }
+            
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = 
sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || 
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = 
optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > 
endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group 
{}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", partitionsToComplete.groupId(), 
partitionsToComplete.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < 
endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling 
behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for 
group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", 
partitionsToComplete.groupId(), partitionsToComplete.memberId(), 
topicIdPartition);
+                    return true;
+                } else if 
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when 
accumulating the bytes.
+                    long bytesAvailable = 
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), 
partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= partitionsToComplete.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+        // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
+        // extend it to support other FetchIsolation types.
+        FetchIsolation isolationType = 
partitionsToComplete.fetchParams().isolation;
+        if (isolationType == FetchIsolation.LOG_END)
+            return offsetSnapshot.logEndOffset;
+        else if (isolationType == FetchIsolation.HIGH_WATERMARK)
+            return offsetSnapshot.highWatermark;
+        else
+            return offsetSnapshot.lastStableOffset;
+
+    }
+
+    private Map<TopicIdPartition, LogReadResult> 
readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
+            partitionsToComplete.fetchParams(),
+            CollectionConverters.asScala(
+                topicPartitionData.entrySet().stream().map(entry ->
+                    new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
+            ),
+            QuotaFactory.UNBOUNDED_QUOTA,
+            true);
+
+        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        responseLogResult.foreach(tpLogResult -> {
+            responseData.put(tpLogResult._1(), tpLogResult._2());
+            return BoxedUnit.UNIT;
+        });
+
+        log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+        return responseData;
+    }
+
+    private boolean anyTopicIdPartitionHasLogReadError(Map<TopicIdPartition, 
LogReadResult> replicaManagerReadResponse) {

Review Comment:
   anyTopicIdPartitionHasLogReadError => anyPartitionHasLogReadError ?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +147,49 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsAcquired = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsAcquired.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +
+                            "topic partitions {}", 
partitionsToComplete.groupId(), partitionsToComplete.memberId(),
+                        sharePartitions.keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the 
share fetch request for group {}, member {}, " +
+                        "topic partitions {}", partitionsToComplete.groupId(), 
partitionsToComplete.memberId(),
+                    sharePartitions.keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that 
means the request is already completed
             // hence release the acquired locks.
             if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionDataFromTryComplete.keySet());
+                releasePartitionLocks(topicPartitionData.keySet());

Review Comment:
   This part of the exception is still quite confusing to me. If we hit the 
exception, should we release the locks before calling forceComplete()? 
Otherwise, `forceComplete` will try to acquire the same locks again, but can't. 
It will also help to narrow the try/catch to where the exception can be thrown.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -574,6 +575,7 @@ void processShareFetch(ShareFetchData shareFetchData) {
         // Initialize lazily, if required.
         Map<TopicIdPartition, Throwable> erroneous = null;
         Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();

Review Comment:
   Why is this a LinkedHashMap instead of just a HashMap?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +233,139 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
-                return;
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();

Review Comment:
   missingFetchOffsetMetadataTopicPartitions => 
partitionsMissingFetchOffsetMetadata?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -80,64 +89,56 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
-            + "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
-            topicPartitionDataFromTryComplete.keySet());
+            + "topic partitions {}", partitionsToComplete.groupId(), 
partitionsToComplete.memberId(),
+            partitionsAcquired.keySet());
 
-        if (shareFetchData.future().isDone())
+        if (partitionsToComplete.future().isDone())
             return;
 
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
         // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
+        if (partitionsAcquired.isEmpty())
             topicPartitionData = acquirablePartitions();
         // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
         else
-            topicPartitionData = topicPartitionDataFromTryComplete;
+            topicPartitionData = partitionsAcquired;
 
         if (topicPartitionData.isEmpty()) {
             // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
+            partitionsToComplete.future().complete(Collections.emptyMap());
             return;
         }
         log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-            topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
+            topicPartitionData, partitionsToComplete.groupId(), 
partitionsToComplete.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitionManager, replicaManager);
-            shareFetchData.future().complete(result);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There shouldn't be a case when we have a 
partitionsAlreadyFetched value here and this variable is getting
+                // updated in a different tryComplete thread.

Review Comment:
   Hmm, Apoorv had a good point in his comment 
(https://github.com/apache/kafka/pull/17539#discussion_r1828579315). There 
seems to be a potential problem. It's possible that thread1 calls 
`tryComplete`, finds `completed` to be false, and is about to set 
`partitionsAlreadyFetched`. The expiration thread then calls `forceComplete` 
and sets `completed` to true and proceeds to here. Now, thread1 continues and 
updates `partitionsAlreadyFetched`. The expiration thread will pick up the 
wrong `partitionsAlreadyFetched`.



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -64,11 +64,7 @@ static Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData> processFetchR
             TopicIdPartition topicIdPartition = entry.getKey();
             FetchPartitionData fetchPartitionData = entry.getValue();
 
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(shareFetchData.groupId(), 
topicIdPartition);
-            if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
-                continue;
-            }
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);

Review Comment:
   This is an existing issue. In the line below, we are logging for each 
partition, but shareFetchData contains the full request.
   
   ```
                   log.trace("Acquired records for topicIdPartition: {} with 
share fetch data: {}, records: {}",
                       topicIdPartition, shareFetchData, shareAcquiredRecords);
   ```



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,130 @@ public void 
testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, 
tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, 
tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete 
should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void 
testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), anyInt(), any())).thenReturn(
+            ShareAcquiredRecords.fromAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // We are testing the case when the share partition is getting fetched 
for the first time, so for the first time
+        // the fetchOffsetMetadata will return empty. Post the readFromLog 
call, the fetchOffsetMetadata will be
+        // populated for the share partition, which has 1 as the positional 
difference, so it doesn't satisfy the minBytes(2).
+        when(sp0.fetchOffsetMetadata())
+            .thenReturn(Optional.empty())
+            .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
+        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
+
+        doAnswer(invocation -> 
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .build());
+        assertFalse(delayedShareFetch.isCompleted());
+
+        // Since sp1 cannot be acquired, tryComplete should return false.
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void 
testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnLatestFetch() {

Review Comment:
   testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnLatestFetch =>
   testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch ?



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,130 @@ public void 
testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, 
tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, 
tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete 
should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void 
testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch() {

Review Comment:
   testTryCompleteReturnsFalseWhenMinBytesNotSatisfiedOnFirstFetch => 
testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to