showuon commented on code in PR #16681: URL: https://github.com/apache/kafka/pull/16681#discussion_r1715153062
########## core/src/test/java/kafka/log/remote/quota/InMemoryRemoteLogMetadataManager.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; Review Comment: Why do we put this class under remote quota? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1332,6 +1338,33 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE } } + /** + * Get a set of all leaders existing in the remote storage Review Comment: nit: all leader [epochs] ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import 
org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { + private static final Random RANDOM = new Random(0); + + private KafkaConfig config; + private final String clusterId = "dummyId"; + private final int retentionSize = 10_000_000; + private final File logDir = TestUtils.tempDirectory("kafka-"); + private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir); + private BrokerTopicStats brokerTopicStats; + private final MockTime mockTime = new MockTime(); + private Uuid topicId; + private UnifiedLog log; + private final Metrics metrics = new Metrics(); + + private final RemoteStorageManager remoteStorageManager = 
Mockito.mock(RemoteStorageManager.class); + private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager(); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); + private RemoteLogManager remoteLogManager; + + private final String remoteLogStorageTestProp = "remote.log.storage.test"; + private final String remoteLogStorageTestVal = "storage.test"; + + private TopicIdPartition topicIdPartition; + + + @BeforeEach + void setUp() throws Exception { + Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); + topicId = Uuid.randomUuid(); + topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0)); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + + HashMap<String, Object> properties = new HashMap<>(); + properties.put("retention.bytes", retentionSize); + properties.put("local.retention.bytes", "1"); + properties.put("remote.storage.enable", "true"); + + LogConfig logConfig = new LogConfig(properties); + log = LogTestUtils.createLog( + partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000, + new ProducerStateManagerConfig(86400000, false), 600000, true, + Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER + ); + + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime, + tp -> Optional.of(log), + (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion), + brokerTopicStats, metrics) { + public RemoteStorageManager 
createRemoteStorageManager() { + return remoteStorageManager; + } + + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } + + @Override + long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { + return 0L; + } + }; + remoteLogMetadataManager.initialise(topicIdPartition); + remoteLogManager.startup(); + } + + @AfterEach + void cleanup() { + log.close(); + } + + private void appendRLMConfig(Properties props) { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); + props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal); + } + + /** + * Builds a segment of the given size and append it to the log, then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. 
+ * + * @param size size of the segment to create + * @param count number of messages to create in the segment + */ + private void buildSegment(int size, int count) { + byte[] bytes = new byte[size / count]; + for (int i = 0; i < count; i++) { + RANDOM.nextBytes(bytes); + MemoryRecords records = MemoryRecords.withRecords( + RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME, + new SimpleRecord(bytes) + ); + log.appendAsLeader(records, 0, AppendOrigin.CLIENT, + MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL + ); + } + log.roll(Option.empty()); + } + + /** + * Builds a segment of the given size with a default of 100 messages and append it to the log + * then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. + * + * @param size size of the segment to create + */ + private void buildSegment(int size) { + buildSegment(size, 100); + } + + @Test + void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + task.run(); + + verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentAreRetried() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + when(remoteStorageManager.copyLogSegmentData(any(), any())) + .thenThrow(new RemoteStorageException("")) + .thenReturn(Optional.empty()); + + task.run(); + task.run(); + + verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any()); + } + + @Test + void 
failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException { + // We're testing the scenario where the first segment is correctly pushed then the next segment fails to get + // uploaded multiple times before succeeding again. + // At the end we should see no call to delete as retention.bytes was never breached + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 2x1 (success) + verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any()); + verify(remoteStorageManager, never()).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) + ); + verify(remoteStorageManager, times(4)).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED)) + ); Review Comment: Could you help me understand why we call `deleteLogSegmentData` 4 times for `COPY_SEGMENT_STARTED` here? Maybe add an explanation as a comment above that line. 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE long logStartOffset = log.logStartOffset(); long logEndOffset = log.logEndOffset(); - Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, - log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets); - Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); - - RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); - Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); - boolean canProcess = true; - List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); long sizeOfDeletableSegmentsBytes = 0L; - while (canProcess && epochIterator.hasNext()) { - Integer epoch = epochIterator.next(); - Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (canProcess && segmentsIterator.hasNext()) { - if (isCancelled()) { - logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); - return; - } - RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) { - logger.debug("Copy for the segment {} is currently in process. 
Skipping cleanup for it and the remaining segments", - metadata.remoteLogSegmentId()); - canProcess = false; - continue; - } - if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) { + final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); + final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>(); + for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) { + Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch); + while (it.hasNext()) { + final RemoteLogSegmentMetadata segment = it.next(); + + // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling + if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) { Review Comment: So this relies on the fact that each `RemoteLogSegmentMetadata` is distinct, even when it is a retry of the same log segment, right? And so, if a segment is only in `COPY_SEGMENT_STARTED` with no subsequent state, it must be a dangling one, right? 
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE long logStartOffset = log.logStartOffset(); long logEndOffset = log.logEndOffset(); - Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize, - log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets); - Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs); - - RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData); - Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator(); - boolean canProcess = true; - List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); long sizeOfDeletableSegmentsBytes = 0L; - while (canProcess && epochIterator.hasNext()) { - Integer epoch = epochIterator.next(); - Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (canProcess && segmentsIterator.hasNext()) { - if (isCancelled()) { - logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); - return; - } - RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) { - logger.debug("Copy for the segment {} is currently in process. 
Skipping cleanup for it and the remaining segments", - metadata.remoteLogSegmentId()); - canProcess = false; - continue; - } - if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) { + final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); + final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>(); + for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) { + Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch); + while (it.hasNext()) { + final RemoteLogSegmentMetadata segment = it.next(); + + // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling + if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) { + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); + segmentsToDelete.add(segment); continue; } - if (segmentsToDelete.contains(metadata)) { + + if (segment.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) { + segmentsToDelete.add(segment); + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); continue; } Review Comment: We will check `COPY_SEGMENT_STARTED` and `DELETE_SEGMENT_STARTED` here, but `DELETE_SEGMENT_FINISHED` in L1244. I don't think it's readable. Could you make it clear? ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import 
org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { Review Comment: I think these tests can be put inside `RemoteLogManagerTest`, without creating a new `RemoteLogManagerInteractionTest` class. Basically we're not doing integration tests here. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java: ########## @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.log.LogTestUtils; +import kafka.log.UnifiedLog; +import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager; +import kafka.log.remote.quota.RLMQuotaManager; +import kafka.server.BrokerTopicStats; +import kafka.server.KafkaConfig; +import kafka.server.RequestLocal; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.util.MockTime; 
+import org.apache.kafka.storage.internals.log.AppendOrigin; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogOffsetsListener; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; +import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig; +import org.apache.kafka.storage.internals.log.VerificationGuard; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import scala.Option; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogManagerInteractionTest { + private static final Random RANDOM = new Random(0); + + private KafkaConfig config; + private final String clusterId = "dummyId"; + private final int retentionSize = 10_000_000; + private final File logDir = TestUtils.tempDirectory("kafka-"); + private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir); + private BrokerTopicStats brokerTopicStats; + private final MockTime mockTime = new MockTime(); + private Uuid topicId; + private UnifiedLog log; + private 
final Metrics metrics = new Metrics(); + + private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class); + private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager(); + private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); + private RemoteLogManager remoteLogManager; + + private final String remoteLogStorageTestProp = "remote.log.storage.test"; + private final String remoteLogStorageTestVal = "storage.test"; + + private TopicIdPartition topicIdPartition; + + + @BeforeEach + void setUp() throws Exception { + Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); + appendRLMConfig(props); + config = KafkaConfig.fromProps(props); + topicId = Uuid.randomUuid(); + topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0)); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + + HashMap<String, Object> properties = new HashMap<>(); + properties.put("retention.bytes", retentionSize); + properties.put("local.retention.bytes", "1"); + properties.put("remote.storage.enable", "true"); + + LogConfig logConfig = new LogConfig(properties); + log = LogTestUtils.createLog( + partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000, + new ProducerStateManagerConfig(86400000, false), 600000, true, + Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER + ); + + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime, + tp -> Optional.of(log), + (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, 
LogStartOffsetIncrementReason.SegmentDeletion), + brokerTopicStats, metrics) { + public RemoteStorageManager createRemoteStorageManager() { + return remoteStorageManager; + } + + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public RLMQuotaManager createRLMCopyQuotaManager() { + return rlmCopyQuotaManager; + } + + public Duration quotaTimeout() { + return Duration.ofMillis(100); + } + + @Override + long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) { + return 0L; + } + }; + remoteLogMetadataManager.initialise(topicIdPartition); + remoteLogManager.startup(); + } + + @AfterEach + void cleanup() { + log.close(); + } + + private void appendRLMConfig(Properties props) { + props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); + props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal); + } + + /** + * Builds a segment of the given size and append it to the log, then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. 
+ * + * @param size size of the segment to create + * @param count number of messages to create in the segment + */ + private void buildSegment(int size, int count) { + byte[] bytes = new byte[size / count]; + for (int i = 0; i < count; i++) { + RANDOM.nextBytes(bytes); + MemoryRecords records = MemoryRecords.withRecords( + RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME, + new SimpleRecord(bytes) + ); + log.appendAsLeader(records, 0, AppendOrigin.CLIENT, + MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL + ); + } + log.roll(Option.empty()); + } + + /** + * Builds a segment of the given size with a default of 100 messages and append it to the log + * then roll the segment after. + * Each segment has some overhead of around 70bytes which impact log retention policy. + * + * @param size size of the segment to create + */ + private void buildSegment(int size) { + buildSegment(size, 100); + } + + @Test + void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + task.run(); + + verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any()); + } + + @Test + void failedSegmentAreRetried() throws RemoteStorageException { + RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + + buildSegment(retentionSize); + buildSegment(retentionSize); + buildSegment(retentionSize); + log.maybeUpdateHighWatermark(log.logEndOffset()); + + when(remoteStorageManager.copyLogSegmentData(any(), any())) + .thenThrow(new RemoteStorageException("")) + .thenReturn(Optional.empty()); + + task.run(); + task.run(); + + verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any()); + } + + @Test + void 
failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException { + // We're testing the scenario where the first segment is correctly pushed then the next segment fails to get + // uploaded multiple times before succeeding again. + // At the end we should see no call to delete as retention.bytes was never breached + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize / 3); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 2x1 (success) + verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any()); + verify(remoteStorageManager, never()).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) + ); + verify(remoteStorageManager, times(4)).deleteLogSegmentData( + argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED)) + ); + } + + @Test + void failedSegmentsAreNotAccountedInSizeDuringDeletion() throws RemoteStorageException { + RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128); + RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition); + + buildSegment(retentionSize - 100); + 
log.maybeUpdateHighWatermark(log.logEndOffset()); + when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException("")); + for (int i = 0; i < 4; i++) { + copyTask.run(); + } + doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any()); + copyTask.run(); + + buildSegment(retentionSize - 100); + log.maybeUpdateHighWatermark(log.logEndOffset()); + copyTask.run(); + cleanupTask.run(); + + // total is 1 (success) + 4 (failures) + 1 (success) + verify(remoteStorageManager, times(6)).copyLogSegmentData(any(), any()); + + // The first segment should now be deleted + verify(remoteStorageManager, times(5)).deleteLogSegmentData( + argThat(segment -> segment.startOffset() == 0) + ); Review Comment: Could you help me understand why we call deleteLogSegmentData 5 times here? Maybe add the explanation as a comment above the line. Thanks.
epoch); - while (canProcess && segmentsIterator.hasNext()) { - if (isCancelled()) { - logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); - return; - } - RemoteLogSegmentMetadata metadata = segmentsIterator.next(); - if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) { - logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments", - metadata.remoteLogSegmentId()); - canProcess = false; - continue; - } - if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) { + final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>(); + final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>(); + for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) { + Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch); + while (it.hasNext()) { + final RemoteLogSegmentMetadata segment = it.next(); + + // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling + if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) { + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); + segmentsToDelete.add(segment); continue; } - if (segmentsToDelete.contains(metadata)) { + + if (segment.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) { + segmentsToDelete.add(segment); + sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes(); continue; } Review Comment: This is the main fix, right? Before this fix, we will add all `COPY_SEGMENT_STARTED` segments into validSegments, including retried ones. But here we will filter them out, right? 
########## core/src/test/java/kafka/log/remote/quota/InMemoryRemoteLogMetadataManager.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +public class InMemoryRemoteLogMetadataManager implements RemoteLogMetadataManager { Review Comment: I don't know if this is really needed because we can test using mock. 
But I'm fine we added it since it makes tests in this PR easier. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1341,24 +1374,24 @@ private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) { private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize, long onlyLocalLogSegmentsSize, long logEndOffset, - NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException { + NavigableMap<Integer, Long> epochEntries, + List<RemoteLogSegmentMetadata> segments + ) { if (retentionSize > -1) { long startTimeMs = time.milliseconds(); long remoteLogSizeBytes = 0L; Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>(); - for (Integer epoch : epochEntries.navigableKeySet()) { - // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote - // log size may be computed for all the segments but not for segments with in the current - // partition's leader epoch lineage. Better to revisit this API. - // remoteLogSizeBytes += remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch); - Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); - while (segmentsIterator.hasNext()) { - RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next(); - RemoteLogSegmentId segmentId = segmentMetadata.remoteLogSegmentId(); - if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) { - remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes(); - visitedSegmentIds.add(segmentId); - } + // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote + // log size may be computed for all the segments but not for segments with in the current + // partition's leader epoch lineage. Better to revisit this API. 
+ // remoteLogSizeBytes += remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch); Review Comment: If you think there's a potential bug, please file a JIRA for future improvement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org