apoorvmittal10 commented on code in PR #17739:
URL: https://github.com/apache/kafka/pull/17739#discussion_r1836502635


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -58,8 +58,8 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;

-    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
-    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
+    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

Review Comment:
   nit: Can we move the final instance variables before the non-final ones? It gives a clearer distinction in the code.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -90,39 +90,50 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        lock.lock();

Review Comment:
   So now, for share fetch, both tryComplete and onComplete will run under the lock. That seems fine, since the execution should be sequential anyway.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -90,39 +90,50 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
             + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
             partitionsAcquired.keySet());

-        if (shareFetchData.future().isDone())
-            return;
+        try {
+            if (shareFetchData.future().isDone())
+                return;

Review Comment:
   We have this check for share fetch future completion here. If locks have already been acquired for share partitions but the share fetch future is already completed at line 101, how will those locks be released? I don't think the code handles that.
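
To make the last concern concrete, here is a minimal sketch of one possible way to release per-partition locks even when the future is already done. It is not the PR's code: `DelayedFetchSketch`, `heldPartitionLocks`, and `holdsPartitionLock` are hypothetical names, partitions are plain strings, and the real `DelayedShareFetch` and share partition classes are not used. The idea is to do the release in the `finally` block, so the early `return` cannot skip it.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a delayed fetch operation; not Kafka's actual class.
class DelayedFetchSketch {
    // Final fields are declared first, non-final ones after.
    private final CompletableFuture<Map<String, String>> future;
    private final Lock lock = new ReentrantLock();
    // Assumption: a set of partition names models the per-partition fetch locks.
    private final Set<String> heldPartitionLocks = new LinkedHashSet<>();

    private LinkedHashMap<String, String> partitionsAcquired = new LinkedHashMap<>();

    DelayedFetchSketch(CompletableFuture<Map<String, String>> future) {
        this.future = future;
    }

    // Simplified tryComplete: takes the partition lock and records the partition.
    boolean tryComplete(String partition) {
        lock.lock();
        try {
            if (!heldPartitionLocks.add(partition)) {
                return false; // partition lock is already held
            }
            partitionsAcquired.put(partition, "fetch-params");
            return true;
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        lock.lock();
        try {
            if (future.isDone()) {
                return; // early exit; the finally block still runs
            }
            future.complete(new LinkedHashMap<>(partitionsAcquired));
        } finally {
            // Release partition locks on every path, including the early return.
            heldPartitionLocks.clear();
            partitionsAcquired = new LinkedHashMap<>();
            lock.unlock();
        }
    }

    boolean holdsPartitionLock(String partition) {
        lock.lock();
        try {
            return heldPartitionLocks.contains(partition);
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, calling `tryComplete("topic-0")`, completing the future externally, and then calling `onComplete()` leaves no partition lock held. Whether the release belongs in `finally` or in an explicit branch before the `return` is a design choice for the PR author.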