apoorvmittal10 commented on code in PR #17796:
URL: https://github.com/apache/kafka/pull/17796#discussion_r1868506381


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -665,17 +670,78 @@ public void handleFencedSharePartitionException(
             // The share partition is fenced hence remove the partition from map and let the client retry.
             // But surface the error to the client so client might take some action i.e. re-fetch
             // the metadata and retry the fetch on new leader.
-            SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
-            if (sharePartition != null) {
-                sharePartition.markFenced();
-            }
+            removeSharePartitionFromCache(sharePartitionKey, partitionCacheMap, replicaManager);
         }
     }
 
     private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
         return new SharePartitionKey(groupId, topicIdPartition);
     }
 
+    private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey,
+        Map<SharePartitionKey, SharePartition> map, ReplicaManager replicaManager) {
+        SharePartition sharePartition = map.remove(sharePartitionKey);
+        if (sharePartition != null) {
+            sharePartition.markFenced();
+            replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
+        }
+    }
+
+    /**
+     * The SharePartitionListener is used to listen for partition events. The share partition is associated with
+     * the topic-partition, we need to handle the partition events for the share partition.
+     * <p>
+     * The partition cache map stores share partitions against share partition key which comprises
+     * group and topic-partition. Instead of maintaining a separate map for topic-partition to share partitions,
+     * we can maintain the share partition key in the listener and create a new listener for each share partition.
+     */
+    static class SharePartitionListener implements PartitionListener {

Review Comment:
   That's what I initially intended, but it made the tests hard to write: I
   would have to create a SharePartitionManager instance and make the calls that
   create a SharePartition just to obtain the listener. Hence I made the class
   static.
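
To illustrate the testability point, here is a minimal sketch, not the PR's actual code. `SimplePartitionListener`, `KeyedListener`, and the `String`-keyed map are hypothetical stand-ins for `PartitionListener`, `SharePartitionListener`, and the partition cache map, and removing the entry on both callbacks is an assumption made for the example. Because the nested class is static, a test can construct it directly from a key and a map, with no enclosing manager instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ListenerSketch {
    // Hypothetical stand-in for Kafka's PartitionListener; only two callbacks are modeled.
    interface SimplePartitionListener {
        void onFailed();
        void onDeleted();
    }

    // Static nested class: it holds no implicit reference to an enclosing instance,
    // so it can be created on its own with just the key and the cache map.
    static class KeyedListener implements SimplePartitionListener {
        private final String key;                 // stand-in for SharePartitionKey
        private final Map<String, String> cache;  // stand-in for the partition cache map

        KeyedListener(String key, Map<String, String> cache) {
            this.key = key;
            this.cache = cache;
        }

        @Override
        public void onFailed() {
            // Assumption for this sketch: drop the cached entry when the partition fails.
            cache.remove(key);
        }

        @Override
        public void onDeleted() {
            // Assumption for this sketch: drop the cached entry when the partition is deleted.
            cache.remove(key);
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("group-1:topic-0", "share-partition");

        // No outer manager object is needed to obtain the listener.
        KeyedListener listener = new KeyedListener("group-1:topic-0", cache);
        listener.onDeleted();

        System.out.println(cache.containsKey("group-1:topic-0")); // prints "false"
    }
}
```

A non-static inner class would instead require an enclosing instance (`outer.new KeyedListener(...)`), which is the setup cost the comment above describes.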



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

