apoorvmittal10 commented on code in PR #17739:
URL: https://github.com/apache/kafka/pull/17739#discussion_r1836502635


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -58,8 +58,8 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
-    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
+    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> 
partitionsAlreadyFetched;

Review Comment:
   nit: Can we move final instance variable prior to non-final? It just gives a 
clear distinction in code.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -90,39 +90,50 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read 
for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
+        lock.lock();

Review Comment:
   So now for share fetch trycomplete and oncomplete will be under lock. Seems 
fine as anyways the execution should be sequential.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -90,39 +90,50 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read 
for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
+        lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
             + "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
             partitionsAcquired.keySet());
 
-        if (shareFetchData.future().isDone())
-            return;
+        try {
+            if (shareFetchData.future().isDone())
+                return;

Review Comment:
   As we have this check here for share fetch future completion, so if there 
are locks acquired for share partitions but the share fetch future is already 
completed in line 101 then how will they be released? I don't think code 
handles that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to