showuon commented on code in PR #15005:
URL: https://github.com/apache/kafka/pull/15005#discussion_r1427637469


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1073,13 +1088,29 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
                         .iterator();
                 while (epochsToClean.hasNext()) {
                     int epoch = epochsToClean.next();
+                    List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = new ArrayList<>();
+                    sizeOfDeletableSegmentsBytes = 0L;

Review Comment:
   I don't think we should reset these two values. The `cleanupExpiredRemoteLogSegments` process is not completed yet, so we should add them together.
   
   Suppose:
   Phase 1: the retention time, retention size, or log start offset is breached, and we have a total of 10 segments (100 GB) to be deleted.
   RemoteDeleteLagBytes: 100 GB
   RemoteDeleteLagSegments: 10
   
   Phase 2: we delete these segments one by one, but 1 segment (10 GB) cannot be deleted.
   RemoteDeleteLagBytes: 10 GB
   RemoteDeleteLagSegments: 1
   
   Phase 3: we check whether each epoch is unreferenced and find 2 more segments (20 GB) to be deleted. In this case:
   RemoteDeleteLagBytes: _should be 10 GB + 20 GB_
   RemoteDeleteLagSegments: _should be 1 + 2_
   
   Does that make sense?
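   
   To make the accumulation concrete, here is a minimal standalone sketch of the three phases. It is not the `RemoteLogManager` code: the `DeleteLagSketch` class and its methods are hypothetical, and it assumes every segment in phase 1 is 10 GB. The point is only that the totals carry over between phases instead of being reset.
   
   ```java
   // Hypothetical sketch of the lag accounting; none of these names exist in Kafka.
   class DeleteLagSketch {
       private static final long GB = 1024L * 1024L * 1024L;
   
       private long lagBytes = 0L;     // stands in for RemoteDeleteLagBytes
       private long lagSegments = 0L;  // stands in for RemoteDeleteLagSegments
   
       // A phase found more deletable segments: add them to the running totals.
       void addDeletable(long segments, long bytes) {
           lagSegments += segments;
           lagBytes += bytes;
       }
   
       // One segment was deleted successfully: subtract it from the totals.
       void onDeleted(long bytes) {
           lagSegments -= 1;
           lagBytes -= bytes;
       }
   
       long lagBytes() { return lagBytes; }
       long lagSegments() { return lagSegments; }
   
       static DeleteLagSketch simulate() {
           DeleteLagSketch lag = new DeleteLagSketch();
           // Phase 1: 10 segments (assumed 10 GB each) breach retention.
           lag.addDeletable(10, 100 * GB);
           // Phase 2: 9 deletions succeed; 1 segment (10 GB) cannot be deleted.
           for (int i = 0; i < 9; i++) {
               lag.onDeleted(10 * GB);
           }
           // Phase 3: 2 more segments (20 GB) belong to unreferenced epochs.
           // No reset here, so the leftover from phase 2 is still counted.
           lag.addDeletable(2, 20 * GB);
           return lag;
       }
   
       public static void main(String[] args) {
           DeleteLagSketch lag = simulate();
           // prints "30 GB, 3 segments"
           System.out.println(lag.lagBytes() / GB + " GB, " + lag.lagSegments() + " segments");
       }
   }
   ```
   
   If the totals were reset to zero at the start of phase 3, this sketch would report 20 GB and 2 segments, hiding the segment that failed to delete in phase 2.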



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1586,6 +1617,15 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
         LOGGER.info("Shutting down of thread pool {} is completed", poolName);
     }
 
+    private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {

Review Comment:
   nice!



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification

Review Comment:
   We don't wait for anything here. This comment can be removed.



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).doAnswer(ans -> {
+            // waiting for verification

Review Comment:
   ditto.



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
 
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                String.format("Expected to find 1 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));

Review Comment:
   nice test.



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -397,16 +400,56 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
   def recordRemoteCopyBytesLag(partition: Int, bytesLag: Long): Unit = {
-    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric

Review Comment:
   Nice catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
