apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828579315


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,34 +113,27 @@ public void onComplete() {
             topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There can't be a case when we have a non-empty 
logReadResponse, and we have a fresh topicPartitionData
+                // using tryComplete because purgatory ensures that 2 
tryCompletes calls do not happen at the same time.

Review Comment:
   I understand there can't be concurrent calls to tryComplete but I didn't get 
this comment. Though forceComplete() cannot be called twice. But 
forceComplete() on expiration can be on different thread and tryComplete() on 
different (that's my understanding as per DelayedOperation code I have seen) 
can this change any behaviour here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to