afshing commented on code in PR #20397:
URL: https://github.com/apache/kafka/pull/20397#discussion_r2294691708


##########
clients/src/test/java/org/apache/kafka/clients/CommonClientConfigsTest.java:
##########
@@ -150,6 +150,8 @@ public void testMetricsReporters() {
         assertEquals(2, reporters.size());
     }
 
+    

Review Comment:
   Remove these extra lines



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -94,7 +94,7 @@ object DynamicBrokerConfig {
     LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++
     DynamicLogConfig.ReconfigurableConfigs ++
     DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++
-    Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
+    Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
MetricConfigs.METRICS_VERBOSITY_CONFIG) ++

Review Comment:
   It seems like the convention is to add each dynamic config on a separate 
line. Maybe we should follow that.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4488,6 +4488,192 @@ class KafkaApisTest extends Logging {
     assertEquals(Seq("broker2"), node.map(_.host))
   }
 
+  @Test
+  def testPartitionLevelMetricsGatedByMetricsVerbosity_Fetch_KafkaApis(): Unit 
= {
+    val t = "t"
+    val u = "u"
+    val tId = Uuid.randomUuid()
+    val uId = Uuid.randomUuid()
+    val tpT = new TopicPartition(t, 0)
+    val tpU = new TopicPartition(u, 0)
+    val tidpT = new TopicIdPartition(tId, tpT)
+    val tidpU = new TopicIdPartition(uId, tpU)
+
+    addTopicToMetadataCache(t, 1, topicId = tId)
+    addTopicToMetadataCache(u, 1, topicId = uId)
+
+    // Ensure log configs exist (not strictly required for gating, but used by 
code paths)
+    
when(replicaManager.getLogConfig(ArgumentMatchers.eq(tpT))).thenReturn(None)
+    
when(replicaManager.getLogConfig(ArgumentMatchers.eq(tpU))).thenReturn(None)
+
+    // Mock replicaManager.fetchMessages to return bytes for both topics
+    when(replicaManager.fetchMessages(
+      any[FetchParams],
+      any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]],
+      any[ReplicaQuota],
+      any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
+    )).thenAnswer(invocation => {
+      val callback = 
invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit]
+      val recT = MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("a".getBytes(StandardCharsets.UTF_8)))
+      val recU = MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("b".getBytes(StandardCharsets.UTF_8)))
+      callback(Seq(
+        tidpT -> new FetchPartitionData(Errors.NONE, 0, 0, recT,
+          Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false),
+        tidpU -> new FetchPartitionData(Errors.NONE, 0, 0, recU,
+          Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)
+      ))
+    })
+
+    // Build fetch contexts and request for both topics
+    val fetchData = util.Map.of(
+      tidpT, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, 
Optional.empty()),
+      tidpU, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, 
Optional.empty())
+    )
+    val fetchDataBuilder = util.Map.of(
+      tpT, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, 
Optional.empty()),
+      tpU, new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000, 
Optional.empty())
+    )
+    val fetchMetadata = new JFetchMetadata(0, 0)
+    val fetchContext = new FullFetchContext(time, new 
FetchSessionCacheShard(1000, 100),
+      fetchMetadata, fetchData, false, false)
+    when(fetchManager.newContext(
+      any[Short],
+      any[JFetchMetadata],
+      any[Boolean],
+      any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+      any[util.List[TopicIdPartition]],
+      any[util.Map[Uuid, String]])).thenReturn(fetchContext)
+
+    // Configure metrics.verbosity to allow only t for BytesOut/TotalFetch
+    val overrideProps = Map(
+      "metrics.verbosity" -> 
"[{\"level\":\"high\",\"names\":\"BytesOutPerSec|TotalFetchRequestsPerSec\",\"filters\":[{\"topics\":[\"t\"]}]}]"
+    )
+    kafkaApis = createKafkaApis(overrideProperties = overrideProps)
+
+    // Baseline counts
+    val tPartMetrics = brokerTopicStats.partitionStats(t, 0)
+    val uPartMetrics = brokerTopicStats.partitionStats(u, 0)
+    val tBytesOutBefore = tPartMetrics.bytesOutRate().count()
+    val tTotalFetchBefore = tPartMetrics.totalFetchRequestRate().count()
+    val uBytesOutBefore = uPartMetrics.bytesOutRate().count()
+    val uTotalFetchBefore = uPartMetrics.totalFetchRequestRate().count()
+
+    val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, 
ApiKeys.FETCH.latestVersion,
+      -1, -1, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
+    val request = buildRequest(fetchRequest)
+    kafkaApis.handleFetchRequest(request)
+
+    verifyNoThrottling[FetchResponse](request)
+
+    // After fetch: topic t should advance; topic u should remain unchanged

Review Comment:
   Something is strange here. First we check if bytesOutRate > bytesOutBefore, 
and then we check they are equal? Am I reading this correctly?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4488,6 +4488,192 @@ class KafkaApisTest extends Logging {
     assertEquals(Seq("broker2"), node.map(_.host))
   }
 
+  @Test

