showuon commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1461839024
########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -529,20 +489,16 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf } } -class BrokerTopicAggregatedMetric() { - private val partitionMetricValues = new ConcurrentHashMap[Int, Long]() - - def setPartitionMetricValue(partition: Int, partitionValue: Long): Unit = { - partitionMetricValues.put(partition, partitionValue) - } - - def removePartition(partition: Int): Option[Long] = { - Option.apply(partitionMetricValues.remove(partition)) - } - - def value(): Long = partitionMetricValues.values().stream().mapToLong(v => v).sum() - - def close(): Unit = partitionMetricValues.clear() +class AggregatedMetric { + // The map to store: + // - per-partition value for topic-level metrics. The key will be the partition number + // - per-topic value for broker-level metrics. The key will be the topic name + private val metricValues = new ConcurrentHashMap[String, Long]() + def setValue(key: String, value: Long): Unit = metricValues.put(key, value) + def removeKey(key: String): Option[Long] = Option.apply(metricValues.remove(key)) + // Sum all values in the metricValues map + def value(): Long = metricValues.values().stream().mapToLong(v => v).sum() + def close(): Unit = metricValues.clear() } Review Comment: After the latest refactor, we will use the `AggregatedMetric` for per-topic and per-broker metrics, which makes it simpler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org