afshing commented on code in PR #20397: URL: https://github.com/apache/kafka/pull/20397#discussion_r2294691708
########## clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java: ########## @@ -150,6 +150,8 @@ public void testMetricsReporters() { assertEquals(2, reporters.size()); } + Review Comment: Remove these extra lines ########## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ########## @@ -94,7 +94,7 @@ object DynamicBrokerConfig { LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++ DynamicLogConfig.ReconfigurableConfigs ++ DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++ - Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++ + Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricConfigs.METRICS_VERBOSITY_CONFIG) ++ Review Comment: It seems like the convention is to add each dynamic config in a separate line. Maybe we should follow that ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4488,6 +4488,192 @@ class KafkaApisTest extends Logging { assertEquals(Seq("broker2"), node.map(_.host)) } + @Test + def testPartitionLevelMetricsGatedByMetricsVerbosity_Fetch_KafkaApis(): Unit = { + val t = "t" + val u = "u" + val tId = Uuid.randomUuid() + val uId = Uuid.randomUuid() + val tpT = new TopicPartition(t, 0) + val tpU = new TopicPartition(u, 0) + val tidpT = new TopicIdPartition(tId, tpT) + val tidpU = new TopicIdPartition(uId, tpU) + + addTopicToMetadataCache(t, 1, topicId = tId) + addTopicToMetadataCache(u, 1, topicId = uId) + + // Ensure log configs exist (not strictly required for gating, but used by code paths) + when(replicaManager.getLogConfig(ArgumentMatchers.eq(tpT))).thenReturn(None) + when(replicaManager.getLogConfig(ArgumentMatchers.eq(tpU))).thenReturn(None) + + // Mock replicaManager.fetchMessages to return bytes for both topics + when(replicaManager.fetchMessages( + any[FetchParams], + any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]], + any[ReplicaQuota], + any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]() + )).thenAnswer(invocation => { + val callback = 
invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] + val recT = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("a".getBytes(StandardCharsets.UTF_8))) + val recU = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("b".getBytes(StandardCharsets.UTF_8))) + callback(Seq( + tidpT -> new FetchPartitionData(Errors.NONE, 0, 0, recT, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false), + tidpU -> new FetchPartitionData(Errors.NONE, 0, 0, recU, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false) + )) + }) + + // Build fetch contexts and request for both topics + val fetchData = util.Map.of( + tidpT, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, Optional.empty()), + tidpU, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, Optional.empty()) + ) + val fetchDataBuilder = util.Map.of( + tpT, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, Optional.empty()), + tpU, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, Optional.empty()) + ) + val fetchMetadata = new JFetchMetadata(0, 0) + val fetchContext = new FullFetchContext(time, new FetchSessionCacheShard(1000, 100), + fetchMetadata, fetchData, false, false) + when(fetchManager.newContext( + any[Short], + any[JFetchMetadata], + any[Boolean], + any[util.Map[TopicIdPartition, FetchRequest.PartitionData]], + any[util.List[TopicIdPartition]], + any[util.Map[Uuid, String]])).thenReturn(fetchContext) + + // Configure metrics.verbosity to allow only t for BytesOut/TotalFetch + val overrideProps = Map( + "metrics.verbosity" -> "[{\"level\":\"high\",\"names\":\"BytesOutPerSec|TotalFetchRequestsPerSec\",\"filters\":[{\"topics\":[\"t\"]}]}]" + ) + kafkaApis = createKafkaApis(overrideProperties = overrideProps) + + // Baseline counts + val tPartMetrics = brokerTopicStats.partitionStats(t, 0) + val uPartMetrics = brokerTopicStats.partitionStats(u, 0) + val 
tBytesOutBefore = tPartMetrics.bytesOutRate().count() + val tTotalFetchBefore = tPartMetrics.totalFetchRequestRate().count() + val uBytesOutBefore = uPartMetrics.bytesOutRate().count() + val uTotalFetchBefore = uPartMetrics.totalFetchRequestRate().count() + + val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, ApiKeys.FETCH.latestVersion, + -1, -1, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build() + val request = buildRequest(fetchRequest) + kafkaApis.handleFetchRequest(request) + + verifyNoThrottling[FetchResponse](request) + + // After fetch: topic t should advance; topic u should remain unchanged Review Comment: Something is strange here. First we check if bytesOutRate > bytesOutBefore, and then we check they are equal? Am I reading correct? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4488,6 +4488,192 @@ class KafkaApisTest extends Logging { assertEquals(Seq("broker2"), node.map(_.host)) } + @Test Review Comment: We also need tests for - Produce - the request from follower - the request with size of zero In general all the conditional branches ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -745,6 +746,22 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicResponse.topic != null) { val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic, data.partitionIndex)) brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data)) + // Partition-level throughput (KIP-977): record per-partition fetch metrics for leader (client fetches), gated by metrics.verbosity + if (!fetchRequest.isFromFollower) { + val size = FetchResponse.recordsSize(data) + val serverConfig = config + if (size > 0 && org.apache.kafka.server.metrics.MetricsVerbosityController.shouldEmitPartitionMetric( + serverConfig, BrokerTopicMetrics.BYTES_OUT_PER_SEC, tp.topic)) { + val m = 
brokerTopicStats.partitionStats(tp.topic, tp.partition) + m.bytesOutRate().mark(size) + } + if (org.apache.kafka.server.metrics.MetricsVerbosityController.shouldEmitPartitionMetric( Review Comment: Let's import `org.apache.kafka.server.metrics.MetricsVerbosityController` in this file and avoid package name ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -745,6 +746,22 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicResponse.topic != null) { val tp = new TopicIdPartition(topicResponse.topicId, new TopicPartition(topicResponse.topic, data.partitionIndex)) brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), FetchResponse.recordsSize(data)) + // Partition-level throughput (KIP-977): record per-partition fetch metrics for leader (client fetches), gated by metrics.verbosity + if (!fetchRequest.isFromFollower) { + val size = FetchResponse.recordsSize(data) + val serverConfig = config Review Comment: It seems like we can pass the `config` as is, and there is no value in assigning it to a new variable ########## server/src/main/java/org/apache/kafka/server/metrics/MetricConfigs.java: ########## @@ -71,11 +78,15 @@ public class MetricConfigs { .define(METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, METRIC_SAMPLE_WINDOW_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, LIST, METRIC_REPORTER_CLASSES_DEFAULT, LOW, METRIC_REPORTER_CLASSES_DOC) .define(METRIC_RECORDING_LEVEL_CONFIG, STRING, METRIC_RECORDING_LEVEL_DEFAULT, LOW, METRIC_RECORDING_LEVEL_DOC) + .define(METRICS_VERBOSITY_CONFIG, STRING, METRICS_VERBOSITY_DEFAULT, LOW, METRICS_VERBOSITY_DOC) // Kafka Yammer Metrics Reporter Configuration .define(KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, KAFKA_METRICS_REPORTER_CLASSES_DOC) .define(KAFKA_METRICS_POLLING_INTERVAL_SECONDS_CONFIG, INT, KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DEFAULT, atLeast(1), 
LOW, KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DOC) + // Kafka Metrics Verbosity Configuration (KIP-977) Review Comment: No need for this comment either ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -245,6 +245,244 @@ class ReplicaManagerTest { } } + @Test + def testPartitionLevelMetricsGatedByMetricsVerbosity_Produce(): Unit = { + val dir = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0) + // Enable KIP-977 partition-level Bytes* metrics only for topic "t1" + props.put("metrics.verbosity", "[{\"level\":\"high\",\"names\":\"Bytes.*\",\"filters\":[{\"topics\":[\"t1\"]}]}]") + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(Seq(dir).asJava) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + try { + val t1 = "t1" + val t2 = "t2" + val t1Id = Uuid.randomUuid() + val t2Id = Uuid.randomUuid() + setupMetadataCacheWithTopicIds(Map(t1 -> t1Id, t2 -> t2Id), metadataCache) + val tp1 = new TopicPartition(t1, 0) + val tp2 = new TopicPartition(t2, 0) + + val leaderDelta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = t1, topicId = t1Id) + rm.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply())) + val leaderDelta2 = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), topicName = t2, topicId = t2Id) + rm.applyDelta(leaderDelta2, imageFromTopics(leaderDelta2.apply())) + + val t1PartMetrics = rm.brokerTopicStats.partitionStats(t1, 0) + val t2PartMetrics = rm.brokerTopicStats.partitionStats(t2, 0) + + val t1BytesBefore = t1PartMetrics.bytesInRate().count() + val t1MsgsBefore = t1PartMetrics.messagesInRate().count() + val t2BytesBefore = 
t2PartMetrics.bytesInRate().count() + val t2MsgsBefore = t2PartMetrics.messagesInRate().count() + + // Append to t1 should mark partition-level Bytes* and Messages* meters + appendRecords(rm, tp1, TestUtils.singletonRecords("a".getBytes)).onFire { response => + assertEquals(Errors.NONE, response.error) + } + assertTrue(t1PartMetrics.bytesInRate().count() > t1BytesBefore) Review Comment: Can we be more specific and check if the count is actually equal to bytesBefore + diff? ########## server/src/main/java/org/apache/kafka/server/metrics/MetricConfigs.java: ########## @@ -46,6 +46,8 @@ public class MetricConfigs { public static final String METRIC_RECORDING_LEVEL_DEFAULT = Sensor.RecordingLevel.INFO.toString(); public static final String METRIC_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; + // KIP-977: metrics verbosity config is defined in CommonClientConfigs for consistency Review Comment: This comment seems to be a leftover -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org