divijvaidya commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1461999960


##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -245,403 +251,438 @@ class KafkaRequestHandlerTest {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testSingularCopyLagBytesMetric(systemRemoteStorageEnabled: Boolean): 
Unit = {
-    val brokerTopicMetrics = 
setupBrokerTopicMetrics(systemRemoteStorageEnabled)
+    val props = kafka.utils.TestUtils.createDummyBrokerConfig()
+    
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
systemRemoteStorageEnabled.toString)
+    val brokerTopicStats = new 
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+    val brokerTopicMetrics = brokerTopicStats.topicStats(topic)
 
     if (systemRemoteStorageEnabled) {
-      brokerTopicMetrics.recordRemoteCopyLagBytes(0, 100);
-      brokerTopicMetrics.recordRemoteCopyLagBytes(1, 150);
-      brokerTopicMetrics.recordRemoteCopyLagBytes(2, 250);
+      brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 100)
+      brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 150)
+      brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 250)
       assertEquals(500, brokerTopicMetrics.remoteCopyLagBytes)
+      assertEquals(500, brokerTopicStats.allTopicsStats.remoteCopyLagBytes)
+      brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 100)
+      assertEquals(600, brokerTopicStats.allTopicsStats.remoteCopyLagBytes)
     } else {
       assertEquals(None, 
brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName))
+      assertEquals(None, 
brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName))
     }
   }
 
   @Test
   def testMultipleCopyLagBytesMetrics(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(2, 3);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 3)
 
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 4);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 5);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(2, 6);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 4)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 5)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 6)
 
     assertEquals(15, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(15, allTopicMetrics.remoteCopyLagBytes)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic2, 2, 5)
+    assertEquals(20, allTopicMetrics.remoteCopyLagBytes)
   }
 
   @Test
   def testCopyLagBytesMetricWithPartitionExpansion(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(3, allTopicMetrics.remoteCopyLagBytes)
 
-    brokerTopicMetrics.recordRemoteCopyLagBytes(2, 3);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 3)
 
     assertEquals(6, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(6, allTopicMetrics.remoteCopyLagBytes)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1)
+    assertEquals(7, allTopicMetrics.remoteCopyLagBytes)
   }
 
   @Test
   def testCopyLagBytesMetricWithPartitionShrinking(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(3, allTopicMetrics.remoteCopyLagBytes)
 
-    brokerTopicMetrics.removeRemoteCopyLagBytes(1);
+    brokerTopicStats.removeRemoteCopyLagBytes(topic, 1)
 
     assertEquals(1, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(1, allTopicMetrics.remoteCopyLagBytes)
+
+    brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1)
+    assertEquals(2, allTopicMetrics.remoteCopyLagBytes)
   }
 
   @Test
   def testCopyLagBytesMetricWithRemovingNonexistentPartitions(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(3, allTopicMetrics.remoteCopyLagBytes)
 
-    brokerTopicMetrics.removeRemoteCopyLagBytes(3);
+
+    brokerTopicStats.removeRemoteCopyLagBytes(topic, 3)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(3, allTopicMetrics.remoteCopyLagBytes)
   }
 
   @Test
   def testCopyLagBytesMetricClear(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2);
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(3, allTopicMetrics.remoteCopyLagBytes)
 
-    brokerTopicMetrics.close()
+    brokerTopicStats.close()
 
     assertEquals(0, brokerTopicMetrics.remoteCopyLagBytes)
