dongjinleekr commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r805725430



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
         }
 
         public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the 
connection,
-                // then all other sensors should have been registered; and 
vice versa
-                String nodeRequestName = "node-" + connectionId + 
".requests-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    Map<String, String> tags = new LinkedHashMap<>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = sensor(nodeRequestName);
-                    nodeRequest.add(createMeter(metrics, 
perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests 
sent"));
-                    MetricName metricName = 
metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average 
size of requests sent.", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-size-max", 
perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String bytesSentName = "node-" + connectionId + 
".bytes-sent";
-                    Sensor bytesSent = sensor(bytesSentName);
-                    bytesSent.add(createMeter(metrics, 
perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
-                    String nodeResponseName = "node-" + connectionId + 
".responses-received";
-                    Sensor nodeResponse = sensor(nodeResponseName);
-                    nodeResponse.add(createMeter(metrics, 
perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses 
received"));
-
-                    String bytesReceivedName = "node-" + connectionId + 
".bytes-received";
-                    Sensor bytesReceive = sensor(bytesReceivedName);
-                    bytesReceive.add(createMeter(metrics, 
perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = metrics.metricName("request-latency-avg", 
perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-latency-max", 
perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+                    // key: connection id
+                    // value: set of sensors (currently null)
+                    return perConnectionSensors(key);
+                });
             }
         }
 
+        public void maybeUnregisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfPresent(connectionId, (key, value) 
-> {
+                    // key: connection id
+                    // value: set of sensors
+                    for (Sensor sensor : value) {
+                        metrics.removeSensor(sensor.name());
+                    }
+                    return null;
+                });
+            }
+        }
+
+        private Set<Sensor> perConnectionSensors(String connectionId) {

Review comment:
       So... `ConnectionMetrics` should hold all the metrics related to given 
`connectionId`, similar to `GroupCoordinatorMetrics` or `SenderMetrics`. Do I 
understand correctly?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to