adixitconfluent commented on code in PR #21062:
URL: https://github.com/apache/kafka/pull/21062#discussion_r2585028211
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
return numMembersPerGroup.get(groupId);
}
+ // Visible for testing.
+ synchronized SessionKeyAndState connectionSessionKeyAndState(String
connectionId) {
+ return connectionIdToSessionMap.get(connectionId);
+ }
+
private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be
removed from the cache.
@Override
public void onDisconnect(String connectionId) {
- ShareSessionKey shareSessionKey =
connectionIdToSessionMap.remove(connectionId);
- if (shareSessionKey != null) {
- // Try removing session and notify listeners. The session
might already be removed
- // as part of final epoch from client, so we need to check if
the session is still
- // present in the cache.
- maybeRemoveAndNotifyListeners(shareSessionKey);
+ SessionKeyAndState sessionKeyAndState =
maybeRemoveConnectionFromSession(connectionId);
+ // If the session is not stale, try removing the session and
notify listeners.
Review Comment:
nit: Ideally this should go above line `if (!sessionKeyAndState.stale()) {`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]