apoorvmittal10 commented on code in PR #17474:
URL: https://github.com/apache/kafka/pull/17474#discussion_r1803728797
##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -459,6 +469,48 @@ Timer expirationTimer() {
return expirationTimer;
}
+ // Visible for testing
+ Map<String, Uuid> clientConnectionIdMap() {
+ return clientConnectionIdMap;
+ }
+
+ private final class ClientConnectionDisconnectListener implements
ConnectionDisconnectListener {
+
+ private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+ private final Map<String, Uuid> clientConnectionIdMap;
+ private final ClientMetricsStats clientMetricsStats;
+
+ ClientConnectionDisconnectListener(
+ Cache<Uuid, ClientMetricsInstance> clientInstanceCache,
+ Map<String, Uuid> clientConnectionIdMap,
+ ClientMetricsStats clientMetricsStats
+ ) {
+ this.clientInstanceCache = clientInstanceCache;
+ this.clientConnectionIdMap = clientConnectionIdMap;
+ this.clientMetricsStats = clientMetricsStats;
+ }
+
+ @Override
+ public void onDisconnect(String connectionId) {
+ log.trace("Removing client connection id [{}] from the client
instance cache", connectionId);
+
+ Uuid clientInstanceId = clientConnectionIdMap.remove(connectionId);
+ if (clientInstanceId == null) {
+ log.trace("Client connection id [{}] is not found in the client
instance cache", connectionId);
+ return;
+ }
+
+ // Unregister the client instance metrics from the broker metrics.
+
clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId);
+
+ ClientMetricsInstance clientInstance =
clientInstanceCache.get(clientInstanceId);
+ if (clientInstance != null) {
+ clientInstance.cancelExpirationTimerTask();
+ clientInstanceCache.remove(clientInstanceId);
Review Comment:
>Does that mean we shouldn't evict from the cache for clients marked as
terminating? If we do that, do we really get any benefit since most of the
clients will go through the terminating process?
What I meant was that we should keep terminating flag behaviour separately
from cache eviction. We shouldn't evict from cache based on terminating flag
but should only evict when connection is dropped.
Say, there is a rogue client which sends telemetry push requests always with
terminating as true and on a same open connection. Then because the instance
will be in cache as connection is still open then we can deny the request.
Hence to protect broker and down stream systems, we should always act on client
disconnection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]