Review Comment:
   We also need tests for:
   - Produce
   - the request from a follower
   - the request with a size of zero
   
   In general, all the conditional branches should be covered.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -745,6 +746,22 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (topicResponse.topic != null) {
               val tp = new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicResponse.topic, data.partitionIndex))
               brokerTopicStats.updateBytesOut(tp.topic, 
fetchRequest.isFromFollower, reassigningPartitions.contains(tp), 
FetchResponse.recordsSize(data))
+              // Partition-level throughput (KIP-977): record per-partition 
fetch metrics for leader (client fetches), gated by metrics.verbosity
+              if (!fetchRequest.isFromFollower) {
+                val size = FetchResponse.recordsSize(data)
+                val serverConfig = config
+                if (size > 0 && 
org.apache.kafka.server.metrics.MetricsVerbosityController.shouldEmitPartitionMetric(
+                  serverConfig, BrokerTopicMetrics.BYTES_OUT_PER_SEC, 
tp.topic)) {
+                  val m = brokerTopicStats.partitionStats(tp.topic, 
tp.partition)
+                  m.bytesOutRate().mark(size)
+                }
+                if 
(org.apache.kafka.server.metrics.MetricsVerbosityController.shouldEmitPartitionMetric(

Review Comment:
   Let's import `org.apache.kafka.server.metrics.MetricsVerbosityController` in 
this file and avoid the fully qualified package name.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -745,6 +746,22 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (topicResponse.topic != null) {
               val tp = new TopicIdPartition(topicResponse.topicId, new 
TopicPartition(topicResponse.topic, data.partitionIndex))
               brokerTopicStats.updateBytesOut(tp.topic, 
fetchRequest.isFromFollower, reassigningPartitions.contains(tp), 
FetchResponse.recordsSize(data))
+              // Partition-level throughput (KIP-977): record per-partition 
fetch metrics for leader (client fetches), gated by metrics.verbosity
+              if (!fetchRequest.isFromFollower) {
+                val size = FetchResponse.recordsSize(data)
+                val serverConfig = config

Review Comment:
   It seems like we can pass the `config` as is, and there is no value in 
assigning it to a new variable



##########
server/src/main/java/org/apache/kafka/server/metrics/MetricConfigs.java:
##########
@@ -71,11 +78,15 @@ public class MetricConfigs {
             .define(METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, 
METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, METRIC_SAMPLE_WINDOW_MS_DOC)
             .define(METRIC_REPORTER_CLASSES_CONFIG, LIST, 
METRIC_REPORTER_CLASSES_DEFAULT, LOW, METRIC_REPORTER_CLASSES_DOC)
             .define(METRIC_RECORDING_LEVEL_CONFIG, STRING, 
METRIC_RECORDING_LEVEL_DEFAULT, LOW, METRIC_RECORDING_LEVEL_DOC)
+            .define(METRICS_VERBOSITY_CONFIG, STRING, 
METRICS_VERBOSITY_DEFAULT, LOW, METRICS_VERBOSITY_DOC)
 
             // Kafka Yammer Metrics Reporter Configuration
             .define(KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, 
KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, KAFKA_METRICS_REPORTER_CLASSES_DOC)
             .define(KAFKA_METRICS_POLLING_INTERVAL_SECONDS_CONFIG, INT, 
KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DEFAULT, atLeast(1), LOW, 
KAFKA_METRICS_POLLING_INTERVAL_SECONDS_DOC)
 
+            // Kafka Metrics Verbosity Configuration (KIP-977)

Review Comment:
   No need for this comment either



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -245,6 +245,244 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testPartitionLevelMetricsGatedByMetricsVerbosity_Produce(): Unit = {
+    val dir = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0)
+    // Enable KIP-977 partition-level Bytes* metrics only for topic "t1"
+    props.put("metrics.verbosity", 
"[{\"level\":\"high\",\"names\":\"Bytes.*\",\"filters\":[{\"topics\":[\"t1\"]}]}]")
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(Seq(dir).asJava)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+    try {
+      val t1 = "t1"
+      val t2 = "t2"
+      val t1Id = Uuid.randomUuid()
+      val t2Id = Uuid.randomUuid()
+      setupMetadataCacheWithTopicIds(Map(t1 -> t1Id, t2 -> t2Id), 
metadataCache)
+      val tp1 = new TopicPartition(t1, 0)
+      val tp2 = new TopicPartition(t2, 0)
+
+      val leaderDelta = topicsCreateDelta(0, isStartIdLeader = true, 
partitions = List(0), topicName = t1, topicId = t1Id)
+      rm.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
+      val leaderDelta2 = topicsCreateDelta(0, isStartIdLeader = true, 
partitions = List(0), topicName = t2, topicId = t2Id)
+      rm.applyDelta(leaderDelta2, imageFromTopics(leaderDelta2.apply()))
+
+      val t1PartMetrics = rm.brokerTopicStats.partitionStats(t1, 0)
+      val t2PartMetrics = rm.brokerTopicStats.partitionStats(t2, 0)
+
+      val t1BytesBefore = t1PartMetrics.bytesInRate().count()
+      val t1MsgsBefore = t1PartMetrics.messagesInRate().count()
+      val t2BytesBefore = t2PartMetrics.bytesInRate().count()
+      val t2MsgsBefore = t2PartMetrics.messagesInRate().count()
+
+      // Append to t1 should mark partition-level Bytes* and Messages* meters
+      appendRecords(rm, tp1, TestUtils.singletonRecords("a".getBytes)).onFire 
{ response =>
+        assertEquals(Errors.NONE, response.error)
+      }
+      assertTrue(t1PartMetrics.bytesInRate().count() > t1BytesBefore)

Review Comment:
   Can we be more specific and check if the count is actually equal to 
bytesBefore + diff?



##########
server/src/main/java/org/apache/kafka/server/metrics/MetricConfigs.java:
##########
@@ -46,6 +46,8 @@ public class MetricConfigs {
     public static final String METRIC_RECORDING_LEVEL_DEFAULT = 
Sensor.RecordingLevel.INFO.toString();
     public static final String METRIC_RECORDING_LEVEL_DOC = 
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
 
+    // KIP-977: metrics verbosity config is defined in CommonClientConfigs for 
consistency

Review Comment:
   This comment seems to be a leftover.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to