adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828877645
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,34 +113,27 @@ public void onComplete() {
topicPartitionData, shareFetchData.groupId(),
shareFetchData.fetchParams());
try {
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetchData.fetchParams(),
- CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(),
entry.getValue())).collect(Collectors.toList())
- ),
- QuotaFactory.UNBOUNDED_QUOTA,
- true);
-
- Map<TopicIdPartition, FetchPartitionData> responseData = new
HashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- TopicIdPartition topicIdPartition = tpLogResult._1();
- LogReadResult logResult = tpLogResult._2();
- FetchPartitionData fetchPartitionData =
logResult.toFetchPartitionData(false);
- responseData.put(topicIdPartition, fetchPartitionData);
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
+ Map<TopicIdPartition, LogReadResult> responseData;
+ if (partitionsAlreadyFetched.isEmpty())
+ responseData = readFromLog(topicPartitionData);
+ else
+ // There can't be a case when we have a non-empty logReadResponse, and we have a fresh topicPartitionData
+ // using tryComplete because purgatory ensures that 2 tryCompletes calls do not happen at the same time.
Review Comment:
What I mean is that the value of `partitionsAlreadyFetched` can't change once
we reach this point in `onComplete`. We only get here via `forceComplete`,
which is triggered either by expiration or by a successful `tryComplete` call.
IIUC, `forceComplete` uses the `AtomicBoolean` variable `completed` as its
locking mechanism, so only one caller can proceed to `onComplete`. This should
ensure that the shared fields we use in `DelayedShareFetch` are accessed
atomically between `tryComplete` and `forceComplete`. I'll change the comment
to explain this better.
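
To illustrate the guard I'm describing, here is a minimal sketch, not the actual `DelayedOperation` code. `CompletionGuardSketch`, `GuardedOperation` and the call counter are hypothetical names made up for this example. It only shows the run-once behaviour of a `compareAndSet` on `completed`; it does not model the purgatory or the fetch logic.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionGuardSketch {

    // Hypothetical stand-in for a delayed operation; not Kafka's actual class.
    static class GuardedOperation {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final AtomicInteger onCompleteCalls = new AtomicInteger();

        // Only the caller that flips `completed` from false to true runs onComplete.
        boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();
                return true;
            }
            return false;
        }

        // Counts invocations so the run-once behaviour can be observed.
        void onComplete() {
            onCompleteCalls.incrementAndGet();
        }

        int onCompleteCalls() {
            return onCompleteCalls.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedOperation op = new GuardedOperation();

        // Two racing callers, standing in for the expiration path and the
        // path taken after a successful tryComplete.
        Thread expiration = new Thread(op::forceComplete);
        Thread afterTryComplete = new Thread(op::forceComplete);
        expiration.start();
        afterTryComplete.start();
        expiration.join();
        afterTryComplete.join();

        System.out.println("onComplete calls: " + op.onCompleteCalls());
    }
}
```

Whichever thread wins the `compareAndSet`, `onComplete` runs exactly once, so state read inside it is not raced by a second completion.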