junrao commented on code in PR #17474:
URL: https://github.com/apache/kafka/pull/17474#discussion_r1803537126


##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -459,6 +469,48 @@ Timer expirationTimer() {
         return expirationTimer;
     }
 
+    // Visible for testing
+    Map<String, Uuid> clientConnectionIdMap() {
+        return clientConnectionIdMap;
+    }
+
+    private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener {
+
+        private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+        private final Map<String, Uuid> clientConnectionIdMap;
+        private final ClientMetricsStats clientMetricsStats;
+
+        ClientConnectionDisconnectListener(
+            Cache<Uuid, ClientMetricsInstance> clientInstanceCache,
+            Map<String, Uuid> clientConnectionIdMap,
+            ClientMetricsStats clientMetricsStats
+        ) {
+            this.clientInstanceCache = clientInstanceCache;
+            this.clientConnectionIdMap = clientConnectionIdMap;
+            this.clientMetricsStats = clientMetricsStats;
+        }
+
+        @Override
+        public void onDisconnect(String connectionId) {
+            log.trace("Removing client connection id [{}] from the client instance cache", connectionId);
+
+            Uuid clientInstanceId = clientConnectionIdMap.remove(connectionId);
+            if (clientInstanceId == null) {
+                log.trace("Client connection id [{}] is not found in the client instance cache", connectionId);
+                return;
+            }
+
+            // Unregister the client instance metrics from the broker metrics.
+            clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId);
+
+            ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
+            if (clientInstance != null) {
+                clientInstance.cancelExpirationTimerTask();
+                clientInstanceCache.remove(clientInstanceId);

Review Comment:
   You are right. Restarting a broker doesn't cause additional issues.
   
   > Though terminating is good enough to not accept further push telemetry calls from same client but if we remove the instance from cache on terminating flag then it will open us for attacks with rogue clients which can send telemetry push request with terminating flag as true and we will skip push time based throttling for this special push request. Hence it would be better to keep these 2 semantics separately.
   
   Does that mean we shouldn't evict clients marked as terminating from the cache? If so, do we really gain anything, since most clients will go through the terminating process anyway?
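   To make the trade-off in the quoted reply concrete, here is a minimal, hypothetical Java sketch. It is not Kafka's `ClientMetricsManager`: `TelemetryStateSketch`, `Result`, `Instance`, `push` and `evictOnTerminating` are illustrative names, a `HashMap` stands in for the client instance `Cache`, and timers and metrics are omitted. Following the quote, it assumes a terminating push is accepted once without the push-interval check and that later pushes from that instance are rejected. It compares evicting on the terminating flag with evicting only on disconnect, which is what the listener in the diff does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical, heavily simplified model of broker-side telemetry state.
 * Not Kafka code: java.util.UUID stands in for Kafka's Uuid, a HashMap for the
 * client instance Cache; expiration timers and metrics are omitted.
 */
final class TelemetryStateSketch {

    enum Result { ACCEPTED, THROTTLED, REJECTED_AFTER_TERMINATING }

    // Stand-in for ClientMetricsInstance: last accepted push time plus terminating flag.
    static final class Instance {
        long lastPushMs = -1;
        boolean terminating;
    }

    private final long pushIntervalMs;
    private final boolean evictOnTerminating;
    private final Map<UUID, Instance> instanceCache = new HashMap<>();
    private final Map<String, UUID> connectionIdMap = new HashMap<>();

    TelemetryStateSketch(long pushIntervalMs, boolean evictOnTerminating) {
        this.pushIntervalMs = pushIntervalMs;
        this.evictOnTerminating = evictOnTerminating;
    }

    Result push(String connectionId, UUID instanceId, boolean terminating, long nowMs) {
        connectionIdMap.put(connectionId, instanceId);
        // A missing entry looks like a brand-new client: no push history, not terminating.
        Instance instance = instanceCache.computeIfAbsent(instanceId, id -> new Instance());
        if (instance.terminating) {
            return Result.REJECTED_AFTER_TERMINATING;
        }
        boolean tooSoon = instance.lastPushMs >= 0 && nowMs - instance.lastPushMs < pushIntervalMs;
        // Assumed semantics: a terminating push skips the interval check exactly once.
        if (tooSoon && !terminating) {
            return Result.THROTTLED;
        }
        instance.lastPushMs = nowMs;
        if (terminating) {
            instance.terminating = true;
            if (evictOnTerminating) {
                // Evicting forgets both the flag and the timestamp, so a client that
                // always sets terminating=true is never throttled or rejected.
                instanceCache.remove(instanceId);
            }
        }
        return Result.ACCEPTED;
    }

    // Mirrors the shape of the PR's onDisconnect: drop state tied to the closed connection.
    void onDisconnect(String connectionId) {
        UUID instanceId = connectionIdMap.remove(connectionId);
        if (instanceId == null) {
            return; // unknown connection: nothing cached for it
        }
        instanceCache.remove(instanceId);
    }

    int cachedInstances() {
        return instanceCache.size();
    }

    public static void main(String[] args) {
        UUID rogue = UUID.randomUUID();
        TelemetryStateSketch evicting = new TelemetryStateSketch(1_000, true);
        // Two back-to-back terminating pushes, 10 ms apart.
        System.out.println(evicting.push("c1", rogue, true, 0));
        System.out.println(evicting.push("c1", rogue, true, 10));
    }
}
```

   With `evictOnTerminating = true`, `main` prints `ACCEPTED` twice: the evicted entry is recreated on every push, so a client that always sets the flag bypasses both throttling and the post-terminating rejection. With it set to `false`, the entry survives until `onDisconnect` (or, in the real manager, cache expiration), so the second terminating push is rejected. That retention is the cost weighed in the question above.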



-- 
This is an automated message from the Apache Git Service.