adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1806735371


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+        // tryComplete did not invoke forceComplete, so we need to get replica 
manager response data.
+        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+            Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = acquirablePartitions();
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            fetchResponseData = replicaManagerFetchData(topicPartitionData, 
true);
+        } else {
+            // tryComplete invoked forceComplete, so we can use the replica 
manager response data from tryComplete.
+            fetchResponseData = replicaManagerFetchDataFromTryComplete;
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-                topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
fetchResponseData, sharePartitionManager, replicaManager);
             shareFetchData.future().complete(result);
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
             shareFetchData.future().completeExceptionally(e);
         } finally {
             // Releasing the lock to move ahead with the next request in queue.
-            releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionData.keySet());
+            releasePartitionLocks(shareFetchData.groupId(), 
fetchResponseData.keySet());

Review Comment:
   for that case, even when the data is not received from replica manager, the 
fetchResponseData should still have keys as the locked topic partitions and 
values as empty data, so it should work. Am I wrong in that understanding?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to