adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828820632


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +
+                            "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                        shareFetchData.partitionMaxBytes().keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the 
share fetch request for group {}, member {}, " +
+                        "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                    shareFetchData.partitionMaxBytes().keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that 
means the request is already completed
             // hence release the acquired locks.
             if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionDataFromTryComplete.keySet());
+                releasePartitionLocks(partitionsToComplete.keySet());

Review Comment:
   Hi @junrao, you're right, I've changed the line to 
`releasePartitionLocks(topicPartitionData.keySet())`



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +
+                            "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                        shareFetchData.partitionMaxBytes().keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the 
share fetch request for group {}, member {}, " +
+                        "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                    shareFetchData.partitionMaxBytes().keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that 
means the request is already completed
             // hence release the acquired locks.
             if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), 
topicPartitionDataFromTryComplete.keySet());
+                releasePartitionLocks(partitionsToComplete.keySet());

Review Comment:
   Hi @junrao, you're right, I've changed the line to 
`releasePartitionLocks(topicPartitionData.keySet())`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to