+    assertEquals(0, allTopicMetrics.remoteCopyLagBytes)
+
+    brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1)
+    assertEquals(1, allTopicMetrics.remoteCopyLagBytes)
   }
 
   @Test
   def testMultipleCopyLagSegmentsMetrics(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(2, 3);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 3)
 
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 4);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 5);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(2, 6);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 4)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 5)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 6)
 
     assertEquals(15, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(15, allTopicMetrics.remoteCopyLagSegments)
+
+    brokerTopicStats.recordRemoteCopyLagSegments(topic2, 0, 1)
+    assertEquals(16, allTopicMetrics.remoteCopyLagSegments)
   }
 
   @Test
   def testCopyLagSegmentsMetricWithPartitionExpansion(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(3, allTopicMetrics.remoteCopyLagSegments)
 
-    brokerTopicMetrics.recordRemoteCopyLagSegments(2, 3);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 3)
 
     assertEquals(6, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(6, allTopicMetrics.remoteCopyLagSegments)
   }
 
   @Test
   def testCopyLagSegmentsMetricWithPartitionShrinking(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(3, allTopicMetrics.remoteCopyLagSegments)
 
-    brokerTopicMetrics.removeRemoteCopyLagSegments(1);
+    brokerTopicStats.removeRemoteCopyLagSegments(topic, 1)
 
     assertEquals(1, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(1, allTopicMetrics.remoteCopyLagSegments)
   }
 
   @Test
   def testCopyLagSegmentsMetricWithRemovingNonexistentPartitions(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(3, allTopicMetrics.remoteCopyLagSegments)
 
-    brokerTopicMetrics.removeRemoteCopyLagSegments(3);
+    brokerTopicStats.removeRemoteCopyLagSegments(topic, 3)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(3, allTopicMetrics.remoteCopyLagSegments)
   }
 
   @Test
   def testCopyLagSegmentsMetricClear(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2);
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(3, allTopicMetrics.remoteCopyLagSegments)
 
-    brokerTopicMetrics.close()
+    brokerTopicStats.close()
 
     assertEquals(0, brokerTopicMetrics.remoteCopyLagSegments)
+    assertEquals(0, allTopicMetrics.remoteCopyLagSegments)
+
   }
 
   @Test
   def testMultipleDeleteLagBytesMetrics(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 3)
 
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 3);
-
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 4);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 5);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 6);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 4)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 5)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 6)
 
     assertEquals(15, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(15, allTopicMetrics.remoteDeleteLagBytes)
+
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic2, 0, 1)
+    assertEquals(16, allTopicMetrics.remoteDeleteLagBytes)
   }
 
   @Test
   def testDeleteLagBytesMetricWithPartitionExpansion(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagBytes)
 
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 3);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 3)
 
     assertEquals(6, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(6, allTopicMetrics.remoteDeleteLagBytes)
   }
 
   @Test
   def testDeleteLagBytesMetricWithPartitionShrinking(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagBytes)
 
-    brokerTopicMetrics.removeRemoteDeleteLagBytes(1);
+    brokerTopicStats.removeRemoteDeleteLagBytes(topic, 1)
 
     assertEquals(1, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(1, allTopicMetrics.remoteDeleteLagBytes)
   }
 
   @Test
   def testDeleteLagBytesMetricWithRemovingNonexistentPartitions(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagBytes)
 
-    brokerTopicMetrics.removeRemoteDeleteLagBytes(3);
+    brokerTopicStats.removeRemoteDeleteLagBytes(topic, 3)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagBytes)
   }
 
   @Test
   def testDeleteLagBytesMetricClear(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagBytes)
 
-    brokerTopicMetrics.close()
+    brokerTopicStats.close()
 
     assertEquals(0, brokerTopicMetrics.remoteDeleteLagBytes)
+    assertEquals(0, allTopicMetrics.remoteDeleteLagBytes)
   }
 
   @Test
   def testMultipleDeleteLagSegmentsMetrics(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 3);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 3)
 
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 4);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 5);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 6);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 4)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 5)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 6)
 
     assertEquals(15, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(15, allTopicMetrics.remoteDeleteLagSegments)
+
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic2, 1, 5)
+    assertEquals(20, allTopicMetrics.remoteDeleteLagSegments)
   }
 
   @Test
   def testDeleteLagSegmentsMetricWithPartitionExpansion(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagSegments)
 
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 3);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 3)
 
     assertEquals(6, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(6, allTopicMetrics.remoteDeleteLagSegments)
   }
 
   @Test
   def testDeleteLagSegmentsMetricWithPartitionShrinking(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagSegments)
 
-    brokerTopicMetrics.removeRemoteDeleteLagSegments(1);
+    brokerTopicStats.removeRemoteDeleteLagSegments(topic, 1)
 
     assertEquals(1, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(1, allTopicMetrics.remoteDeleteLagSegments)
   }
 
   @Test
   def testDeleteLagSegmentsMetricWithRemovingNonexistentPartitions(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagSegments)
 
-    brokerTopicMetrics.removeRemoteDeleteLagSegments(3);
+    brokerTopicStats.removeRemoteDeleteLagSegments(topic, 3)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagSegments)
   }
 
   @Test
   def testDeleteLagSegmentsMetricClear(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1);
-    brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2);
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1)
+    brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(3, allTopicMetrics.remoteDeleteLagSegments)
 
