mumrah commented on code in PR #17796:
URL: https://github.com/apache/kafka/pull/17796#discussion_r1842703563


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -665,17 +669,83 @@ public void handleFencedSharePartitionException(
             // The share partition is fenced hence remove the partition from 
map and let the client retry.
             // But surface the error to the client so client might take some 
action i.e. re-fetch
             // the metadata and retry the fetch on new leader.
-            SharePartition sharePartition = 
partitionCacheMap.remove(sharePartitionKey);
-            if (sharePartition != null) {
-                sharePartition.markFenced();
-            }
+            removeSharePartitionFromCache(sharePartitionKey);
         }
     }
 
     private SharePartitionKey sharePartitionKey(String groupId, 
TopicIdPartition topicIdPartition) {
         return new SharePartitionKey(groupId, topicIdPartition);
     }
 
+    private void removeSharePartitionFromCache(SharePartitionKey 
sharePartitionKey) {
+        SharePartition sharePartition = 
partitionCacheMap.remove(sharePartitionKey);
+        if (sharePartition != null) {
+            sharePartition.markFenced();
+        }
+    }
+
+    /**
+     * The SharePartitionListener is used to listen for partition events. The 
share partition is associated with
+     * the topic-partition, we need to handle the partition events for the 
share partition.
+     * <p>
+     * The partition cache map stores share partitions against share partition 
key which comprises
+     * group and topic-partition. Instead of maintaining a separate map for 
topic-partition to share partitions,
+     * we can maintain the share partition key in the listener and create a 
new listener for each share partition.
+     */
+    private class SharePartitionListener implements PartitionListener {
+
+        private final SharePartitionKey sharePartitionKey;
+
+        private SharePartitionListener(SharePartitionKey sharePartitionKey) {
+            this.sharePartitionKey = sharePartitionKey;
+        }
+
+        /**
+         * The onFailed method is called when a Partition is marked offline.
+         *
+         * @param topicPartition The topic-partition that has been marked 
offline.
+         */
+        @Override
+        public void onFailed(TopicPartition topicPartition) {
+            log.info("The share partition failed listener is invoked for the 
topic-partition: {}, share-partition: {}",
+                topicPartition, sharePartitionKey);
+            onUpdate(topicPartition);
+        }
+
+        /**
+         * The onDeleted method is called when a Partition is deleted.
+         *
+         * @param topicPartition The topic-partition that has been deleted.
+         */
+        @Override
+        public void onDeleted(TopicPartition topicPartition) {
+            log.info("The share partition delete listener is invoked for the 
topic-partition: {}, share-partition: {}",
+                topicPartition, sharePartitionKey);
+            onUpdate(topicPartition);
+        }
+
+        /**
+         * The onFollower method is called when a Partition is marked follower.
+         *
+         * @param topicPartition The topic-partition that has been marked as 
follower.
+         */
+        @Override
+        public void onFollower(TopicPartition topicPartition) {
+            log.info("The share partition leader change listener is invoked 
for the topic-partition: {}, share-partition: {}",

Review Comment:
   INFO is probably too noisy for this as we expect it any time the topic 
leadership changes.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -609,6 +610,12 @@ private SharePartition 
getOrCreateSharePartition(SharePartitionKey sharePartitio
                 k -> {
                     long start = time.hiResClockMs();
                     int leaderEpoch = 
ShareFetchUtils.leaderEpoch(replicaManager, 
sharePartitionKey.topicIdPartition().topicPartition());
+                    // Attach listener to Partition which shall invoke 
partition change handlers.
+                    // However, as there could be multiple share partitions 
(per group name) for a single topic-partition,
+                    // hence create separate listeners per share partition 
which holds the share partition key
+                    // to identify the respective share partition.
+                    
replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(),

Review Comment:
   This will only add the listener if the Partition is online. Is it possible 
for us to attempt to initialize a SharePartition before a partition is loaded? 
Or do we have some guards in SPM/KafkaApis?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to