apoorvmittal10 commented on code in PR #19757: URL: https://github.com/apache/kafka/pull/19757#discussion_r2100372692
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -130,6 +130,10 @@ public class SharePartitionManager implements AutoCloseable { * The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived. */ private final int maxDeliveryCount; + /** + * The max wait time for a share fetch request having remote storage fetch. + */ + private final long remoteStorageRequestWaitTimeMs; Review Comment: Should the variable name be `remoteFetchMaxWaitMs`, aligned with the kafka config? ########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -904,4 +948,30 @@ private boolean forceCompleteRequest() { } return completedByMe; } + + private void completeRemoteShareFetchRequestOutsidePurgatory() { + try { + if (outsidePurgatoryCallbackLock.compareAndSet(false, true)) { + completeRemoteStorageShareFetchRequest(); + } + } finally { + outsidePurgatoryCallbackLock.set(false); + } Review Comment: Why do you want to set it back to false if one thread has already completed the execution? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org