apoorvmittal10 commented on code in PR #17539: URL: https://github.com/apache/kafka/pull/17539#discussion_r1828579315
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,34 +113,27 @@ public void onComplete() {
                 topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", responseData);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There can't be a case when we have a non-empty logReadResponse, and we have a fresh topicPartitionData
+                // using tryComplete because purgatory ensures that 2 tryCompletes calls do not happen at the same time.

Review Comment:
   I understand that there can't be concurrent calls to tryComplete(), but I don't follow this comment. Granted, forceComplete() cannot be called twice. However, forceComplete() on expiration can run on one thread while tryComplete() runs on another (that is my understanding from the DelayedOperation code I have seen). Can this change any behaviour here?
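
   To make the question concrete, here is a minimal sketch of the interleaving I have in mind. It is a simplified, hypothetical model: the class `SketchDelayedOperation`, its fields, and the `race()` helper are made up for illustration and are not the real `DelayedOperation` or `DelayedShareFetch`. It assumes completion is guarded by an atomic compare-and-set flag and that tryComplete() calls are serialized by a lock, which is how I read the purgatory code, but please correct me if that assumption is wrong.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, hypothetical model of a delayed operation. NOT Kafka's actual class.
class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();

    // Counts how many times onComplete() actually ran.
    final AtomicInteger onCompleteCalls = new AtomicInteger();

    // State written by tryComplete() and read by onComplete(). It is volatile
    // because, in this model, the two methods may run on different threads.
    private volatile boolean alreadyFetched = false;

    // What onComplete() observed; may be true or false depending on which thread wins.
    volatile boolean sawAlreadyFetched = false;

    // Assumption: the request path records fetched state, then tries to complete.
    boolean tryComplete() {
        alreadyFetched = true;
        return forceComplete();
    }

    // Assumption: the lock serializes tryComplete() calls against each other only.
    boolean safeTryComplete() {
        lock.lock();
        try {
            return tryComplete();
        } finally {
            lock.unlock();
        }
    }

    // Only the first caller flips the flag and runs onComplete(); this path takes no lock.
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    void onComplete() {
        onCompleteCalls.incrementAndGet();
        sawAlreadyFetched = alreadyFetched;
    }

    // Races a request-style tryComplete() against an expiration-style forceComplete().
    static SketchDelayedOperation race() {
        SketchDelayedOperation op = new SketchDelayedOperation();
        CountDownLatch start = new CountDownLatch(1);
        Thread requestThread = new Thread(() -> { awaitQuietly(start); op.safeTryComplete(); });
        Thread expirationThread = new Thread(() -> { awaitQuietly(start); op.forceComplete(); });
        requestThread.start();
        expirationThread.start();
        start.countDown();
        try {
            requestThread.join();
            expirationThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   In this model onComplete() runs exactly once, but whether it sees the state written by tryComplete() depends on which thread wins the compare-and-set. That is the case I would like the code comment to address.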