showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463136420


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -406,93 +408,51 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
 
   def invalidOffsetOrSequenceRecordsPerSec: Meter = 
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
-  def recordRemoteCopyLagBytes(partition: Int, bytesLag: Long): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, bytesLag)
-  }
-
-  def removeRemoteCopyLagBytes(partition: Int): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteCopyLagBytesAggrMetric(): AggregatedMetric = {
+    
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric
   }
 
   // Visible for testing
-  def remoteCopyLagBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteCopyLagSegments(partition: Int, segmentsLag: Long): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag)
-  }
+  def remoteCopyLagBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteCopyLagSegments(partition: Int): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteCopyLagSegmentsAggrMetric(): AggregatedMetric = {
+    
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric
   }
 
   // Visible for testing
-  def remoteCopyLagSegments: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogMetadataCount(partition: Int, count: Long): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, count)
-  }
+  def remoteCopyLagSegments: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteLogMetadataCount(partition: Int): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteLogMetadataCountAggrMetric(): AggregatedMetric = {
+    
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric
   }
 
-  def remoteLogMetadataCount: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size)
-  }
+  def remoteLogMetadataCount: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteLogSizeBytes(partition: Int): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteLogSizeBytesAggrMetric(): AggregatedMetric = {
+    
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric
   }
 
-  def remoteLogSizeBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): 
Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
-  }
+  def remoteLogSizeBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteLogSizeComputationTime(partition: Int): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteLogSizeComputationTimeAggrMetric(): AggregatedMetric = {
+    
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).aggregatedMetric
   }
 
-  def remoteLogSizeComputationTime: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteDeleteLagBytes(partition: Int, segmentsLag: Long): Unit = {
-    val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-    brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag)
-  }
+  def remoteLogSizeComputationTime: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).aggregatedMetric.value()

Review Comment:
   Ah, nice! Let me update it soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to