adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1806729605


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
         if (shareFetchData.future().isDone())
             return;
 
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
-        else
-            topicPartitionData = topicPartitionDataFromTryComplete;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
-            return;
+        Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+        // tryComplete did not invoke forceComplete, so we need to get replica 
manager response data.
+        if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+            Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = acquirablePartitions();
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
+                shareFetchData.future().complete(Collections.emptyMap());
+                return;
+            }
+            fetchResponseData = replicaManagerFetchData(topicPartitionData, 
true);
+        } else {
+            // tryComplete invoked forceComplete, so we can use the replica 
manager response data from tryComplete.
+            fetchResponseData = replicaManagerFetchDataFromTryComplete;
         }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-                topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UnboundedQuota$.MODULE$,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
fetchResponseData, sharePartitionManager, replicaManager);

Review Comment:
   Yeah, `processFetchResponse` can handle it; that's why I didn't add 
any check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to