adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1830461947
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -80,64 +89,56 @@ public void onExpiration() {
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {},
member {}, "
- + "topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(),
- topicPartitionDataFromTryComplete.keySet());
+ + "topic partitions {}", partitionsToComplete.groupId(),
partitionsToComplete.memberId(),
+ partitionsAcquired.keySet());
- if (shareFetchData.future().isDone())
+ if (partitionsToComplete.future().isDone())
return;
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we
have any partitions to fetch.
- if (topicPartitionDataFromTryComplete.isEmpty())
+ if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from
tryComplete.
else
- topicPartitionData = topicPartitionDataFromTryComplete;
+ topicPartitionData = partitionsAcquired;
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete
the request with an empty response.
- shareFetchData.future().complete(Collections.emptyMap());
+ partitionsToComplete.future().complete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch
params: {}",
- topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
+ topicPartitionData, partitionsToComplete.groupId(),
partitionsToComplete.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UNBOUNDED_QUOTA,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
- Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result
=
- ShareFetchUtils.processFetchResponse(shareFetchData,
responseData, sharePartitionManager, replicaManager);
- shareFetchData.future().complete(result);
+ Map<TopicIdPartition, LogReadResult> responseData;
+ if (partitionsAlreadyFetched.isEmpty())
+ responseData = readFromLog(topicPartitionData);
+ else
+ // There shouldn't be a case when we have a
partitionsAlreadyFetched value here and this variable is getting
+ // updated in a different tryComplete thread.
Review Comment:
Hi @junrao, I've created a ticket
https://issues.apache.org/jira/browse/KAFKA-17948 to track this issue and, if it
is fine with you, I would prefer to address the issue in a future PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]