malletgu commented on code in PR #16681: URL: https://github.com/apache/kafka/pull/16681#discussion_r1724975609
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE long logStartOffset = log.logStartOffset(); long logEndOffset = log.logEndOffset(); - Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, - log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets); - Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); - - RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); - Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); - boolean canProcess = true; - List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); long sizeOfDeletableSegmentsBytes = 0L; - while (canProcess && epochIterator.hasNext()) { - Integer epoch = epochIterator.next(); - Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (canProcess && segmentsIterator.hasNext()) { - if (isCancelled()) { - logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); - return; - } - RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) { - logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments", - metadata.remoteLogSegmentId()); - canProcess = false; - continue; - } - if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) { + final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); + final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>(); + for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) { + Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch); + while (it.hasNext()) { + final RemoteLogSegmentMetadata segment = it.next(); + + // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling + if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) { + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); + segmentsToDelete.add(segment); continue; } - if (segmentsToDelete.contains(metadata)) { + + if (segment.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) { + segmentsToDelete.add(segment); + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); continue; } Review Comment: Yes this is correct. Avoiding passing those segments when computing `buildRetentionSizeData` after is the most important thing and before we used to do it. > We will check COPY_SEGMENT_STARTED and DELETE_SEGMENT_STARTED here, but DELETE_SEGMENT_FINISHED in L1244. I don't think it's readable. Could you make it clear? 
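To spell out the rule the diff above applies, here is a hedged sketch (not the PR's final code; the class and method names are illustrative): a `COPY_SEGMENT_STARTED` entry that is not the last one listed for its leader epoch can no longer belong to an in-flight copy, so it is treated as dangling and eligible for deletion, while `DELETE_SEGMENT_STARTED` entries are unfinished deletes that get retried.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

// Illustrative sketch only: classify the metadata entries listed for one leader epoch.
final class SegmentCleanupSketch {
    static List<RemoteLogSegmentMetadata> deletableSegments(Iterator<RemoteLogSegmentMetadata> it) {
        List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
        while (it.hasNext()) {
            RemoteLogSegmentMetadata segment = it.next();
            boolean isLastListedEntry = !it.hasNext();
            if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(segment.state()) && !isLastListedEntry) {
                // A later entry exists for this epoch, so this copy attempt has been
                // superseded: it is treated as dangling and can be deleted.
                segmentsToDelete.add(segment);
            } else if (RemoteLogSegmentState.DELETE_SEGMENT_STARTED.equals(segment.state())) {
                // A previous delete did not finish; schedule it again.
                segmentsToDelete.add(segment);
            }
            // COPY_SEGMENT_FINISHED entries are kept and only later weighed against the
            // retention checks computed from the remaining valid segments.
        }
        return segmentsToDelete;
    }
}
```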
I'll refactor this ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE long logStartOffset = log.logStartOffset(); long logEndOffset = log.logEndOffset(); - Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, - log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets); - Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); - - RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); - Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); - boolean canProcess = true; - List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); long sizeOfDeletableSegmentsBytes = 0L; - while (canProcess && epochIterator.hasNext()) { - Integer epoch = epochIterator.next(); - Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (canProcess && segmentsIterator.hasNext()) { - if (isCancelled()) { - logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); - return; - } - RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) { - logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments", - metadata.remoteLogSegmentId()); - canProcess = false; - continue; - } - if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) { + final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); + final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>(); + for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) { + Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch); + while (it.hasNext()) { + final RemoteLogSegmentMetadata segment = it.next(); + + // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling + if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) { Review Comment: Correct, it is based on each `RemoteLogSegmentMetadata` being different. > if there is only COPY_SEGMENT_STARTED and has no next state, it must be a dangling one, right We used to have this assumption, but now that the copy and cleanup happens in parallel we could get a segment in `COPY_SEGMENT_STARTED` because it is actually being copied. This is also why I avoid doing `remoteLogMetadataManager.listRemoteLogSegments` again as this list of segments could change between consecutive calls during this function. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { + private static final Random RANDOM = new Random(0); + + private KafkaConfig config; + private final String clusterId = "dummyId"; + private final int retentionSize = 10_000_000; + private final File logDir = 
TestUtils.tempDirectory("kafka-"); + private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir); + private BrokerTopicStats brokerTopicStats; + private final MockTime mockTime = new MockTime(); + private Uuid topicId; + private UnifiedLog log; + private final Metrics metrics = new Metrics(); + + private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class); + private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager(); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); + private RemoteLogManager remoteLogManager; + + private final String remoteLogStorageTestProp = "remote.log.storage.test"; + private final String remoteLogStorageTestVal = "storage.test"; + + private TopicIdPartition topicIdPartition; + + + @BeforeEach + void setUp() throws Exception { + Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); + topicId = Uuid.randomUuid(); + topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0)); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + + HashMap<String, Object> properties = new HashMap<>(); + properties.put("retention.bytes", retentionSize); + properties.put("local.retention.bytes", "1"); + properties.put("remote.storage.enable", "true"); + + LogConfig logConfig = new LogConfig(properties); + log = LogTestUtils.createLog( + partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000, + new ProducerStateManagerConfig(86400000, false), 600000, true, + Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER + ); + + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime, + tp -> Optional.of(log), + (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion), + brokerTopicStats, metrics) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } + + @Override + long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { + return 0L; + } + }; + remoteLogMetadataManager.initialise(topicIdPartition); + remoteLogManager.startup(); + } + + @AfterEach + void cleanup() { + log.close(); + } + + private void appendRLMConfig(Properties props) { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); + props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal); + } + + /** + * Builds a segment of the given size and append it to the log, then roll 
the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. + * + * @param size size of the segment to create + * @param count number of messages to create in the segment + */ + private void buildSegment(int size, int count) { + byte[] bytes = new byte[size / count]; + for (int i = 0; i < count; i++) { + RANDOM.nextBytes(bytes); + MemoryRecords records = MemoryRecords.withRecords( + RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME, + new SimpleRecord(bytes) + ); + log.appendAsLeader(records, 0, AppendOrigin.CLIENT, + MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL + ); + } + log.roll(Option.empty()); + } + + /** + * Builds a segment of the given size with a default of 100 messages and append it to the log + * then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. + * + * @param size size of the segment to create + */ + private void buildSegment(int size) { + buildSegment(size, 100); + } + + @Test + void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + task.run(); + + verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentAreRetried() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + when(remoteStorageManager.copyLogSegmentData(any(), any())) + .thenThrow(new RemoteStorageException("")) + .thenReturn(Optional.empty()); + + task.run(); + task.run(); + + verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException { + // We're testing the scenario where the first segment is correctly pushed then the next segment fails to get + // uploaded multiple times before succeeding again. 
+ // At the end we should see no call to delete as retention.bytes was never breached + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 2x1 (success) + verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any()); + verify(remoteStorageManager, never()).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) + ); + verify(remoteStorageManager, times(4)).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED)) + ); Review Comment: I'll add this as a comment but the idea is that, because we have a `COPY_SEGMENT_FINISHED` after a `COPY_SEGMENT_STARTED` in the metadata for this topic partition, all the `COPY_SEGMENT_STARTED` segments will be seen as dangling and will be deleted. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { + private static final Random RANDOM = new Random(0); + + private KafkaConfig config; + private final String clusterId = "dummyId"; + private final int retentionSize = 10_000_000; + private final File logDir = TestUtils.tempDirectory("kafka-"); + private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir); + private BrokerTopicStats brokerTopicStats; + private final MockTime mockTime = new MockTime(); + private Uuid topicId; + private UnifiedLog log; + private final Metrics metrics = new Metrics(); + + private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class); + 
private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager(); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); + private RemoteLogManager remoteLogManager; + + private final String remoteLogStorageTestProp = "remote.log.storage.test"; + private final String remoteLogStorageTestVal = "storage.test"; + + private TopicIdPartition topicIdPartition; + + + @BeforeEach + void setUp() throws Exception { + Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); + topicId = Uuid.randomUuid(); + topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0)); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + + HashMap<String, Object> properties = new HashMap<>(); + properties.put("retention.bytes", retentionSize); + properties.put("local.retention.bytes", "1"); + properties.put("remote.storage.enable", "true"); + + LogConfig logConfig = new LogConfig(properties); + log = LogTestUtils.createLog( + partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000, + new ProducerStateManagerConfig(86400000, false), 600000, true, + Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER + ); + + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime, + tp -> Optional.of(log), + (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion), + brokerTopicStats, metrics) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } + + @Override + long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { + return 0L; + } + }; + remoteLogMetadataManager.initialise(topicIdPartition); + remoteLogManager.startup(); + } + + @AfterEach + void cleanup() { + log.close(); + } + + private void appendRLMConfig(Properties props) { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); + props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal); + } + + /** + * Builds a segment of the given size and append it to the log, then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. 
+ * + * @param size size of the segment to create + * @param count number of messages to create in the segment + */ + private void buildSegment(int size, int count) { + byte[] bytes = new byte[size / count]; + for (int i = 0; i < count; i++) { + RANDOM.nextBytes(bytes); + MemoryRecords records = MemoryRecords.withRecords( + RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME, + new SimpleRecord(bytes) + ); + log.appendAsLeader(records, 0, AppendOrigin.CLIENT, + MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL + ); + } + log.roll(Option.empty()); + } + + /** + * Builds a segment of the given size with a default of 100 messages and append it to the log + * then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. + * + * @param size size of the segment to create + */ + private void buildSegment(int size) { + buildSegment(size, 100); + } + + @Test + void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + task.run(); + + verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentAreRetried() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + when(remoteStorageManager.copyLogSegmentData(any(), any())) + .thenThrow(new RemoteStorageException("")) + .thenReturn(Optional.empty()); + + task.run(); + task.run(); + + verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException { + // We're testing the scenario where the first segment is correctly pushed then the next segment fails to get + // uploaded multiple times before succeeding again. 
+ // At the end we should see no call to delete as retention.bytes was never breached + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 2x1 (success) + verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any()); + verify(remoteStorageManager, never()).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) + ); + verify(remoteStorageManager, times(4)).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED)) + ); + } + + @Test + void failedSegmentsAreNotAccountedInSizeDuringDeletion() throws RemoteStorageException { + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize - 100); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize - 100); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 1 (success) + verify(remoteStorageManager, times(6)).copyLogSegmentData(any(), any()); + + // The first segment should now be deleted + verify(remoteStorageManager, times(5)).deleteLogSegmentData( + argThat(segment -> segment.startOffset() == 0) + ); Review Comment: ``` when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); for (int i = 0; i < 4; i++) { copyTask.run(); } ``` This portion will have created 4 failed `COPY_SEGMENT_STARTED` segment and we have a successful run after. So in total we have 5 segments metadata referring to the first segment. All those segments are meant for deletion due to `retention.bytes` policy as we pushed a second segment after it, which is what is causing the 4 failures + 1 success to be deleted. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { Review Comment: The idea behind this class is avoid mocking all calls to the unifiedLog and the RLMM as those mocks can easily become quite long (see 
[RemoteLogManagerTest](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java#L3035)) and because I was mostly testing subsequent calls between copy and cleanup, I think this makes the test clearer to read than having more mocks between each call. ########## core/src/test/java/kafka/log/remote/quota/InMemoryRemoteLogMetadataManager.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; Review Comment: Good point, I am going to move it.
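The earlier comment motivates exercising the copy and expiration tasks against a real `UnifiedLog` and an in-memory metadata manager rather than per-call mocks. Purely as illustration, here is a minimal sketch of the shared bookkeeping such a test helper needs; this is not the PR's `InMemoryRemoteLogMetadataManager` (its code is not quoted in this thread), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

// Hypothetical sketch of in-memory segment-metadata bookkeeping for tests.
// It deliberately does not implement the full RemoteLogMetadataManager interface;
// it only shows the shared state the copy and cleanup paths both read and write.
final class InMemorySegmentMetadataStore {
    private final Map<TopicIdPartition, List<RemoteLogSegmentMetadata>> segmentsByPartition =
            new ConcurrentHashMap<>();

    // Called when a copy or delete starts/finishes: the new metadata entry is appended
    // for the partition, so failed attempts remain visible to later cleanup runs.
    void record(TopicIdPartition partition, RemoteLogSegmentMetadata metadata) {
        segmentsByPartition
                .computeIfAbsent(partition, p -> new CopyOnWriteArrayList<>())
                .add(metadata);
    }

    // The cleanup path iterates over whatever has been recorded so far, which is what
    // lets an expiration run spot dangling COPY_SEGMENT_STARTED entries left behind
    // by failed copy attempts.
    Iterator<RemoteLogSegmentMetadata> list(TopicIdPartition partition) {
        return segmentsByPartition.getOrDefault(partition, new ArrayList<>()).iterator();
    }
}
```

Keeping the entries in one shared map is what lets an interaction-style test drive a copy run and an expiration run against the same metadata, so state transitions such as a dangling `COPY_SEGMENT_STARTED` are observed without stubbing every call.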