AndrewJSchofield commented on code in PR #21062:
URL: https://github.com/apache/kafka/pull/21062#discussion_r2585320072
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
return numMembersPerGroup.get(groupId);
}
+ // Visible for testing.
+ synchronized SessionKeyAndState connectionSessionKeyAndState(String
connectionId) {
+ return connectionIdToSessionMap.get(connectionId);
+ }
+
private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be
removed from the cache.
@Override
public void onDisconnect(String connectionId) {
- ShareSessionKey shareSessionKey =
connectionIdToSessionMap.remove(connectionId);
- if (shareSessionKey != null) {
- // Try removing session and notify listeners. The session
might already be removed
- // as part of final epoch from client, so we need to check if
the session is still
- // present in the cache.
- maybeRemoveAndNotifyListeners(shareSessionKey);
+ SessionKeyAndState sessionKeyAndState =
maybeRemoveConnectionFromSession(connectionId);
+ // If the session is not stale, try removing the session and
notify listeners.
+ if (sessionKeyAndState != null) {
+ if (!sessionKeyAndState.stale()) {
+ // Try removing session and notify listeners. The session
might already be removed
+ // as part of final epoch from client, so we need to check
if the session is still
+ // present in the cache.
+
maybeRemoveAndNotifyListeners(sessionKeyAndState.shareSessionKey());
Review Comment:
I suggest renaming this method to
`maybeRemoveAndNotifyListenersOnMemberLeave`.
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
return numMembersPerGroup.get(groupId);
}
+ // Visible for testing.
+ synchronized SessionKeyAndState connectionSessionKeyAndState(String
connectionId) {
+ return connectionIdToSessionMap.get(connectionId);
+ }
+
private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be
removed from the cache.
@Override
public void onDisconnect(String connectionId) {
- ShareSessionKey shareSessionKey =
connectionIdToSessionMap.remove(connectionId);
- if (shareSessionKey != null) {
- // Try removing session and notify listeners. The session
might already be removed
- // as part of final epoch from client, so we need to check if
the session is still
- // present in the cache.
- maybeRemoveAndNotifyListeners(shareSessionKey);
+ SessionKeyAndState sessionKeyAndState =
maybeRemoveConnectionFromSession(connectionId);
+ // If the session is not stale, try removing the session and
notify listeners.
Review Comment:
Yes. I agree.
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java:
##########
@@ -229,18 +270,56 @@ Integer numMembers(String groupId) {
return numMembersPerGroup.get(groupId);
}
+ // Visible for testing.
+ synchronized SessionKeyAndState connectionSessionKeyAndState(String
connectionId) {
+ return connectionIdToSessionMap.get(connectionId);
+ }
+
private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
// When the client disconnects, the corresponding session should be
removed from the cache.
@Override
public void onDisconnect(String connectionId) {
- ShareSessionKey shareSessionKey =
connectionIdToSessionMap.remove(connectionId);
- if (shareSessionKey != null) {
- // Try removing session and notify listeners. The session
might already be removed
- // as part of final epoch from client, so we need to check if
the session is still
- // present in the cache.
- maybeRemoveAndNotifyListeners(shareSessionKey);
+ SessionKeyAndState sessionKeyAndState =
maybeRemoveConnectionFromSession(connectionId);
+ // If the session is not stale, try removing the session and
notify listeners.
+ if (sessionKeyAndState != null) {
+ if (!sessionKeyAndState.stale()) {
+ // Try removing session and notify listeners. The session
might already be removed
+ // as part of final epoch from client, so we need to check
if the session is still
+ // present in the cache.
+
maybeRemoveAndNotifyListeners(sessionKeyAndState.shareSessionKey());
+ }
+ // Notify the share group listener if the group is empty. This
should be checked regardless
+ // session is evicted by connection disconnect or client's
final epoch.
+
checkAndNotifyGroupListeners(sessionKeyAndState.shareSessionKey().groupId());
Review Comment:
I suggest changing this method name to
`checkAndNotifyListenersOnGroupEmpty`. Now the two different listener methods
are called in different places, I think it would be helpful to differentiate
them more.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]