adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1806729605
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -86,57 +86,37 @@ public void onComplete() {
if (shareFetchData.future().isDone())
return;
- Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
- // tryComplete did not invoke forceComplete, so we need to check if we
have any partitions to fetch.
- if (topicPartitionDataFromTryComplete.isEmpty())
- topicPartitionData = acquirablePartitions();
- // tryComplete invoked forceComplete, so we can use the data from
tryComplete.
- else
- topicPartitionData = topicPartitionDataFromTryComplete;
-
- if (topicPartitionData.isEmpty()) {
- // No locks for share partitions could be acquired, so we complete
the request with an empty response.
- shareFetchData.future().complete(Collections.emptyMap());
- return;
+ Map<TopicIdPartition, FetchPartitionData> fetchResponseData;
+ // tryComplete did not invoke forceComplete, so we need to get replica manager response data.
+ if (replicaManagerFetchDataFromTryComplete.isEmpty()) {
+ Map<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = acquirablePartitions();
+ if (topicPartitionData.isEmpty()) {
+ // No locks for share partitions could be acquired, so we complete the request with an empty response.
+ shareFetchData.future().complete(Collections.emptyMap());
+ return;
+ }
+ fetchResponseData = replicaManagerFetchData(topicPartitionData,
true);
+ } else {
+ // tryComplete invoked forceComplete, so we can use the replica manager response data from tryComplete.
+ fetchResponseData = replicaManagerFetchDataFromTryComplete;
}
- log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}",
- topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UnboundedQuota$.MODULE$,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result
=
- ShareFetchUtils.processFetchResponse(shareFetchData,
responseData, sharePartitionManager, replicaManager);
+ ShareFetchUtils.processFetchResponse(shareFetchData,
fetchResponseData, sharePartitionManager, replicaManager);
Review Comment:
Yeah, `processFetchResponse` can already handle it, which is why I didn't add
any check here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]