adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828852012


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +

Review Comment:
   yes, the log statement is correct because because `isMinBytesSatisfied` can 
run only if `anyTopicIdPartitionHasLogReadError` returns false. We should only 
check `isMinBytesSatisfied` if `anyTopicIdPartitionHasLogReadError` returns 
false. If `anyTopicIdPartitionHasLogReadError` return true, then we do a 
`forceComplete`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to