FrankYang0529 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1668384092


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String], 
remoteStorageEnabled: Boolean = f
   }
 
   case class GaugeWrapper(metricType: String) {
-    @volatile private var gaugeObject: Gauge[Long] = _
-    final private val gaugeLock = new Object
-    final val aggregatedMetric = new AggregatedMetric()
+    // The map to store:
+    //   - per-partition value for topic-level metrics. The key will be the 
partition number
+    //   - per-topic value for broker-level metrics. The key will be the topic 
name
+    private val metricValues = new ConcurrentHashMap[String, Long]()
+    private val gaugeLock = new Object
+
+    def setValue(key: String, value: Long): Unit = {
+      newGaugeIfNeed()
+      metricValues.put(key, value)
+    }
 
-    def gauge(): Gauge[Long] = gaugeLock synchronized {
-      if (gaugeObject == null) {
-        gaugeObject = metricsGroup.newGauge(metricType, () => 
aggregatedMetric.value(), tags)
-      }
-      return gaugeObject
+    def removeKey(key: String): Unit = {
+      newGaugeIfNeed()
+      metricValues.remove(key)
     }
 
     def close(): Unit = gaugeLock synchronized {
-      if (gaugeObject != null) {
-        metricsGroup.removeMetric(metricType, tags)
-        aggregatedMetric.close()
-        gaugeObject = null
-      }
+      metricsGroup.removeMetric(metricType, tags)
+      metricValues.clear()
     }
 
-    gauge()
+    def value(): Long = metricValues.values().stream().mapToLong(v => v).sum()
+
+    private def newGaugeIfNeed(): Unit = gaugeLock synchronized {
+      metricsGroup.newGauge(metricType, () => value(), tags)

Review Comment:
   Hi @chia7712, thanks for the review. I think we should still keep 
`synchronized`.
   
   1. When running `BrokerTopicMetrics#close`, it only close meter / gauge in 
values, but not delete related key.
   
https://github.com/apache/kafka/blob/a533e246e3ed5f6f6c5be4ebf9d29ae75cab557e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L485-L488
   2. When using `GaugeWrapper`, it calls `setValue` directly. It means we get 
a closed `GaugeWrapper` from `ConcurrentHashMap ` and reinitialize 
`GaugeWrapper` in `setValue`. In this case, `ConcurrentHashMap` can't help for 
thread-safe.
   
https://github.com/apache/kafka/blob/a533e246e3ed5f6f6c5be4ebf9d29ae75cab557e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L623-L624



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to