chia7712 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1668391118
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String],
remoteStorageEnabled: Boolean = f
}
case class GaugeWrapper(metricType: String) {
- @volatile private var gaugeObject: Gauge[Long] = _
- final private val gaugeLock = new Object
- final val aggregatedMetric = new AggregatedMetric()
+ // The map to store:
+ // - per-partition value for topic-level metrics. The key will be the
partition number
+ // - per-topic value for broker-level metrics. The key will be the topic
name
+ private val metricValues = new ConcurrentHashMap[String, Long]()
+ private val gaugeLock = new Object
+
+ def setValue(key: String, value: Long): Unit = {
+ newGaugeIfNeed()
+ metricValues.put(key, value)
+ }
- def gauge(): Gauge[Long] = gaugeLock synchronized {
- if (gaugeObject == null) {
- gaugeObject = metricsGroup.newGauge(metricType, () =>
aggregatedMetric.value(), tags)
- }
- return gaugeObject
+ def removeKey(key: String): Unit = {
+ newGaugeIfNeed()
+ metricValues.remove(key)
}
def close(): Unit = gaugeLock synchronized {
- if (gaugeObject != null) {
- metricsGroup.removeMetric(metricType, tags)
- aggregatedMetric.close()
- gaugeObject = null
- }
+ metricsGroup.removeMetric(metricType, tags)
+ metricValues.clear()
}
- gauge()
+ def value(): Long = metricValues.values().stream().mapToLong(v => v).sum()
+
+ private def newGaugeIfNeed(): Unit = gaugeLock synchronized {
+ metricsGroup.newGauge(metricType, () => value(), tags)
Review Comment:
@FrankYang0529 sorry for unclear comment. the impl I mentioned is
`metricsGroup.newGauge`. the impl seems to use `ConcurrentHashMap`, so it
should be safe to call `metricsGroup.newGauge` without lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]