showuon commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1463136420
########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -406,93 +408,51 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter() - def recordRemoteCopyLagBytes(partition: Int, bytesLag: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, bytesLag) - } - - def removeRemoteCopyLagBytes(partition: Int): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.removePartition(partition) + def remoteCopyLagBytesAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric } // Visible for testing - def remoteCopyLagBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteCopyLagSegments(partition: Int, segmentsLag: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag) - } + def remoteCopyLagBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric.value() - def removeRemoteCopyLagSegments(partition: Int): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.removePartition(partition) + def remoteCopyLagSegmentsAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric } // Visible for testing - def remoteCopyLagSegments: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogMetadataCount(partition: Int, count: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, count) - } + def remoteCopyLagSegments: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric.value() - def removeRemoteLogMetadataCount(partition: Int): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.removePartition(partition) + def remoteLogMetadataCountAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric } - def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size) - } + def remoteLogMetadataCount: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric.value() - def removeRemoteLogSizeBytes(partition: Int): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.removePartition(partition) + def remoteLogSizeBytesAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric } - def remoteLogSizeBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent) - } + def remoteLogSizeBytes: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric.value() - def removeRemoteLogSizeComputationTime(partition: Int): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.removePartition(partition) + def remoteLogSizeComputationTimeAggrMetric(): AggregatedMetric = { + metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).aggregatedMetric } - def remoteLogSizeComputationTime: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric.value() - - def recordRemoteDeleteLagBytes(partition: Int, segmentsLag: Long): Unit = { - val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_DELETE_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric - brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag) - } + def remoteLogSizeComputationTime: Long = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).aggregatedMetric.value() Review Comment: Ah, nice! Let me update it soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org