adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828852012
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
- topicPartitionDataFromTryComplete = acquirablePartitions();
+ if (anySharePartitionNoLongerExists()) {
+ return forceComplete();
+ }
+ Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
- if (!topicPartitionDataFromTryComplete.isEmpty()) {
+ try {
+ if (!topicPartitionData.isEmpty()) {
+ // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+ // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
+ // those topic partitions.
+ Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+ if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+ partitionsToComplete = topicPartitionData;
+ partitionsAlreadyFetched = replicaManagerReadResponse;
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then that means the request is already completed
+ // hence release the acquired locks.
+ if (!completedByMe) {
+ releasePartitionLocks(partitionsToComplete.keySet());
+ partitionsAlreadyFetched.clear();
+ partitionsToComplete.clear();
+ }
+ return completedByMe;
+ } else {
+ log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
Review Comment:
Yes, the log statement is correct. Because the condition uses `||`,
`isMinBytesSatisfied` runs only if `anyTopicIdPartitionHasLogReadError` returns
false, so reaching the `else` branch means there was no log read error and
minBytes is not satisfied. If `anyTopicIdPartitionHasLogReadError` returns
true, we skip the minBytes check and do a `forceComplete`.
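
To illustrate the evaluation order, here is a minimal, self-contained sketch. It is not the code from this PR: the class `ShortCircuitSketch`, the helper `decide`, the `calls` list and the boolean parameters are hypothetical stand-ins, and `hasLogReadError` only mimics the role of `anyTopicIdPartitionHasLogReadError`.

```java
import java.util.ArrayList;
import java.util.List;

public class ShortCircuitSketch {
    // Records which checks actually ran, in order (illustration only).
    static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for anyTopicIdPartitionHasLogReadError.
    static boolean hasLogReadError(boolean error) {
        calls.add("hasLogReadError");
        return error;
    }

    // Hypothetical stand-in for isMinBytesSatisfied.
    static boolean isMinBytesSatisfied(boolean satisfied) {
        calls.add("isMinBytesSatisfied");
        return satisfied;
    }

    // Same shape as the condition in tryComplete(): `||` evaluates its
    // right-hand side only when the left-hand side is false.
    static String decide(boolean error, boolean satisfied) {
        calls.clear();
        if (hasLogReadError(error) || isMinBytesSatisfied(satisfied)) {
            return "forceComplete";
        }
        // Reached only when there is no error AND minBytes is not satisfied.
        return "minBytes not satisfied";
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false) + " " + calls);
        System.out.println(decide(false, true) + " " + calls);
        System.out.println(decide(false, false) + " " + calls);
    }
}
```

With an error, only `hasLogReadError` is recorded in `calls`; the minBytes check never runs. The "not satisfied" branch is reached only in the third call, where both checks ran and both returned false.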