chia7712 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1668347193


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String], 
remoteStorageEnabled: Boolean = f
   }
 
   case class GaugeWrapper(metricType: String) {
-    @volatile private var gaugeObject: Gauge[Long] = _
-    final private val gaugeLock = new Object
-    final val aggregatedMetric = new AggregatedMetric()
+    // The map to store:
+    //   - per-partition value for topic-level metrics. The key will be the 
partition number
+    //   - per-topic value for broker-level metrics. The key will be the topic 
name
+    private val metricValues = new ConcurrentHashMap[String, Long]()
+    private val gaugeLock = new Object
+
+    def setValue(key: String, value: Long): Unit = {
+      newGaugeIfNeed()
+      metricValues.put(key, value)
+    }
 
-    def gauge(): Gauge[Long] = gaugeLock synchronized {
-      if (gaugeObject == null) {
-        gaugeObject = metricsGroup.newGauge(metricType, () => 
aggregatedMetric.value(), tags)
-      }
-      return gaugeObject
+    def removeKey(key: String): Unit = {
+      newGaugeIfNeed()
+      metricValues.remove(key)
     }
 
     def close(): Unit = gaugeLock synchronized {
-      if (gaugeObject != null) {
-        metricsGroup.removeMetric(metricType, tags)
-        aggregatedMetric.close()
-        gaugeObject = null
-      }
+      metricsGroup.removeMetric(metricType, tags)
+      metricValues.clear()
     }
 
-    gauge()
+    def value(): Long = metricValues.values().stream().mapToLong(v => v).sum()
+
+    private def newGaugeIfNeed(): Unit = gaugeLock synchronized {
+      metricsGroup.newGauge(metricType, () => value(), tags)

Review Comment:
   @FrankYang0529 could you please check whether the implementation is 
thread-safe? It seems to rely on `ConcurrentHashMap` for thread safety. If 
so, we don't need the extra locks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to