-    brokerTopicMetrics.close()
+    brokerTopicStats.close()
 
     assertEquals(0, brokerTopicMetrics.remoteDeleteLagSegments)
+    assertEquals(0, allTopicMetrics.remoteDeleteLagSegments)
   }
 
   @Test
   def testRemoteLogMetadataCount(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
     assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
-    brokerTopicMetrics.recordRemoteLogMetadataCount(0, 1)
+    assertEquals(0, allTopicMetrics.remoteLogMetadataCount)
+    brokerTopicStats.recordRemoteLogMetadataCount(topic, 0, 1)
     assertEquals(1, brokerTopicMetrics.remoteLogMetadataCount)
+    assertEquals(1, allTopicMetrics.remoteLogMetadataCount)
 
-    brokerTopicMetrics.recordRemoteLogMetadataCount(1, 2)
-    brokerTopicMetrics.recordRemoteLogMetadataCount(2, 3)
+    brokerTopicStats.recordRemoteLogMetadataCount(topic, 1, 2)
+    brokerTopicStats.recordRemoteLogMetadataCount(topic, 2, 3)
     assertEquals(6, brokerTopicMetrics.remoteLogMetadataCount)
+    assertEquals(6, allTopicMetrics.remoteLogMetadataCount)
 
-    brokerTopicMetrics.close()
+    brokerTopicStats.close()
 
     assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount)
+    assertEquals(0, allTopicMetrics.remoteLogMetadataCount)
   }
 
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testSingularLogSizeBytesMetric(systemRemoteStorageEnabled: Boolean): 
Unit = {
-    val brokerTopicMetrics = 
setupBrokerTopicMetrics(systemRemoteStorageEnabled)
-
+    val props = kafka.utils.TestUtils.createDummyBrokerConfig()
+    
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
systemRemoteStorageEnabled.toString)
+    val brokerTopicStats = new 
BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
+    val brokerTopicMetrics = brokerTopicStats.topicStats(topic)
     if (systemRemoteStorageEnabled) {
-      brokerTopicMetrics.recordRemoteLogSizeBytes(0, 100);
-      brokerTopicMetrics.recordRemoteLogSizeBytes(1, 150);
-      brokerTopicMetrics.recordRemoteLogSizeBytes(2, 250);
+      brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 100)
+      brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 150)
+      brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 250)
       assertEquals(500, brokerTopicMetrics.remoteLogSizeBytes)
+      assertEquals(500, brokerTopicStats.allTopicsStats.remoteLogSizeBytes)
+
+      brokerTopicStats.recordRemoteLogSizeBytes(topic2, 0, 100)
+      assertEquals(600, brokerTopicStats.allTopicsStats.remoteLogSizeBytes)
     } else {
       assertEquals(None, 
brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName))
     }
   }
 
   @Test
   def testMultipleLogSizeBytesMetrics(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 3)
 
-    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3);
-
-    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 4);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 5);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 6);
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 4)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 5)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 6)
 
     assertEquals(15, brokerTopicMetrics.remoteLogSizeBytes)
+    assertEquals(15, allTopicMetrics.remoteLogSizeBytes)
   }
 
   @Test
   def testLogSizeBytesMetricWithPartitionExpansion(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+    assertEquals(3, allTopicMetrics.remoteLogSizeBytes)
 
-    brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3);
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 3)
 
     assertEquals(6, brokerTopicMetrics.remoteLogSizeBytes)
+    assertEquals(6, allTopicMetrics.remoteLogSizeBytes)
   }
 
   @Test
   def testLogSizeBytesMetricWithPartitionShrinking(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1)
+    brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2)
 
     assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes)
+    assertEquals(3, allTopicMetrics.remoteLogSizeBytes)
 
-    brokerTopicMetrics.removeRemoteLogSizeBytes(1);
+    brokerTopicStats.removeRemoteLogSizeBytes(topic, 1)
 
     assertEquals(1, brokerTopicMetrics.remoteLogSizeBytes)
+    assertEquals(1, allTopicMetrics.remoteLogSizeBytes)
   }
 
   @Test
   def testLogSizeBytesMetricWithRemovingNonexistentPartitions(): Unit = {
-    val brokerTopicMetrics = setupBrokerTopicMetrics()
-
-    brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1);
-    brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2);
+//    val props = kafka.utils.TestUtils.createDummyBrokerConfig()

Review Comment:
   nit: this commented-out `val props = ...` line looks like leftover code.
   
   Could you delete it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to