adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1830461947


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -80,64 +89,56 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
-            + "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
-            topicPartitionDataFromTryComplete.keySet());
+            + "topic partitions {}", partitionsToComplete.groupId(), 
partitionsToComplete.memberId(),
+            partitionsAcquired.keySet());
 
-        if (shareFetchData.future().isDone())
+        if (partitionsToComplete.future().isDone())
             return;
 
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
         // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
-        if (topicPartitionDataFromTryComplete.isEmpty())
+        if (partitionsAcquired.isEmpty())
             topicPartitionData = acquirablePartitions();
         // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
         else
-            topicPartitionData = topicPartitionDataFromTryComplete;
+            topicPartitionData = partitionsAcquired;
 
         if (topicPartitionData.isEmpty()) {
             // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
-            shareFetchData.future().complete(Collections.emptyMap());
+            partitionsToComplete.future().complete(Collections.emptyMap());
             return;
         }
         log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
-            topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
+            topicPartitionData, partitionsToComplete.groupId(), 
partitionsToComplete.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitionManager, replicaManager);
-            shareFetchData.future().complete(result);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There shouldn't be a case when we have a 
partitionsAlreadyFetched value here and this variable is getting
+                // updated in a different tryComplete thread.

Review Comment:
   Hi @junrao, I've created a ticket https://issues.apache.org/jira/browse/KAFKA-17948 to track this issue and, if it's fine with you, I would prefer to address it in a future PR.
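   For context on what KAFKA-17948 is tracking, here is a minimal, purely illustrative sketch of the hand-off pattern the inline comment above describes: tryComplete stashes already-fetched data in a field that onComplete later consumes, so a concurrent update to that field needs an explicit guard. All class, field, and method names below (StashedFetch, partitionsAlreadyFetched, tryComplete, onComplete) are hypothetical simplifications, not the actual DelayedShareFetch code.
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.locks.ReentrantLock;
   
   // Hypothetical, simplified model of a tryComplete -> onComplete hand-off.
   // It does not mirror DelayedShareFetch; it only illustrates why unguarded
   // access to a stash field shared between the two paths could race.
   public class StashedFetch {
   
       // Data stashed by tryComplete for the thread that eventually runs onComplete.
       private Map<String, Long> partitionsAlreadyFetched = new HashMap<>();
   
       // One way to make the hand-off explicit: guard the stash with a lock so a
       // concurrent tryComplete cannot mutate it while onComplete is consuming it.
       private final ReentrantLock stashLock = new ReentrantLock();
   
       public void tryComplete(Map<String, Long> fetched) {
           stashLock.lock();
           try {
               // Stash the already-fetched data so onComplete can reuse it.
               partitionsAlreadyFetched = new HashMap<>(fetched);
           } finally {
               stashLock.unlock();
           }
       }
   
       public Map<String, Long> onComplete() {
           stashLock.lock();
           try {
               // Consume the stash exactly once; later callers see an empty map.
               Map<String, Long> result = partitionsAlreadyFetched;
               partitionsAlreadyFetched = Collections.emptyMap();
               return result;
           } finally {
               stashLock.unlock();
           }
       }
   
       public static void main(String[] args) {
           StashedFetch fetch = new StashedFetch();
           fetch.tryComplete(Map.of("topicA-0", 42L));
           System.out.println(fetch.onComplete()); // {topicA-0=42}
           System.out.println(fetch.onComplete()); // {} (already consumed)
       }
   }
   ```
   
   Whether a lock, a single-writer invariant, or some other mechanism is the right fix for the real code is exactly the question deferred to KAFKA-17948.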



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
