dongjinleekr commented on a change in pull request #11586: URL: https://github.com/apache/kafka/pull/11586#discussion_r805725430
########## File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java ########## @@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) { } public void maybeRegisterConnectionMetrics(String connectionId) { - if (!connectionId.isEmpty() && metricsPerConnection) { - // if one sensor of the metrics has been registered for the connection, - // then all other sensors should have been registered; and vice versa - String nodeRequestName = "node-" + connectionId + ".requests-sent"; - Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); - if (nodeRequest == null) { - Map<String, String> tags = new LinkedHashMap<>(metricTags); - tags.put("node-id", "node-" + connectionId); - - nodeRequest = sensor(nodeRequestName); - nodeRequest.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests sent")); - MetricName metricName = metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average size of requests sent.", tags); - nodeRequest.add(metricName, new Avg()); - metricName = metrics.metricName("request-size-max", perConnectionMetricGrpName, "The maximum size of any request sent.", tags); - nodeRequest.add(metricName, new Max()); - - String bytesSentName = "node-" + connectionId + ".bytes-sent"; - Sensor bytesSent = sensor(bytesSentName); - bytesSent.add(createMeter(metrics, perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes")); - - String nodeResponseName = "node-" + connectionId + ".responses-received"; - Sensor nodeResponse = sensor(nodeResponseName); - nodeResponse.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses received")); - - String bytesReceivedName = "node-" + connectionId + ".bytes-received"; - Sensor bytesReceive = sensor(bytesReceivedName); - bytesReceive.add(createMeter(metrics, perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes")); - - String nodeTimeName = "node-" + 
connectionId + ".latency"; - Sensor nodeRequestTime = sensor(nodeTimeName); - metricName = metrics.metricName("request-latency-avg", perConnectionMetricGrpName, tags); - nodeRequestTime.add(metricName, new Avg()); - metricName = metrics.metricName("request-latency-max", perConnectionMetricGrpName, tags); - nodeRequestTime.add(metricName, new Max()); - } + if (!connectionId.isEmpty() && connectionMetrics != null) { + connectionMetrics.computeIfAbsent(connectionId, (key) -> { + // key: connection id + // value: set of sensors (currently null) + return perConnectionSensors(key); + }); } } + public void maybeUnregisterConnectionMetrics(String connectionId) { + if (!connectionId.isEmpty() && connectionMetrics != null) { + connectionMetrics.computeIfPresent(connectionId, (key, value) -> { + // key: connection id + // value: set of sensors + for (Sensor sensor : value) { + metrics.removeSensor(sensor.name()); + } + return null; + }); + } + } + + private Set<Sensor> perConnectionSensors(String connectionId) { Review comment: So... `connectionMetrics` should hold all the metrics related to the given `connectionId`, in the form of a `Set<Sensor>`. Do I understand correctly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org