This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push: new b83a71e0531 KAFKA-15479: Remote log segments should be considered once for retention breach (#14407) b83a71e0531 is described below commit b83a71e0531574608fd5f4607e045ca192251fb0 Author: Kamal Chandraprakash <kchandraprak...@uber.com> AuthorDate: Mon Sep 25 17:41:53 2023 +0530 KAFKA-15479: Remote log segments should be considered once for retention breach (#14407) When a remote log segment contains multiple epochs, it gets considered multiple times during the breach checks by retention size/time/start-offset. This affects deletion by remote log retention size, as it deletes fewer segments than expected. This is a follow-up of KAFKA-15352. Reviewers: Divij Vaidya <di...@amazon.com>, Christo Lolov <lol...@amazon.com>, Satish Duggana <sati...@apache.org> --- .../java/kafka/log/remote/RemoteLogManager.java | 90 ++--- .../kafka/log/remote/RemoteLogManagerTest.java | 369 ++++++++++++--------- 2 files changed, 252 insertions(+), 207 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index d8f2144b3e3..a5a30e6d156 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -561,7 +561,6 @@ public class RemoteLogManager implements Closeable { } cache.truncateFromEnd(endOffset); } - return checkpoint; } @@ -706,7 +705,8 @@ public class RemoteLogManager implements Closeable { } } - private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException, + private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) + throws InterruptedException, ExecutionException, RemoteStorageException, IOException, CustomMetadataSizeLimitExceededException { File logFile = segment.log().file(); String 
logFileName = logFile.getName(); @@ -832,13 +832,11 @@ public class RemoteLogManager implements Closeable { remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); } - private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) { + private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) { + boolean shouldDeleteSegment = false; if (!retentionSizeData.isPresent()) { - return false; + return shouldDeleteSegment; } - - boolean shouldDeleteSegment = false; - // Assumption that segments contain size >= 0 if (remainingBreachedSize > 0) { long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes(); @@ -847,7 +845,6 @@ public class RemoteLogManager implements Closeable { shouldDeleteSegment = true; } } - if (shouldDeleteSegment) { logStartOffset = OptionalLong.of(metadata.endOffset() + 1); logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", @@ -856,12 +853,12 @@ public class RemoteLogManager implements Closeable { return shouldDeleteSegment; } - public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) { + public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) { + boolean shouldDeleteSegment = false; if (!retentionTimeData.isPresent()) { - return false; + return shouldDeleteSegment; } - - boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs; + shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs; if (shouldDeleteSegment) { remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals @@ -873,9 +870,9 @@ public class RemoteLogManager implements Closeable { return shouldDeleteSegment; } - private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, - long logStartOffset, - NavigableMap<Integer, Long> leaderEpochEntries) { + private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata, + long logStartOffset, + NavigableMap<Integer, Long> leaderEpochEntries) { boolean shouldDeleteSegment = false; if (!leaderEpochEntries.isEmpty()) { // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same @@ -916,10 +913,8 @@ public class RemoteLogManager implements Closeable { remoteLogMetadataManager.updateRemoteLogSegmentMetadata( new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); - // Delete the segment in remote storage. remoteLogStorageManager.deleteLogSegmentData(segmentMetadata); - // Publish delete segment finished event. remoteLogMetadataManager.updateRemoteLogSegmentMetadata( new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), @@ -932,7 +927,7 @@ public class RemoteLogManager implements Closeable { } - private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { + void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException { if (isCancelled() || !isLeader()) { logger.info("Returning from remote log segments cleanup as the task state is changed"); return; @@ -993,13 +988,15 @@ public class RemoteLogManager implements Closeable { return; } RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - + if (segmentsToDelete.contains(metadata)) { + continue; + } // When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated // as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those // remote log segments won't be removed. 
The `isRemoteSegmentWithinLeaderEpoch` validates whether // the epochs present in the segment lies in the checkpoint file. It will always return false // since the checkpoint file was already truncated. - boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments( + boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset( metadata, logStartOffset, epochWithOffsets); boolean isValidSegment = false; if (!shouldDeleteSegment) { @@ -1007,8 +1004,8 @@ public class RemoteLogManager implements Closeable { isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets); if (isValidSegment) { shouldDeleteSegment = - remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || - remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata); + remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) || + remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata); } } if (shouldDeleteSegment) { @@ -1018,6 +1015,27 @@ public class RemoteLogManager implements Closeable { } } + // Update log start offset with the computed value after retention cleanup is done + remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset)); + + // At this point in time we have updated the log start offsets, but not initiated a deletion. + // Either a follower has picked up the changes to the log start offset, or they have not. + // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete + // the deletion. + // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones + // and delete them accordingly. + // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process + // again and delete them with the original deletion reason i.e. 
size, time or log start offset breach. + List<String> undeletedSegments = new ArrayList<>(); + for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) { + if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) { + undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString()); + } + } + if (!undeletedSegments.isEmpty()) { + logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments)); + } + // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's @@ -1040,27 +1058,6 @@ public class RemoteLogManager implements Closeable { } } } - - // Update log start offset with the computed value after retention cleanup is done - remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset)); - - // At this point in time we have updated the log start offsets, but not initiated a deletion. - // Either a follower has picked up the changes to the log start offset, or they have not. - // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete - // the deletion. - // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones - // and delete them accordingly. - // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process - // again and delete them with the original deletion reason i.e. size, time or log start offset breach. 
- List<String> undeletedSegments = new ArrayList<>(); - for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) { - if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) { - undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString()); - } - } - if (!undeletedSegments.isEmpty()) { - logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments)); - } } private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) { @@ -1179,7 +1176,12 @@ public class RemoteLogManager implements Closeable { } } // segment end offset should be with in the log end offset. - return segmentEndOffset < logEndOffset; + if (segmentEndOffset >= logEndOffset) { + LOGGER.debug("Segment {} end offset {} is more than log end offset {}.", + segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset); + return false; + } + return true; } /** diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index bb66994b273..095976626c8 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -67,6 +67,9 @@ import org.apache.kafka.storage.internals.log.TransactionIndex; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.MockedConstruction; @@ -94,6 +97,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import 
java.util.function.BiConsumer; import java.util.function.Supplier; @@ -177,6 +181,7 @@ public class RemoteLogManagerTest { return epochs; } }; + private final AtomicLong currentLogStartOffset = new AtomicLong(0L); private final UnifiedLog mockLog = mock(UnifiedLog.class); @@ -192,7 +197,7 @@ public class RemoteLogManagerTest { kafka.utils.TestUtils.clearYammerMetrics(); remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), - (topicPartition, offset) -> { }, + (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1247,14 +1252,15 @@ public class RemoteLogManagerTest { segmentEpochs7), logEndOffset, leaderEpochToStartOffset)); // Test a remote segment having larger end offset than the log end offset - TreeMap<Integer, Long> segmentEpochs8 = new TreeMap<>(); - segmentEpochs8.put(1, 15L); - segmentEpochs8.put(2, 20L); - assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata( 15, 95, // larger than log end offset - segmentEpochs8), logEndOffset, leaderEpochToStartOffset)); + leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset)); + + assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata( + 15, + 90, // equal to the log end offset + leaderEpochToStartOffset), logEndOffset, leaderEpochToStartOffset)); // Test whether a segment's first offset is earlier to the respective epoch's start offset TreeMap<Integer, Long> segmentEpochs9 = new TreeMap<>(); @@ -1509,188 +1515,212 @@ public class RemoteLogManagerTest { } } - @Test - public void testDeleteRetentionSizeBreachingSegments() throws RemoteStorageException, IOException { - AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, - 
tp -> Optional.of(mockLog), - (topicPartition, offset) -> logStartOffset.set(offset), - brokerTopicStats) { - public RemoteStorageManager createRemoteStorageManager() { - return remoteStorageManager; - } - public RemoteLogMetadataManager createRemoteLogMetadataManager() { - return remoteLogMetadataManager; - } - }) { - RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); - task.convertToLeader(0); - - when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); - when(mockLog.logEndOffset()).thenReturn(200L); - - List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); - - List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); - - when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) - .thenReturn(remoteLogSegmentMetadatas.iterator()); - when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) - .thenReturn(remoteLogSegmentMetadatas.iterator()) - .thenReturn(remoteLogSegmentMetadatas.iterator()) - .thenReturn(remoteLogSegmentMetadatas.iterator()); - - checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); - when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + @ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}") + @CsvSource(value = {"0, -1", "-1, 0"}) + public void testDeletionOnRetentionBreachedSegments(long retentionSize, + long retentionMs) + throws RemoteStorageException, ExecutionException, InterruptedException { + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", retentionSize); + logProps.put("retention.ms", retentionMs); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + 
checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - Map<String, Long> logProps = new HashMap<>(); - logProps.put("retention.bytes", 0L); - logProps.put("retention.ms", -1L); - LogConfig mockLogConfig = new LogConfig(logProps); - when(mockLog.config()).thenReturn(mockLogConfig); + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); - when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) - .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + List<RemoteLogSegmentMetadata> metadataList = + listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(metadataList.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenAnswer(ans -> metadataList.iterator()); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenReturn(CompletableFuture.runAsync(() -> { })); - task.run(); + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(0); + task.cleanupExpiredRemoteLogSegments(); - assertEquals(200L, logStartOffset.get()); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); - } + assertEquals(200L, currentLogStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1)); } @Test - public void testDeleteRetentionMsBreachingSegments() throws RemoteStorageException, IOException { - AtomicLong logStartOffset = new 
AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, - tp -> Optional.of(mockLog), - (topicPartition, offset) -> logStartOffset.set(offset), - brokerTopicStats) { - public RemoteStorageManager createRemoteStorageManager() { - return remoteStorageManager; - } - public RemoteLogMetadataManager createRemoteLogMetadataManager() { - return remoteLogMetadataManager; - } - }) { - RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); - task.convertToLeader(0); + public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException { + RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + leaderTask.convertToLeader(0); - when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); - when(mockLog.logEndOffset()).thenReturn(200L); + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.logEndOffset()).thenReturn(200L); - List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); + List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); - List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + List<RemoteLogSegmentMetadata> metadataList = + listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(metadataList.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) + .thenAnswer(ans -> metadataList.iterator()); - when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) - .thenReturn(remoteLogSegmentMetadatas.iterator()); - 
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) - .thenReturn(remoteLogSegmentMetadatas.iterator()) - .thenReturn(remoteLogSegmentMetadatas.iterator()); + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); - when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", -1L); + logProps.put("retention.ms", 0L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> { + // cancel the task so that we don't delete the second segment + leaderTask.cancel(); + return CompletableFuture.runAsync(() -> { + }); + }); - Map<String, Long> logProps = new HashMap<>(); - logProps.put("retention.bytes", -1L); - logProps.put("retention.ms", 0L); - LogConfig mockLogConfig = new LogConfig(logProps); - when(mockLog.config()).thenReturn(mockLogConfig); + leaderTask.cleanupExpiredRemoteLogSegments(); - when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) - .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + assertEquals(200L, currentLogStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0)); + verify(remoteStorageManager, never()).deleteLogSegmentData(metadataList.get(1)); - task.run(); + // test that the 2nd log segment will be deleted by the new leader + RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128); + newLeaderTask.convertToLeader(1); - assertEquals(200L, logStartOffset.get()); - 
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); - } - } + Iterator<RemoteLogSegmentMetadata> firstIterator = metadataList.iterator(); + firstIterator.next(); + Iterator<RemoteLogSegmentMetadata> secondIterator = metadataList.iterator(); + secondIterator.next(); + Iterator<RemoteLogSegmentMetadata> thirdIterator = metadataList.iterator(); + thirdIterator.next(); - @Test - public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, IOException { - AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, - tp -> Optional.of(mockLog), - (topicPartition, offset) -> logStartOffset.set(offset), - brokerTopicStats) { - public RemoteStorageManager createRemoteStorageManager() { - return remoteStorageManager; - } - public RemoteLogMetadataManager createRemoteLogMetadataManager() { - return remoteLogMetadataManager; - } - }) { - RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); - leaderTask.convertToLeader(0); - - when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); - when(mockLog.logEndOffset()).thenReturn(200L); - - List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0); - - List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, epochEntries); - - when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) - .thenReturn(remoteLogSegmentMetadatas.iterator()); - when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0)) - .thenReturn(remoteLogSegmentMetadatas.iterator()) - .thenReturn(remoteLogSegmentMetadatas.iterator()); - - checkpoint.write(epochEntries); - LeaderEpochFileCache cache = new 
LeaderEpochFileCache(tp, checkpoint); - when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); - - Map<String, Long> logProps = new HashMap<>(); - logProps.put("retention.bytes", -1L); - logProps.put("retention.ms", 0L); - LogConfig mockLogConfig = new LogConfig(logProps); - when(mockLog.config()).thenReturn(mockLogConfig); - - when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) - .thenAnswer(answer -> { - // cancel the task so that we don't delete the second segment - leaderTask.cancel(); - return CompletableFuture.runAsync(() -> { - }); - }); + when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)) + .thenReturn(firstIterator); + when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0)) + .thenReturn(secondIterator) + .thenReturn(thirdIterator); - leaderTask.run(); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); - assertEquals(200L, logStartOffset.get()); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); - verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + newLeaderTask.cleanupExpiredRemoteLogSegments(); - // test that the 2nd log segment will be deleted by the new leader - RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128); - newLeaderTask.convertToLeader(1); + assertEquals(200L, currentLogStartOffset.get()); + verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(0)); + verify(remoteStorageManager).deleteLogSegmentData(metadataList.get(1)); + } - Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator(); - firstIterator.next(); - Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator(); - secondIterator.next(); - 
Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator(); - thirdIterator.next(); + @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}") + @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"}) + public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount, + int deletableSegmentCount) + throws RemoteStorageException, ExecutionException, InterruptedException { + int recordsPerSegment = 100; + int segmentSize = 1024; + List<EpochEntry> epochEntries = Arrays.asList( + new EpochEntry(0, 0L), + new EpochEntry(1, 20L), + new EpochEntry(3, 50L), + new EpochEntry(4, 100L) + ); + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; + + long localLogSegmentsSize = 512L; + long retentionSize = ((long) segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize; + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", retentionSize); + logProps.put("retention.ms", -1L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + long localLogStartOffset = (long) segmentCount * recordsPerSegment; + long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1; + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset); + when(mockLog.logEndOffset()).thenReturn(logEndOffset); + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize); - when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)) - .thenReturn(firstIterator); - when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0)) - .thenReturn(secondIterator) - .thenReturn(thirdIterator); + List<RemoteLogSegmentMetadata> segmentMetadataList = 
listRemoteLogSegmentMetadata( + leaderTopicIdPartition, segmentCount, recordsPerSegment, segmentSize, epochEntries); + verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch); + } - when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) - .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}") + @CsvSource(value = {"50, 0", "50, 1", "50, 23", "50, 50"}) + public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount, + int deletableSegmentCount) + throws RemoteStorageException, ExecutionException, InterruptedException { + int recordsPerSegment = 100; + int segmentSize = 1024; + List<EpochEntry> epochEntries = Arrays.asList( + new EpochEntry(0, 0L), + new EpochEntry(1, 20L), + new EpochEntry(3, 50L), + new EpochEntry(4, 100L) + ); + checkpoint.write(epochEntries); + LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint); + int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; + + long localLogSegmentsSize = 512L; + long retentionSize = -1L; + Map<String, Long> logProps = new HashMap<>(); + logProps.put("retention.bytes", retentionSize); + logProps.put("retention.ms", 1L); + LogConfig mockLogConfig = new LogConfig(logProps); + when(mockLog.config()).thenReturn(mockLogConfig); + + long localLogStartOffset = (long) segmentCount * recordsPerSegment; + long logEndOffset = ((long) segmentCount * recordsPerSegment) + 1; + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + when(mockLog.localLogStartOffset()).thenReturn(localLogStartOffset); + when(mockLog.logEndOffset()).thenReturn(logEndOffset); + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize); - newLeaderTask.run(); + List<RemoteLogSegmentMetadata> segmentMetadataList = listRemoteLogSegmentMetadataByTime( + leaderTopicIdPartition, 
segmentCount, deletableSegmentCount, recordsPerSegment, segmentSize, epochEntries); + verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch); + } - assertEquals(200L, logStartOffset.get()); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0)); - verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1)); + private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList, + int deletableSegmentCount, + int currentLeaderEpoch) + throws RemoteStorageException, ExecutionException, InterruptedException { + when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + .thenReturn(segmentMetadataList.iterator()); + when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) + .thenAnswer(invocation -> { + int leaderEpoch = invocation.getArgument(1); + return segmentMetadataList.stream() + .filter(segmentMetadata -> segmentMetadata.segmentLeaderEpochs().containsKey(leaderEpoch)) + .iterator(); + }); + when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))) + .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); + RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128); + task.convertToLeader(currentLeaderEpoch); + task.cleanupExpiredRemoteLogSegments(); + + ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class); + verify(remoteStorageManager, times(deletableSegmentCount)).deleteLogSegmentData(deletedMetadataCapture.capture()); + if (deletableSegmentCount > 0) { + List<RemoteLogSegmentMetadata> deletedMetadataList = deletedMetadataCapture.getAllValues(); + RemoteLogSegmentMetadata expectedEndMetadata = segmentMetadataList.get(deletableSegmentCount - 1); + assertEquals(segmentMetadataList.get(0), deletedMetadataList.get(0)); + assertEquals(expectedEndMetadata, 
deletedMetadataList.get(deletedMetadataList.size() - 1)); + assertEquals(currentLogStartOffset.get(), expectedEndMetadata.endOffset() + 1); } } @@ -1706,9 +1736,22 @@ public class RemoteLogManagerTest { int recordsPerSegment, int segmentSize, List<EpochEntry> epochEntries) { + return listRemoteLogSegmentMetadataByTime( + topicIdPartition, segmentCount, 0, recordsPerSegment, segmentSize, epochEntries); + } + + private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition, + int segmentCount, + int deletableSegmentCount, + int recordsPerSegment, + int segmentSize, + List<EpochEntry> epochEntries) { List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>(); for (int idx = 0; idx < segmentCount; idx++) { long timestamp = time.milliseconds(); + if (idx < deletableSegmentCount) { + timestamp = time.milliseconds() - 1; + } long startOffset = (long) idx * recordsPerSegment; long endOffset = startOffset + recordsPerSegment - 1; List<EpochEntry> localTotalEpochEntries = epochEntries.isEmpty() ? totalEpochEntries : epochEntries;