apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1809360360


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +191,62 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    /**
+     * Prepare partitions fetch data structure for acquirable partitions in the share fetch request satisfying minBytes criteria.
+     */
+    Map<TopicIdPartition, FetchPartitionData> replicaManagerFetchData(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                      boolean hasRequestTimedOut) {
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", topicPartitionData,
+            shareFetchData.groupId(), shareFetchData.fetchParams());
+        boolean minBytesSatisfied = false;
+        Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
+        try {
+            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
+                shareFetchData.fetchParams(),
+                CollectionConverters.asScala(
+                    topicPartitionData.entrySet().stream().map(entry ->
+                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+                ),
+                QuotaFactory.UnboundedQuota$.MODULE$,
+                true);
+
+            AtomicInteger accumulatedBytes = new AtomicInteger(0);

Review Comment:
   Should it be AtomicLong?
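   For context, a standalone sketch of the overflow risk (hypothetical byte counts, not the PR's actual fetch sizes): an `int` accumulator wraps past `Integer.MAX_VALUE` (~2 GiB) when summing fetched bytes across partitions, while a `long` does not. Whether a single share-fetch response can actually exceed 2 GiB depends on the configured fetch limits.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicLong;

   public class AccumulatedBytesDemo {
       public static void main(String[] args) {
           // Simulate accumulating 3 GiB of fetched bytes in 1 GiB chunks.
           int chunk = 1 << 30; // 1 GiB per simulated log read
           AtomicInteger intAcc = new AtomicInteger(0);
           AtomicLong longAcc = new AtomicLong(0);
           for (int i = 0; i < 3; i++) {
               intAcc.addAndGet(chunk);
               longAcc.addAndGet(chunk);
           }
           // The int accumulator has wrapped to a negative value; the long has not.
           System.out.println(intAcc.get() < 0);  // true
           System.out.println(longAcc.get());     // 3221225472
       }
   }
   ```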



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +133,20 @@ public boolean tryComplete() {
             shareFetchData.groupId(), shareFetchData.memberId(),
             shareFetchData.partitionMaxBytes().keySet());
 
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty())
-            return forceComplete();
-        log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
-                "topic partitions {}", shareFetchData.groupId(),
-                shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
+        if (!topicPartitionData.isEmpty()) {
+            replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, false);
+            if (!replicaManagerFetchDataFromTryComplete.isEmpty())
+                return forceComplete();
+            log.info("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
+                "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+                shareFetchData.partitionMaxBytes().keySet());
+        } else {
+            log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +

Review Comment:
   Wouldn't this be too common? Should we move it to trace?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +133,20 @@ public boolean tryComplete() {
             shareFetchData.groupId(), shareFetchData.memberId(),
             shareFetchData.partitionMaxBytes().keySet());
 
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty())
-            return forceComplete();
-        log.info("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
-                "topic partitions {}", shareFetchData.groupId(),
-                shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
+        if (!topicPartitionData.isEmpty()) {
+            replicaManagerFetchDataFromTryComplete = replicaManagerFetchData(topicPartitionData, false);
+            if (!replicaManagerFetchDataFromTryComplete.isEmpty())
+                return forceComplete();
+            log.info("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
Review Comment:
   info seems too much here; how helpful would this log be at info level? Can we move it to debug, please?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+        // tryComplete did not invoke forceComplete, so we need to get replica manager response data.
+        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+            Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            fetchResponseData = replicaManagerFetchData(topicPartitionData, true);
+        } else {
+            // tryComplete invoked forceComplete, so we can use the replica manager response data from tryComplete.
+            fetchResponseData = replicaManagerFetchDataFromTryComplete;
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-                topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
-                ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, fetchResponseData, sharePartitionManager, replicaManager);
             shareFetchData.future().complete(result);
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
             shareFetchData.future().completeExceptionally(e);
         } finally {
             // Releasing the lock to move ahead with the next request in queue.
-            releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+            releasePartitionLocks(shareFetchData.groupId(), fetchResponseData.keySet());

Review Comment:
   Won't this depend on the `readFromLog` API response? That is, if you sent 3 partitions, is it guaranteed that the replica manager will return all 3 partitions in the response? If not, locks acquired for an omitted partition would never be released here.
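   To illustrate the concern with a standalone sketch (using `String` stand-ins for `TopicIdPartition`; this is not the PR's actual code): if locks are released against the response keyset rather than the originally acquired set, any partition omitted from the response stays locked.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   public class LockReleaseDemo {
       public static void main(String[] args) {
           // Partitions for which locks were acquired before the fetch.
           Set<String> requested = new HashSet<>(Arrays.asList("tp0", "tp1", "tp2"));
           // Suppose the replica manager responded for only two of them.
           Set<String> responded = new HashSet<>(Arrays.asList("tp0", "tp2"));

           // Releasing on the response keyset would leave tp1's lock held.
           Set<String> leaked = new HashSet<>(requested);
           leaked.removeAll(responded);
           System.out.println(leaked); // [tp1]

           // Releasing on the originally acquired set leaks nothing.
           Set<String> leakedIfUsingRequested = new HashSet<>(requested);
           leakedIfUsingRequested.removeAll(requested);
           System.out.println(leakedIfUsingRequested.isEmpty()); // true
       }
   }
   ```

   One possible direction (an assumption, not a claim about the PR) is to release locks over the requested partition set rather than `fetchResponseData.keySet()`.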



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
