showuon commented on code in PR #15005:
URL: https://github.com/apache/kafka/pull/15005#discussion_r1427637469
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1073,13 +1088,29 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
                         .iterator();
                 while (epochsToClean.hasNext()) {
                     int epoch = epochsToClean.next();
+                    List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = new ArrayList<>();
+                    sizeOfDeletableSegmentsBytes = 0L;
Review Comment:
I don't think we should reset these two values. The
`cleanupExpiredRemoteLogSegments` process is not complete yet, so we should add
them together.
Suppose:
phase 1: time, size, or log start offset is breached; we have 10 segments,
100 GB in total, to be deleted.
RemoteDeleteLagBytes: 100 GB
RemoteDeleteLagSegments: 10
phase 2: we delete these segments one by one, but unfortunately 1 segment
(10 GB) cannot be deleted.
RemoteDeleteLagBytes: 10 GB
RemoteDeleteLagSegments: 1
phase 3: we check for unreferenced epochs and find 2 segments, 20 GB, to be
deleted. In this case:
RemoteDeleteLagBytes: _should be 10 GB + 20 GB_
RemoteDeleteLagSegments: _should be 1 + 2_
Does that make sense?
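To make the arithmetic concrete, here is a minimal, self-contained sketch of the three phases. It is not the `RemoteLogManager` code: the `DeleteLagSketch` class, the `Lag` holder, and the `simulate` method are hypothetical names, and every segment is assumed to be exactly 10 GB. It only contrasts carrying the lag over into phase 3 with resetting it first.

```java
public class DeleteLagSketch {
    static final long GB = 1024L * 1024L * 1024L;

    /** Hypothetical holder for the two lag values (bytes and segment count). */
    static final class Lag {
        long bytes;
        long segments;

        // A segment becomes eligible for deletion: the lag grows.
        void add(long segmentBytes) {
            bytes += segmentBytes;
            segments++;
        }

        // A segment was deleted successfully: the lag shrinks.
        void remove(long segmentBytes) {
            bytes -= segmentBytes;
            segments--;
        }

        void reset() {
            bytes = 0L;
            segments = 0L;
        }
    }

    /** Runs the three phases; optionally resets the lag before phase 3. */
    static Lag simulate(boolean resetBeforePhase3) {
        Lag lag = new Lag();
        // Phase 1: retention is breached, 10 segments (100 GB) are deletable.
        for (int i = 0; i < 10; i++) {
            lag.add(10 * GB);
        }
        // Phase 2: 9 deletions succeed, 1 segment (10 GB) is left behind.
        for (int i = 0; i < 9; i++) {
            lag.remove(10 * GB);
        }
        if (resetBeforePhase3) {
            lag.reset(); // loses the segment that failed to delete
        }
        // Phase 3: 2 more segments (20 GB) belong to unreferenced epochs.
        for (int i = 0; i < 2; i++) {
            lag.add(10 * GB);
        }
        return lag;
    }

    public static void main(String[] args) {
        Lag accumulated = simulate(false);
        Lag reset = simulate(true);
        // prints "accumulated: 30 GB, 3 segments"
        System.out.printf("accumulated: %d GB, %d segments%n",
                accumulated.bytes / GB, accumulated.segments);
        // prints "reset: 20 GB, 2 segments"
        System.out.printf("reset: %d GB, %d segments%n",
                reset.bytes / GB, reset.segments);
    }
}
```

With the reset, the segment that failed to delete in phase 2 drops out of the reported lag even though it still exists in remote storage.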
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1586,6 +1617,15 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
         LOGGER.info("Shutting down of thread pool {} is completed", poolName);
     }
+    private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
Review Comment:
nice!
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification
Review Comment:
We don't wait for anything here. This comment can be removed.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                    String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                    String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).doAnswer(ans -> {
+            // waiting for verification
Review Comment:
Ditto: nothing is waited on here either, so this comment can be removed too.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1733,6 +1738,63 @@ public void testDeletionOnRetentionBreachedSegments(long retentionSize,
         verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1));
     }
+    @ParameterizedTest(name = "testRemoteDeleteLagsOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
+    @CsvSource(value = {"0, -1", "-1, 0"})
+    public void testRemoteDeleteLagsOnRetentionBreachedSegments(long retentionSize,
+                                                                long retentionMs)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", retentionSize);
+        logProps.put("retention.ms", retentionMs);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<RemoteLogSegmentMetadata> metadataList =
+                listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(metadataList.iterator());
+        when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenAnswer(ans -> metadataList.iterator());
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenReturn(CompletableFuture.runAsync(() -> { }));
+
+        doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(2048, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                    String.format("Expected to find 2048 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(2, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                    String.format("Expected to find 2 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).doAnswer(ans -> {
+            // waiting for verification
+            assertEquals(1024, safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+                    String.format("Expected to find 1024 for RemoteDeleteLagBytes metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+            assertEquals(1, safeLongYammerMetricValue("RemoteDeleteLagSegments"),
+                    String.format("Expected to find 1 for RemoteDeleteLagSegments metric value, but found %d", safeLongYammerMetricValue("RemoteDeleteLagSegments")));
+            return Optional.empty();
+        }).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
Review Comment:
nice test.
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -397,16 +400,56 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
   def recordRemoteCopyBytesLag(partition: Int, bytesLag: Long): Unit = {
-    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
+    val brokerTopicAggregatedMetric = metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
Review Comment:
Nice catch!