apoorvmittal10 commented on code in PR #21062:
URL: https://github.com/apache/kafka/pull/21062#discussion_r2585380635


##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
         return numMembersPerGroup.get(groupId);
     }
 
+    // Visible for testing.
+    synchronized SessionKeyAndState connectionSessionKeyAndState(String 
connectionId) {
+        return connectionIdToSessionMap.get(connectionId);
+    }
+
     private final class ClientConnectionDisconnectListener implements 
ConnectionDisconnectListener {
 
         // When the client disconnects, the corresponding session should be 
removed from the cache.
         @Override
         public void onDisconnect(String connectionId) {
-            ShareSessionKey shareSessionKey = 
connectionIdToSessionMap.remove(connectionId);
-            if (shareSessionKey != null) {
-                // Try removing session and notify listeners. The session 
might already be removed
-                // as part of final epoch from client, so we need to check if 
the session is still
-                // present in the cache.
-                maybeRemoveAndNotifyListeners(shareSessionKey);
+            SessionKeyAndState sessionKeyAndState = 
maybeRemoveConnectionFromSession(connectionId);
+            // If the session is not stale, try removing the session and 
notify listeners.
+            if (sessionKeyAndState != null) {
+                if (!sessionKeyAndState.stale()) {
+                    // Try removing session and notify listeners. The session 
might already be removed
+                    // as part of final epoch from client, so we need to check 
if the session is still
+                    // present in the cache.
+                    
maybeRemoveAndNotifyListeners(sessionKeyAndState.shareSessionKey());
+                }
+                // Notify the share group listener if the group is empty. This 
should be checked regardless
+                // session is evicted by connection disconnect or client's 
final epoch.
+                
checkAndNotifyGroupListeners(sessionKeyAndState.shareSessionKey().groupId());

Review Comment:
   Done.



##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
         return numMembersPerGroup.get(groupId);
     }
 
+    // Visible for testing.
+    synchronized SessionKeyAndState connectionSessionKeyAndState(String 
connectionId) {
+        return connectionIdToSessionMap.get(connectionId);
+    }
+
     private final class ClientConnectionDisconnectListener implements 
ConnectionDisconnectListener {
 
         // When the client disconnects, the corresponding session should be 
removed from the cache.
         @Override
         public void onDisconnect(String connectionId) {
-            ShareSessionKey shareSessionKey = 
connectionIdToSessionMap.remove(connectionId);
-            if (shareSessionKey != null) {
-                // Try removing session and notify listeners. The session 
might already be removed
-                // as part of final epoch from client, so we need to check if 
the session is still
-                // present in the cache.
-                maybeRemoveAndNotifyListeners(shareSessionKey);
+            SessionKeyAndState sessionKeyAndState = 
maybeRemoveConnectionFromSession(connectionId);
+            // If the session is not stale, try removing the session and 
notify listeners.
+            if (sessionKeyAndState != null) {
+                if (!sessionKeyAndState.stale()) {
+                    // Try removing session and notify listeners. The session 
might already be removed
+                    // as part of final epoch from client, so we need to check 
if the session is still
+                    // present in the cache.
+                    
maybeRemoveAndNotifyListeners(sessionKeyAndState.shareSessionKey());

Review Comment:
   Done.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to