vvcephei commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r478709525
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin
             + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
     }
 
-    public final Sensor storeLevelSensor(final String threadId,
-                                         final String taskId,
+    public final Sensor storeLevelSensor(final String taskId,
                                          final String storeName,
                                          final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
+                                         final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = storeSensorPrefix(threadId, taskId, storeName);
-        synchronized (storeLevelSensors) {
-            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-            final Sensor sensor = metrics.getSensor(fullSensorName);
-            if (sensor == null) {
+        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
+        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+        return Optional.ofNullable(metrics.getSensor(fullSensorName))
+            .orElseGet(() -> {
                 storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
                 return metrics.sensor(fullSensorName, recordingLevel, parents);
-            } else {
-                return sensor;
-            }
+            });

Review comment:
Thanks!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org