chia7712 commented on code in PR #21343: URL: https://github.com/apache/kafka/pull/21343#discussion_r2717555273
########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java: ########## @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.common.RequestLocal; +import org.apache.kafka.server.common.TransactionVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.LogCleanerManager.OffsetsToClean; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT; +import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_ABORTED; +import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_IN_PROGRESS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the log cleaning logic. + */ +class LogCleanerManagerTest { + private static final TopicPartition TOPIC_PARTITION = new TopicPartition("log", 0); + private static final TopicPartition TOPIC_PARTITION_2 = new TopicPartition("log2", 0); + private static final LogConfig LOG_CONFIG = createLogConfig(); + private static final MockTime TIME = new MockTime(1400000000000L, 1000L); // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` + private static final long OFFSET = 999; + private static final ProducerStateManagerConfig PRODUCER_STATE_MANAGER_CONFIG = + new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false); + private static final Map<TopicPartition, Long> CLEANER_CHECKPOINTS = new HashMap<>(); + + private File tmpDir; + private File tmpDir2; + private File logDir; + private File logDir2; + + static class LogCleanerManagerMock extends LogCleanerManager { + + LogCleanerManagerMock( + List<File> logDirs, + ConcurrentMap<TopicPartition, UnifiedLog> logs, + LogDirFailureChannel logDirFailureChannel + ) { + super(logDirs, logs, logDirFailureChannel); + } + + @Override + public Map<TopicPartition, Long> allCleanerCheckpoints() { + return CLEANER_CHECKPOINTS; + } + + @Override + public void updateCheckpoints( + File dataDir, + Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd, + Optional<TopicPartition> partitionToRemove + ) { + assert partitionToRemove.isEmpty() : "partitionToRemove argument with value not yet handled"; + + Map.Entry<TopicPartition, Long> entry = partitionToUpdateOrAdd.orElseThrow(() -> + new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled")); + + CLEANER_CHECKPOINTS.put(entry.getKey(), entry.getValue()); + } + } + + // the exception should be caught and the partition that caused it marked as uncleanable + static class LogMock extends UnifiedLog { + + LogMock( + long logStartOffset, + LocalLog localLog, + BrokerTopicStats brokerTopicStats, + int producerIdExpirationCheckIntervalMs, + LeaderEpochFileCache leaderEpochCache, + ProducerStateManager producerStateManager, + Optional<Uuid> topicId, + boolean remoteStorageSystemEnable, + LogOffsetsListener logOffsetsListener + ) throws IOException { + super(logStartOffset, localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache, + producerStateManager, topicId, remoteStorageSystemEnable, logOffsetsListener); + } + + // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog() + @Override + public Collection<Long> getFirstBatchTimestampForSegments(Collection<LogSegment> segments) { + throw new IllegalStateException("Error!"); + } + } + + @BeforeEach + public void setup() { + tmpDir = TestUtils.tempDirectory(); + tmpDir2 = TestUtils.tempDirectory(); + logDir = TestUtils.randomPartitionLogDir(tmpDir); + logDir2 = TestUtils.randomPartitionLogDir(tmpDir2); + + CLEANER_CHECKPOINTS.clear(); + } + + @AfterEach + public void tearDown() throws IOException { + Utils.delete(tmpDir); + Utils.delete(tmpDir2); + } + + private ConcurrentMap<TopicPartition, UnifiedLog> setupIncreasinglyFilthyLogs(List<TopicPartition> partitions) throws IOException { + ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>(); + int numBatches = 20; + + for (TopicPartition tp : partitions) { + UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp); + logs.put(tp, log); + + writeRecords(log, numBatches, 1, 5); + numBatches += 5; + } + + return logs; + } + + @Test + public void testGrabFilthiestCompactedLogThrowsException() throws IOException { + TopicPartition tp = new TopicPartition("A", 1); + int logSegmentSize = LogTestUtils.singletonRecords("test".getBytes(), null).sizeInBytes() * 10; + int logSegmentsCount = 2; + File tpDir = new File(logDir, "A-1"); + Files.createDirectories(tpDir.toPath()); + + LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10); + LogConfig config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT); + LogSegments segments = new LogSegments(tp); + LeaderEpochFileCache leaderEpochCache = UnifiedLog.createLeaderEpochCache(tpDir, TOPIC_PARTITION, logDirFailureChannel, + Optional.empty(), TIME.scheduler); + ProducerStateManager producerStateManager = new ProducerStateManager(TOPIC_PARTITION, tpDir, 5 * 60 * 1000, + PRODUCER_STATE_MANAGER_CONFIG, TIME); + LoadedLogOffsets offsets = new LogLoader(tpDir, tp, config, TIME.scheduler, TIME, logDirFailureChannel, true, + segments, 0L, 0L, leaderEpochCache, producerStateManager, new ConcurrentHashMap<>(), false).load(); + LocalLog localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint(), offsets.nextOffsetMetadata(), + TIME.scheduler, TIME, tp, logDirFailureChannel); + UnifiedLog log = new LogMock(offsets.logStartOffset(), localLog, new BrokerTopicStats(), PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + leaderEpochCache, producerStateManager, Optional.empty(), false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER); + + writeRecords(log, logSegmentsCount * 2, 10, 2); + + ConcurrentMap<TopicPartition, UnifiedLog> logsPool = new ConcurrentHashMap<>(); + logsPool.put(tp, log); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logsPool); + CLEANER_CHECKPOINTS.put(tp, 1L); + + LogCleaningException thrownException = assertThrows(LogCleaningException.class, + () -> cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get()); + + assertEquals(log, thrownException.log); + assertInstanceOf(IllegalStateException.class, thrownException.getCause()); + } + + @Test + public void testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio() throws IOException { + TopicPartition tp0 = new TopicPartition("wishing-well", 0); + TopicPartition tp1 = new TopicPartition("wishing-well", 1); + TopicPartition tp2 = new TopicPartition("wishing-well", 2); + List<TopicPartition> partitions = List.of(tp0, tp1, tp2); + + // setup logs with cleanable range: [20, 20], [20, 25], [20, 30] + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + partitions.forEach(partition -> CLEANER_CHECKPOINTS.put(partition, 20L)); + + LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get(); + assertEquals(tp2, filthiestLog.topicPartition()); + assertEquals(tp2, filthiestLog.log().topicPartition()); + } + + @Test + public void testGrabFilthiestCompactedLogIgnoresUncleanablePartitions() throws IOException { + TopicPartition tp0 = new TopicPartition("wishing-well", 0); + TopicPartition tp1 = new TopicPartition("wishing-well", 1); + TopicPartition tp2 = new TopicPartition("wishing-well", 2); + List<TopicPartition> partitions = List.of(tp0, tp1, tp2); + + // setup logs with cleanable range: [20, 20], [20, 25], [20, 30] + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + partitions.forEach(partition -> CLEANER_CHECKPOINTS.put(partition, 20L)); + + cleanerManager.markPartitionUncleanable(logs.get(tp2).dir().getParent(), tp2); + + LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get(); + assertEquals(tp1, filthiestLog.topicPartition()); + assertEquals(tp1, filthiestLog.log().topicPartition()); + } + + @Test + public void testGrabFilthiestCompactedLogIgnoresInProgressPartitions() throws IOException { + TopicPartition tp0 = new TopicPartition("wishing-well", 0); + TopicPartition tp1 = new TopicPartition("wishing-well", 1); + TopicPartition tp2 = new TopicPartition("wishing-well", 2); + List<TopicPartition> partitions = List.of(tp0, tp1, tp2); + + // setup logs with cleanable range: [20, 20], [20, 25], [20, 30] + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + partitions.forEach(partition -> CLEANER_CHECKPOINTS.put(partition, 20L)); + + cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS); + + LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get(); + assertEquals(tp1, filthiestLog.topicPartition()); + assertEquals(tp1, filthiestLog.log().topicPartition()); + } + + @Test + public void testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions() throws IOException { + TopicPartition tp0 = new TopicPartition("wishing-well", 0); + TopicPartition tp1 = new TopicPartition("wishing-well", 1); + TopicPartition tp2 = new TopicPartition("wishing-well", 2); + List<TopicPartition> partitions = List.of(tp0, tp1, tp2); + + // setup logs with cleanable range: [20, 20], [20, 25], [20, 30] + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + partitions.forEach(partition -> CLEANER_CHECKPOINTS.put(partition, 20L)); + + cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS); + cleanerManager.markPartitionUncleanable(logs.get(tp1).dir().getParent(), tp1); + + Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()); + assertEquals(Optional.empty(), filthiestLog); + } + + @Test + public void testDirtyOffsetResetIfLargerThanEndOffset() throws IOException { + TopicPartition tp = new TopicPartition("foo", 0); + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp)); + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + CLEANER_CHECKPOINTS.put(tp, 200L); + + LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get(); + assertEquals(0L, filthiestLog.firstDirtyOffset()); + } + + @Test + public void testDirtyOffsetResetIfSmallerThanStartOffset() throws IOException { + TopicPartition tp = new TopicPartition("foo", 0); + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp)); + + logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion); + + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + CLEANER_CHECKPOINTS.put(tp, 0L); + + LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get(); + assertEquals(10L, filthiestLog.firstDirtyOffset()); + } + + @Test + public void testLogStartOffsetLargerThanActiveSegmentBaseOffset() throws IOException { + TopicPartition tp = new TopicPartition("foo", 0); + UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp); + + ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>(); + logs.put(tp, log); + + appendRecords(log, 3); + appendRecords(log, 3); + appendRecords(log, 3); + + assertEquals(1, log.logSegments().size()); + + log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion); + + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + CLEANER_CHECKPOINTS.put(tp, 0L); + + // The active segment is uncleanable and hence not filthy from the POV of the CleanerManager. + Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()); + assertEquals(Optional.empty(), filthiestLog); + } + + @Test + public void testDirtyOffsetLargerThanActiveSegmentBaseOffset() throws IOException { + // It is possible in the case of an unclean leader election for the checkpoint + // dirty offset to get ahead of the active segment base offset, but still be + // within the range of the log. + + TopicPartition tp = new TopicPartition("foo", 0); + + ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>(); + UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp); + logs.put(tp, log); + + appendRecords(log, 3); + appendRecords(log, 3); + + assertEquals(1, log.logSegments().size()); + assertEquals(0L, log.activeSegment().baseOffset()); + + LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs); + CLEANER_CHECKPOINTS.put(tp, 3L); + + // These segments are uncleanable and hence not filthy + Optional<LogToClean> filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()); + assertEquals(Optional.empty(), filthiestLog); + } + + /** + * When checking for logs with segments ready for deletion + * we shouldn't consider logs where cleanup.policy=delete + * as they are handled by the LogManager + */ + @Test + public void testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), null); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + int readyToDelete = cleanerManager.deletableLogs().size(); + assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted"); + } + + /** + * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete + */ + @Test + public void testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," + + TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + int readyToDelete = cleanerManager.deletableLogs().size(); + assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted"); + } + + /** + * When looking for logs with segments ready to be deleted we should consider + * logs with cleanup.policy=compact because they may have segments from before the log start offset + */ + @Test + public void testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + int readyToDelete = cleanerManager.deletableLogs().size(); + assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted"); + } + + /** + * log under cleanup should be ineligible for compaction + */ + @Test + public void testLogsUnderCleanupIneligibleForCompaction() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + log.appendAsLeader(records, 0); + log.roll(); + log.appendAsLeader(LogTestUtils.singletonRecords("test2".getBytes(), "test2".getBytes()), 0); + log.updateHighWatermark(2L); + + // simulate cleanup thread working on the log partition + List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions(); + assertEquals(1, deletableLog.size(), "should have 1 logs ready to be deleted"); + + // change cleanup policy from delete to compact + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config().segmentSize()); + logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config().retentionMs); + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); + logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0); + LogConfig config = new LogConfig(logProps); + log.updateConfig(config); + + // log cleanup in progress, the log is not available for compaction + Optional<LogToClean> cleanable = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()); + assertTrue(cleanable.isEmpty(), "should have 0 logs ready to be compacted"); + + // log cleanup finished, and log can be picked up for compaction + cleanerManager.resumeCleaning(deletableLog.stream().map(Map.Entry::getKey).collect(Collectors.toSet())); + Optional<LogToClean> cleanable2 = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()); + assertTrue(cleanable2.isPresent(), "should have 1 logs ready to be compacted"); + + // update cleanup policy to delete + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); + LogConfig config2 = new LogConfig(logProps); + log.updateConfig(config2); + + // compaction in progress, should have 0 log eligible for log cleanup + List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions(); + assertEquals(0, deletableLog2.size(), "should have 0 logs ready to be deleted"); + + // compaction done, should have 1 log eligible for log cleanup + cleanerManager.doneDeleting(List.of(cleanable2.get().topicPartition())); + List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions(); + assertEquals(1, deletableLog3.size(), "should have 1 logs ready to be deleted"); + } + + @Test + public void testUpdateCheckpointsShouldAddOffsetToPartition() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints + assertNotEquals(OFFSET, cleanerManager.allCleanerCheckpoints().getOrDefault(TOPIC_PARTITION, 0L)); + + cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty()); + // expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + } + + @Test + public void testUpdateCheckpointsShouldRemovePartitionData() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // write some data into the cleaner-offset-checkpoint file + cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty()); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + + // updateCheckpoints should remove the topicPartition data in the logDir + cleanerManager.updateCheckpoints(logDir, Optional.empty(), Optional.of(TOPIC_PARTITION)); + assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION)); + } + + @Test + public void testHandleLogDirFailureShouldRemoveDirAndData() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // write some data into the cleaner-offset-checkpoint file in logDir and logDir2 + cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty()); + cleanerManager.updateCheckpoints(logDir2, Optional.of(Map.entry(TOPIC_PARTITION_2, OFFSET)), Optional.empty()); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2)); + + cleanerManager.handleLogDirFailure(logDir.getAbsolutePath()); + // verify the partition data in logDir is gone, and data in logDir2 is still there + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2)); + assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION)); + } + + @Test + public void testMaybeTruncateCheckpointShouldTruncateData() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + long lowerOffset = 1L; + long higherOffset = 1000L; + + // write some data into the cleaner-offset-checkpoint file in logDir + cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty()); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + + // we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset) + cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION, higherOffset); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + + // we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset) + cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION, lowerOffset); + assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + } + + @Test + public void testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // write some data into the cleaner-offset-checkpoint file in logDir + cleanerManager.updateCheckpoints(logDir, Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty()); + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + + cleanerManager.alterCheckpointDir(TOPIC_PARTITION, logDir, logDir2); + // verify we still can get the partition offset after alterCheckpointDir + // This data should locate in logDir2, not logDir + assertEquals(OFFSET, cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION)); + + // force delete the logDir2 from checkpoints, so that the partition data should also be deleted + cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath()); + assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION)); + } + + /** + * Log under cleanup should still be eligible for log truncation. + */ + @Test + public void testConcurrentLogCleanupAndLogTruncation() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // log cleanup starts + List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions(); + // Log truncation happens due to unclean leader election + cleanerManager.abortAndPauseCleaning(log.topicPartition()); + cleanerManager.resumeCleaning(Set.of(log.topicPartition())); + // log cleanup finishes and pausedPartitions are resumed + cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet())); + + assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition())); + } + + /** + * Log under cleanup should still be eligible for topic deletion. + */ + @Test + public void testConcurrentLogCleanupAndTopicDeletion() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + + // log cleanup starts + List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions(); + // Broker processes StopReplicaRequest with delete=true + cleanerManager.abortCleaning(log.topicPartition()); + // log cleanup finishes and pausedPartitions are resumed + cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet())); + + assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition())); + } + + /** + * When looking for logs with segments ready to be deleted we shouldn't consider + * logs that have had their partition marked as uncleanable. + */ + @Test + public void testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions() throws IOException { + MemoryRecords records = LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes()); + UnifiedLog log = createLog(records.sizeInBytes() * 5, TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0)); + LogCleanerManager cleanerManager = createCleanerManager(log); + cleanerManager.markPartitionUncleanable(log.dir().getParent(), TOPIC_PARTITION); + + int readyToDelete = cleanerManager.deletableLogs().size(); + assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted"); + } + + /** + * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO. + */ + @Test + public void testCleanableOffsetsForNone() throws IOException { + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + while (log.numberOfSegments() < 8) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0); + + log.updateHighWatermark(50); + + Optional<Long> lastCleanOffset = Optional.of(0L); + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log."); + assertEquals(log.highWatermark(), log.lastStableOffset(), + "The high watermark equals the last stable offset as no transactions are in progress"); + assertEquals(log.lastStableOffset(), cleanableOffsets.firstUncleanableDirtyOffset(), + "The first uncleanable offset is bounded by the last stable offset."); + } + + /** + * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment. + */ + @Test + public void testCleanableOffsetsActiveSegment() throws IOException { + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + while (log.numberOfSegments() < 8) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0); + + log.updateHighWatermark(log.logEndOffset()); + + Optional<Long> lastCleanOffset = Optional.of(0L); + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds()); + + assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log."); + assertEquals(log.activeSegment().baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(), + "The first uncleanable offset begins with the active segment."); + } + + /** + * Test computation of cleanable range with a minimum compaction lag time + */ + @Test + public void testCleanableOffsetsForTime() throws IOException { + int compactionLag = 60 * 60 * 1000; + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag); + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + long t0 = TIME.milliseconds(); + while (log.numberOfSegments() < 4) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t0), 0); + + LogSegment activeSegAtT0 = log.activeSegment(); + + TIME.sleep(compactionLag + 1); + long t1 = TIME.milliseconds(); + + while (log.numberOfSegments() < 8) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t1), 0); + + log.updateHighWatermark(log.logEndOffset()); + + Optional<Long> lastCleanOffset = Optional.of(0L); + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log."); + assertEquals(activeSegAtT0.baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(), + "The first uncleanable offset begins with the second block of log entries."); + } + + /** + * Test computation of cleanable range with a minimum compaction lag time that is small enough that + * the active segment contains it. + */ + @Test + public void testCleanableOffsetsForShortTime() throws IOException { + int compactionLag = 60 * 60 * 1000; + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag); + + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + long t0 = TIME.milliseconds(); + while (log.numberOfSegments() < 8) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), t0), 0); + + log.updateHighWatermark(log.logEndOffset()); + + TIME.sleep(compactionLag + 1); + + Optional<Long> lastCleanOffset = Optional.of(0L); + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first cleanable offset starts at the beginning of the log."); + assertEquals(log.activeSegment().baseOffset(), cleanableOffsets.firstUncleanableDirtyOffset(), + "The first uncleanable offset begins with active segment."); + } + + @Test + public void testCleanableOffsetsNeedsCheckpointReset() throws IOException { + TopicPartition tp = new TopicPartition("foo", 0); + ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(List.of(tp)); + logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion); + + Optional<Long> lastCleanOffset = Optional.of(15L); + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds()); + assertFalse(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset should not be reset if valid"); + + logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion); + cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds()); + assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset needs to be reset if less than log start offset"); + + lastCleanOffset = Optional.of(25L); + cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, TIME.milliseconds()); + assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint offset needs to be reset if greater than log end offset"); + } + + @Test + public void testUndecidedTransactionalDataNotCleanable() throws IOException { + int compactionLag = 60 * 60 * 1000; + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag); + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + long producerId = 15L; + short producerEpoch = 0; + int sequence = 0; + log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, + new SimpleRecord(TIME.milliseconds(), "1".getBytes(), "a".getBytes()), + new SimpleRecord(TIME.milliseconds(), "2".getBytes(), "b".getBytes())), 0); + log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 2, + new SimpleRecord(TIME.milliseconds(), "3".getBytes(), "c".getBytes())), 0); + log.roll(); + log.updateHighWatermark(3L); + + TIME.sleep(compactionLag + 1); + // although the compaction lag has been exceeded, the undecided data should not be cleaned + OffsetsToClean cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset()); + assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset()); + + log.appendAsLeader(MemoryRecords.withEndTransactionMarker(TIME.milliseconds(), producerId, producerEpoch, + new EndTransactionMarker(ControlRecordType.ABORT, 15)), 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), + VerificationGuard.SENTINEL, TransactionVersion.TV_1.featureLevel()); + log.roll(); + log.updateHighWatermark(4L); + + // the first segment should now become cleanable immediately + cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset()); + assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset()); + + TIME.sleep(compactionLag + 1); + + // the second segment becomes cleanable after the compaction lag + cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds()); + assertEquals(0L, cleanableOffsets.firstDirtyOffset()); + assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset()); + } + + @Test + public void testDoneCleaning() throws IOException { + Properties logProps = new Properties(); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024); + UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(), logProps)); + + while (log.numberOfSegments() < 8) + log.appendAsLeader(records((int) log.logEndOffset(), (int) log.logEndOffset(), TIME.milliseconds()), 0); + + LogCleanerManager cleanerManager = createCleanerManager(log); + assertThrows(IllegalStateException.class, () -> cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1)); + + cleanerManager.setCleaningState(TOPIC_PARTITION, LogCleaningState.logCleaningPaused(1)); + assertThrows(IllegalStateException.class, () -> cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1)); + + cleanerManager.setCleaningState(TOPIC_PARTITION, LOG_CLEANING_IN_PROGRESS); + long endOffset = 1L; + cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), endOffset); + + assertTrue(cleanerManager.cleaningState(TOPIC_PARTITION).isEmpty()); + assertTrue(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION)); + assertEquals(Optional.of(endOffset), Optional.of(cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION))); Review Comment: Could you remove the `Optional.of` from both inputs? ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java: ########## @@ -0,0 +1,922 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.common.RequestLocal; +import org.apache.kafka.server.common.TransactionVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.LogCleanerManager.OffsetsToClean; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT; +import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_ABORTED; +import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_IN_PROGRESS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the log cleaning logic. + */ +class LogCleanerManagerTest { + private static final TopicPartition TOPIC_PARTITION = new TopicPartition("log", 0); + private static final TopicPartition TOPIC_PARTITION_2 = new TopicPartition("log2", 0); + private static final LogConfig LOG_CONFIG = createLogConfig(); + private static final MockTime TIME = new MockTime(1400000000000L, 1000L); // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` + private static final long OFFSET = 999; + private static final ProducerStateManagerConfig PRODUCER_STATE_MANAGER_CONFIG = + new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false); + private static final Map<TopicPartition, Long> CLEANER_CHECKPOINTS = new HashMap<>(); Review Comment: Would you mind moving `CLEANER_CHECKPOINTS` into `LogCleanerManagerMock` as an instance variable? Most tests set `CLEANER_CHECKPOINTS` immediately after creating `LogCleanerManagerMock`, so scoping it to the mock instance makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
