junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1813657687


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -89,25 +91,45 @@ public void onComplete() {
             return;
 
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
-        // tryComplete did not invoke forceComplete, so we need to get replica 
manager response data.
-        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
+        if (topicPartitionDataFromTryComplete.isEmpty())
             topicPartitionData = acquirablePartitions();
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
-                shareFetchData.future().complete(Collections.emptyMap());
-                return;
-            }
-            fetchResponseData = replicaManagerFetchData(topicPartitionData, 
true);
-        } else {
-            // tryComplete invoked forceComplete, so we can use the topic 
partitions data and replica manager response data from tryComplete.
+        // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
+        else
             topicPartitionData = topicPartitionDataFromTryComplete;
-            fetchResponseData = replicaManagerFetchDataFromTryComplete;
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
+            shareFetchData.future().complete(Collections.emptyMap());
+            return;
         }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
+                topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
+                shareFetchData.fetchParams(),
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                true);
+
+            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
+            Map<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadata = new 
HashMap<>();
+            responseLogResult.foreach(tpLogResult -> {
+                TopicIdPartition topicIdPartition = tpLogResult._1();
+                LogReadResult logResult = tpLogResult._2();
+                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
+                responseData.put(topicIdPartition, fetchPartitionData);
+                fetchOffsetMetadata.put(topicIdPartition, 
logResult.info().fetchOffsetMetadata);
+                return BoxedUnit.UNIT;
+            });
+
+            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
fetchResponseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, fetchOffsetMetadata, sharePartitionManager, replicaManager);

Review Comment:
   Setting the offset metadata here is too late: at this point we are about to 
acquire the fetched records, which moves the next fetch offset. The cached 
offset metadata would then no longer match the next fetch offset.
   
   I was thinking of doing the following instead. In `tryComplete`, if the 
offset metadata doesn't exist, we call `replicaManager.readFromLog` to populate 
it. We can then proceed with the minBytes estimation. If 
`SharePartition.endOffset` moves, we invalidate the offset metadata.
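
   A minimal sketch of the populate-then-invalidate idea, under stated 
assumptions: `FetchOffsetMetadataCache`, `getOrLoad` and `invalidate` are 
hypothetical names, not existing Kafka APIs, and the `loader` argument stands 
in for the `replicaManager.readFromLog` call.

```java
import java.util.function.LongFunction;

/**
 * Hypothetical stand-in (not a Kafka class): caches the offset metadata for
 * the next fetch offset and drops it when that offset moves.
 */
final class FetchOffsetMetadataCache<M> {
    private long cachedForOffset = -1L; // fetch offset the cached value belongs to
    private M cached;                   // null when absent or invalidated

    /** Returns the cached metadata, reading it via the loader only when missing or stale. */
    synchronized M getOrLoad(long nextFetchOffset, LongFunction<M> loader) {
        if (cached == null || cachedForOffset != nextFetchOffset) {
            // Assumption: the loader plays the role of the log read in tryComplete.
            cached = loader.apply(nextFetchOffset);
            cachedForOffset = nextFetchOffset;
        }
        return cached;
    }

    /** Intended to be called whenever the share partition's end offset moves. */
    synchronized void invalidate() {
        cached = null;
        cachedForOffset = -1L;
    }
}
```

   With something along these lines, the log read would only be paid for on 
the first `tryComplete` after the offset changes, and later attempts could 
estimate minBytes from the cached value.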



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -197,58 +219,25 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    /**
-     * Prepare partitions fetch data structure for acquirable partitions in 
the share fetch request satisfying minBytes criteria.
-     */
-    Map<TopicIdPartition, FetchPartitionData> 
replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData,
-                                                                      boolean 
hasRequestTimedOut) {
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}", topicPartitionData,
-            shareFetchData.groupId(), shareFetchData.fetchParams());
-        boolean minBytesSatisfied = false;
-        Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-        try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            AtomicLong accumulatedBytes = new AtomicLong(0);
-
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                
accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
-                return BoxedUnit.UNIT;
-            });
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-
-            if (accumulatedBytes.get() >= 
shareFetchData.fetchParams().minBytes)
-                minBytesSatisfied = true;
-        } catch (Exception e) {
-            log.error("Error processing delayed share fetch request", e);
-        } finally {
-            // The case when we cannot satisfy the share fetch requests 
because the response has lesser data than minBytes
-            // and the call is coming from tryComplete, hence we want to 
release partitions lock so that the next
-            // tryComplete/onComplete call complete successfully.
-            if (!minBytesSatisfied && !hasRequestTimedOut) {
-                // Releasing the lock to move ahead with the next request in 
queue.
-                releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionData.keySet());
+    boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        AtomicLong accumulatedSize = new AtomicLong(0);
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+            LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+            LogOffsetMetadata endOffsetMetadata = offsetSnapshot.highWatermark;

Review Comment:
   We need to check `shareFetchData.fetchParams().isolation` to decide whether 
to use the HWM or `lastStableOffset`, rather than always using 
`offsetSnapshot.highWatermark`.
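
   A rough sketch of the selection, assuming simplified stand-in types: 
`Isolation` and `OffsetSnapshot` below are hypothetical and only mirror the 
shape of the fetch isolation setting and the offset snapshot, with plain 
`long` offsets instead of full offset metadata.

```java
/** Hypothetical sketch: pick the end offset that matches the fetch isolation level. */
final class IsolationEndOffsetSketch {
    /** Stand-in for the isolation setting carried by the fetch params. */
    enum Isolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

    /** Stand-in for an offset snapshot of a partition. */
    static final class OffsetSnapshot {
        final long logEndOffset;
        final long highWatermark;
        final long lastStableOffset;

        OffsetSnapshot(long logEndOffset, long highWatermark, long lastStableOffset) {
            this.logEndOffset = logEndOffset;
            this.highWatermark = highWatermark;
            this.lastStableOffset = lastStableOffset;
        }
    }

    /** Read-committed fetches stop at the last stable offset, others at HWM or log end. */
    static long endOffsetFor(Isolation isolation, OffsetSnapshot snapshot) {
        switch (isolation) {
            case LOG_END:
                return snapshot.logEndOffset;
            case HIGH_WATERMARK:
                return snapshot.highWatermark;
            case TXN_COMMITTED:
                return snapshot.lastStableOffset;
            default:
                throw new IllegalArgumentException("Unknown isolation: " + isolation);
        }
    }
}
```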



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -197,58 +219,25 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    /**
-     * Prepare partitions fetch data structure for acquirable partitions in 
the share fetch request satisfying minBytes criteria.
-     */
-    Map<TopicIdPartition, FetchPartitionData> 
replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData,
-                                                                      boolean 
hasRequestTimedOut) {
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}", topicPartitionData,
-            shareFetchData.groupId(), shareFetchData.fetchParams());
-        boolean minBytesSatisfied = false;
-        Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-        try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            AtomicLong accumulatedBytes = new AtomicLong(0);
-
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                
accumulatedBytes.addAndGet(logResult.info().records.sizeInBytes());
-                return BoxedUnit.UNIT;
-            });
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-
-            if (accumulatedBytes.get() >= 
shareFetchData.fetchParams().minBytes)
-                minBytesSatisfied = true;
-        } catch (Exception e) {
-            log.error("Error processing delayed share fetch request", e);
-        } finally {
-            // The case when we cannot satisfy the share fetch requests 
because the response has lesser data than minBytes
-            // and the call is coming from tryComplete, hence we want to 
release partitions lock so that the next
-            // tryComplete/onComplete call complete successfully.
-            if (!minBytesSatisfied && !hasRequestTimedOut) {
-                // Releasing the lock to move ahead with the next request in 
queue.
-                releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionData.keySet());
+    boolean isMinBytesCriteriaSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        AtomicLong accumulatedSize = new AtomicLong(0);
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+            LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+            LogOffsetMetadata endOffsetMetadata = offsetSnapshot.highWatermark;
+            SharePartition sharePartition = 
sharePartitionManager.sharePartition(shareFetchData.groupId(), 
topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                return;
             }
-        }
-        // We can return the replica manager fetch response for the following 
2 cases -
-        // 1. This call is coming from onComplete, hence we return the 
response data irrespective of whether minBytes is
-        // satisfied or not.
-        // 2. Return the response if minBytes criteria is satisfied (request 
is coming from tryComplete).
-        if (hasRequestTimedOut || minBytesSatisfied)
-            return responseData;
-        // Return an empty map if replica manager fetch does not satisfy 
minBytes (request is coming from tryComplete).
-        return Collections.emptyMap();
+
+            // we take the partition fetch size as upper bound when 
accumulating the bytes (skip if a throttled partition)
+            long bytesAvailable = sharePartition.latestFetchOffsetMetadata() 
== null ?
+                Math.min(endOffsetMetadata.relativePositionInSegment, 
partitionData.maxBytes) :
+                
Math.min(endOffsetMetadata.positionDiff(sharePartition.latestFetchOffsetMetadata()),
 partitionData.maxBytes);

Review Comment:
   We need to check that the two offset metadata are on the same segment 
before using `positionDiff`. We also need to handle the case where the HWM 
doesn't have offset metadata. We can just follow the logic in `DelayedFetch`.
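
   A sketch of the guarded estimate, loosely modeled on the idea above and not 
a copy of `DelayedFetch`: `MinBytesEstimate`, its `OffsetMetadata` type and 
the `-1` "unknown" sentinels are simplified, hypothetical stand-ins. An empty 
result means the position difference cannot be trusted, so the caller would 
have to fall back (for example by completing the request or reading the log).

```java
import java.util.OptionalLong;

/** Hypothetical sketch: only diff file positions when both offsets sit on the same segment. */
final class MinBytesEstimate {
    static final long UNKNOWN_SEGMENT = -1L;
    static final int UNKNOWN_POSITION = -1;

    /** Simplified stand-in for log offset metadata. */
    static final class OffsetMetadata {
        final long messageOffset;
        final long segmentBaseOffset;
        final int positionInSegment;

        OffsetMetadata(long messageOffset, long segmentBaseOffset, int positionInSegment) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.positionInSegment = positionInSegment;
        }

        /** True when only the message offset is known (no segment or position). */
        boolean messageOffsetOnly() {
            return segmentBaseOffset == UNKNOWN_SEGMENT || positionInSegment == UNKNOWN_POSITION;
        }

        boolean onSameSegment(OffsetMetadata other) {
            return !messageOffsetOnly() && !other.messageOffsetOnly()
                && segmentBaseOffset == other.segmentBaseOffset;
        }
    }

    /** Bytes available between fetch and end offset, capped at maxBytes; empty if not estimable. */
    static OptionalLong estimateBytes(OffsetMetadata fetch, OffsetMetadata end, int maxBytes) {
        if (fetch.messageOffset == end.messageOffset)
            return OptionalLong.of(0L);      // nothing new to fetch
        if (fetch.messageOffset > end.messageOffset)
            return OptionalLong.empty();     // e.g. truncation: no meaningful estimate
        if (!fetch.onSameSegment(end))
            return OptionalLong.empty();     // missing metadata or different segments
        int diff = end.positionInSegment - fetch.positionInSegment;
        return OptionalLong.of(Math.min(diff, maxBytes));
    }
}
```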



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
