apoorvmittal10 commented on code in PR #19757:
URL: https://github.com/apache/kafka/pull/19757#discussion_r2100372692


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -130,6 +130,10 @@ public class SharePartitionManager implements 
AutoCloseable {
      * The max delivery count is the maximum number of times a message can be 
delivered before it is considered to be archived.
      */
     private final int maxDeliveryCount;
+    /**
+     * The max wait time for a share fetch request having remote storage fetch.
+     */
+    private final long remoteStorageRequestWaitTimeMs;

Review Comment:
   Should the variable name be `remoteFetchMaxWaitMs`, aligned with the kafka 
config?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -904,4 +948,30 @@ private boolean forceCompleteRequest() {
         }
         return completedByMe;
     }
+
+    private void completeRemoteShareFetchRequestOutsidePurgatory() {
+        try {
+            if (outsidePurgatoryCallbackLock.compareAndSet(false, true)) {
+                completeRemoteStorageShareFetchRequest();
+            }
+        } finally {
+            outsidePurgatoryCallbackLock.set(false);
+        }

Review Comment:
   Why do you want to set it back to false if one thread has already completed 
the execution?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to