malletgu commented on code in PR #16681:
URL: https://github.com/apache/kafka/pull/16681#discussion_r1724975609


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
 
             long logStartOffset = log.logStartOffset();
             long logEndOffset = log.logEndOffset();
-            Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
-                    log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
-            Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
-
-            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
-            Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean canProcess = true;
-            List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
             long sizeOfDeletableSegmentsBytes = 0L;
-            while (canProcess && epochIterator.hasNext()) {
-                Integer epoch = epochIterator.next();
-                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (canProcess && segmentsIterator.hasNext()) {
-                    if (isCancelled()) {
-                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
-                        return;
-                    }
-                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
 
-                    if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
-                        logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
-                                metadata.remoteLogSegmentId());
-                        canProcess = false;
-                        continue;
-                    }
-                    if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+            final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
+            final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>();
+            for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) {
+                Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch);
+                while (it.hasNext()) {
+                    final RemoteLogSegmentMetadata segment = it.next();
+
+                    // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling
+                    if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {
+                        sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes();
+                        segmentsToDelete.add(segment);
                         continue;
                     }
-                    if (segmentsToDelete.contains(metadata)) {
+
+                    if (segment.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
+                        segmentsToDelete.add(segment);
+                        sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes();
                         continue;
                     }

Review Comment:
   Yes, this is correct. Avoiding passing those segments into `buildRetentionSizeData` when it is computed afterwards is the most important part, and it is what we used to do before.
   
   > We will check COPY_SEGMENT_STARTED and DELETE_SEGMENT_STARTED here, but DELETE_SEGMENT_FINISHED in L1244. I don't think it's readable. Could you make it clear?
   
   I'll refactor this.
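   
   One possible shape for that refactor, as a minimal sketch (the helper and its name are hypothetical, not the final patch):
   ```java
   // Hypothetical helper gathering the delete-state checks that are currently
   // split between this loop and the DELETE_SEGMENT_FINISHED branch further
   // down, so that all of them read from one place.
   static boolean isDeleteInProgressOrDone(RemoteLogSegmentMetadata segment) {
       RemoteLogSegmentState state = segment.state();
       return state == RemoteLogSegmentState.DELETE_SEGMENT_STARTED
           || state == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED;
   }
   ```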



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
 
             long logStartOffset = log.logStartOffset();
             long logEndOffset = log.logEndOffset();
-            Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
-                    log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
-            Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
-
-            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
-            Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean canProcess = true;
-            List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
             long sizeOfDeletableSegmentsBytes = 0L;
-            while (canProcess && epochIterator.hasNext()) {
-                Integer epoch = epochIterator.next();
-                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (canProcess && segmentsIterator.hasNext()) {
-                    if (isCancelled()) {
-                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
-                        return;
-                    }
-                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
 
-                    if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
-                        logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
-                                metadata.remoteLogSegmentId());
-                        canProcess = false;
-                        continue;
-                    }
-                    if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+            final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
+            final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>();
+            for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) {
+                Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch);
+                while (it.hasNext()) {
+                    final RemoteLogSegmentMetadata segment = it.next();
+
+                    // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling
+                    if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {

Review Comment:
   Correct, it relies on each `RemoteLogSegmentMetadata` being different.
   
   > if there is only COPY_SEGMENT_STARTED and has no next state, it must be a dangling one, right
   
   We used to have this assumption, but now that copy and cleanup happen in parallel, we could get a segment in `COPY_SEGMENT_STARTED` because it is actually being copied.
   
   This is also why I avoid calling `remoteLogMetadataManager.listRemoteLogSegments` again: the list of segments could change between consecutive calls within this function.
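   
   As a rough sketch of the snapshot idea (a fragment reusing the variables from the diff above, not the actual patch):
   ```java
   // Take the listing once per epoch and keep it as a local snapshot; copy runs
   // in parallel, so a second listRemoteLogSegments call could return a
   // different view mid-cleanup.
   List<RemoteLogSegmentMetadata> snapshot = new ArrayList<>();
   Iterator<RemoteLogSegmentMetadata> it =
       remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch);
   it.forEachRemaining(snapshot::add);
   // Later retention decisions are then made against `snapshot` only.
   ```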



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.log.LogTestUtils;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager;
+import kafka.log.remote.quota.RLMQuotaManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Option;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogManagerInteractionTest {
+    private static final Random RANDOM = new Random(0);
+
+    private KafkaConfig config;
+    private final String clusterId = "dummyId";
+    private final int retentionSize = 10_000_000;
+    private final File logDir = TestUtils.tempDirectory("kafka-");
+    private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir);
+    private BrokerTopicStats brokerTopicStats;
+    private final MockTime mockTime = new MockTime();
+    private Uuid topicId;
+    private UnifiedLog log;
+    private final Metrics metrics = new Metrics();
+
+    private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class);
+    private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager();
+    private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
+    private RemoteLogManager remoteLogManager;
+
+    private final String remoteLogStorageTestProp = "remote.log.storage.test";
+    private final String remoteLogStorageTestVal = "storage.test";
+
+    private TopicIdPartition topicIdPartition;
+
+
+    @BeforeEach
+    void setUp() throws Exception {
+        Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+        props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
+        appendRLMConfig(props);
+        config = KafkaConfig.fromProps(props);
+        topicId = Uuid.randomUuid();
+        topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0));
+        brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
+
+        HashMap<String, Object> properties = new HashMap<>();
+        properties.put("retention.bytes", retentionSize);
+        properties.put("local.retention.bytes", "1");
+        properties.put("remote.storage.enable", "true");
+
+        LogConfig logConfig = new LogConfig(properties);
+        log = LogTestUtils.createLog(
+            partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000,
+            new ProducerStateManagerConfig(86400000, false), 600000, true,
+            Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+        );
+
+        remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime,
+            tp -> Optional.of(log),
+            (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion),
+            brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public RLMQuotaManager createRLMCopyQuotaManager() {
+                return rlmCopyQuotaManager;
+            }
+
+            public Duration quotaTimeout() {
+                return Duration.ofMillis(100);
+            }
+
+            @Override
+            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+                return 0L;
+            }
+        };
+        remoteLogMetadataManager.initialise(topicIdPartition);
+        remoteLogManager.startup();
+    }
+
+    @AfterEach
+    void cleanup() {
+        log.close();
+    }
+
+    private void appendRLMConfig(Properties props) {
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
+        props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
+        props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
+    }
+
+    /**
+     * Builds a segment of the given size, appends it to the log, and then rolls the segment.
+     * Each segment has an overhead of around 70 bytes, which impacts the log retention policy.
+     *
+     * @param size size of the segment to create
+     * @param count number of messages to create in the segment
+     */
+    private void buildSegment(int size, int count) {
+        byte[] bytes = new byte[size / count];
+        for (int i = 0; i < count; i++) {
+            RANDOM.nextBytes(bytes);
+            MemoryRecords records = MemoryRecords.withRecords(
+                RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(bytes)
+            );
+            log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL
+            );
+        }
+        log.roll(Option.empty());
+    }
+
+    /**
+     * Builds a segment of the given size with a default of 100 messages, appends it to the log,
+     * and then rolls the segment.
+     * Each segment has an overhead of around 70 bytes, which impacts the log retention policy.
+     *
+     * @param size size of the segment to create
+     */
+    private void buildSegment(int size) {
+        buildSegment(size, 100);
+    }
+
+    @Test
+    void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+
+        task.run();
+
+        verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any());
+    }
+
+    @Test
+    void failedSegmentAreRetried() throws RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+
+        when(remoteStorageManager.copyLogSegmentData(any(), any()))
+            .thenThrow(new RemoteStorageException(""))
+            .thenReturn(Optional.empty());
+
+        task.run();
+        task.run();
+
+        verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any());
+    }
+
+    @Test
+    void failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException {
+        // We're testing the scenario where the first segment is correctly pushed, then the next segment fails to get
+        // uploaded multiple times before succeeding again.
+        // At the end we should see no call to delete, as retention.bytes was never breached.
+        RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+        RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition);
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty());
+        copyTask.run();
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
+        for (int i = 0; i < 4; i++) {
+            copyTask.run();
+        }
+        doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any());
+        copyTask.run();
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        copyTask.run();
+        cleanupTask.run();
+
+        // total is 1 (success) + 4 (failures) + 2x1 (success)
+        verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any());
+        verify(remoteStorageManager, never()).deleteLogSegmentData(
+            argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED))
+        );
+        verify(remoteStorageManager, times(4)).deleteLogSegmentData(
+            argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED))
+        );

Review Comment:
   I'll add this as a comment, but the idea is that, because we have a `COPY_SEGMENT_FINISHED` entry after a `COPY_SEGMENT_STARTED` one in the metadata for this topic partition, all the `COPY_SEGMENT_STARTED` segments will be seen as dangling and will be deleted.
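   
   Roughly, the comment I have in mind over that branch (wording not final):
   ```java
   // A COPY_SEGMENT_STARTED entry that is followed by any later entry for this
   // partition is a dangling upload attempt: a retry or the eventual success
   // produced a newer entry, so the started one can safely be deleted.
   if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {
       sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes();
       segmentsToDelete.add(segment);
       continue;
   }
   ```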



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.log.LogTestUtils;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager;
+import kafka.log.remote.quota.RLMQuotaManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Option;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogManagerInteractionTest {
+    private static final Random RANDOM = new Random(0);
+
+    private KafkaConfig config;
+    private final String clusterId = "dummyId";
+    private final int retentionSize = 10_000_000;
+    private final File logDir = TestUtils.tempDirectory("kafka-");
+    private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir);
+    private BrokerTopicStats brokerTopicStats;
+    private final MockTime mockTime = new MockTime();
+    private Uuid topicId;
+    private UnifiedLog log;
+    private final Metrics metrics = new Metrics();
+
+    private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class);
+    private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager();
+    private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
+    private RemoteLogManager remoteLogManager;
+
+    private final String remoteLogStorageTestProp = "remote.log.storage.test";
+    private final String remoteLogStorageTestVal = "storage.test";
+
+    private TopicIdPartition topicIdPartition;
+
+
+    @BeforeEach
+    void setUp() throws Exception {
+        Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+        props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
+        appendRLMConfig(props);
+        config = KafkaConfig.fromProps(props);
+        topicId = Uuid.randomUuid();
+        topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0));
+        brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
+
+        HashMap<String, Object> properties = new HashMap<>();
+        properties.put("retention.bytes", retentionSize);
+        properties.put("local.retention.bytes", "1");
+        properties.put("remote.storage.enable", "true");
+
+        LogConfig logConfig = new LogConfig(properties);
+        log = LogTestUtils.createLog(
+            partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000,
+            new ProducerStateManagerConfig(86400000, false), 600000, true,
+            Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+        );
+
+        remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime,
+            tp -> Optional.of(log),
+            (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion),
+            brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public RLMQuotaManager createRLMCopyQuotaManager() {
+                return rlmCopyQuotaManager;
+            }
+
+            public Duration quotaTimeout() {
+                return Duration.ofMillis(100);
+            }
+
+            @Override
+            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+                return 0L;
+            }
+        };
+        remoteLogMetadataManager.initialise(topicIdPartition);
+        remoteLogManager.startup();
+    }
+
+    @AfterEach
+    void cleanup() {
+        log.close();
+    }
+
+    private void appendRLMConfig(Properties props) {
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
+        props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
+        props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
+    }
+
+    /**
+     * Builds a segment of the given size, appends it to the log, and then rolls the segment.
+     * Each segment has an overhead of around 70 bytes, which impacts the log retention policy.
+     *
+     * @param size size of the segment to create
+     * @param count number of messages to create in the segment
+     */
+    private void buildSegment(int size, int count) {
+        byte[] bytes = new byte[size / count];
+        for (int i = 0; i < count; i++) {
+            RANDOM.nextBytes(bytes);
+            MemoryRecords records = MemoryRecords.withRecords(
+                RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME,
+                new SimpleRecord(bytes)
+            );
+            log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL
+            );
+        }
+        log.roll(Option.empty());
+    }
+
+    /**
+     * Builds a segment of the given size with a default of 100 messages, appends it to the log,
+     * and then rolls the segment.
+     * Each segment has an overhead of around 70 bytes, which impacts the log retention policy.
+     *
+     * @param size size of the segment to create
+     */
+    private void buildSegment(int size) {
+        buildSegment(size, 100);
+    }
+
+    @Test
+    void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+
+        task.run();
+
+        verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any());
+    }
+
+    @Test
+    void failedSegmentAreRetried() throws RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        buildSegment(retentionSize);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+
+        when(remoteStorageManager.copyLogSegmentData(any(), any()))
+            .thenThrow(new RemoteStorageException(""))
+            .thenReturn(Optional.empty());
+
+        task.run();
+        task.run();
+
+        verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any());
+    }
+
+    @Test
+    void failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException {
+        // We're testing the scenario where the first segment is correctly pushed, then the next segment fails to get
+        // uploaded multiple times before succeeding again.
+        // At the end we should see no call to delete, as retention.bytes was never breached.
+        RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+        RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition);
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty());
+        copyTask.run();
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
+        for (int i = 0; i < 4; i++) {
+            copyTask.run();
+        }
+        doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any());
+        copyTask.run();
+
+        buildSegment(retentionSize / 3);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        copyTask.run();
+        cleanupTask.run();
+
+        // total is 1 (success) + 4 (failures) + 2x1 (success)
+        verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any());
+        verify(remoteStorageManager, never()).deleteLogSegmentData(
+            argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED))
+        );
+        verify(remoteStorageManager, times(4)).deleteLogSegmentData(
+            argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED))
+        );
+    }
+
+    @Test
+    void failedSegmentsAreNotAccountedInSizeDuringDeletion() throws RemoteStorageException {
+        RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+        RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition);
+
+        buildSegment(retentionSize - 100);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
+        for (int i = 0; i < 4; i++) {
+            copyTask.run();
+        }
+        doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any());
+        copyTask.run();
+
+        buildSegment(retentionSize - 100);
+        log.maybeUpdateHighWatermark(log.logEndOffset());
+        copyTask.run();
+        cleanupTask.run();
+
+        // total is 1 (success) + 4 (failures) + 1 (success)
+        verify(remoteStorageManager, times(6)).copyLogSegmentData(any(), any());
+
+        // The first segment should now be deleted
+        verify(remoteStorageManager, times(5)).deleteLogSegmentData(
+            argThat(segment -> segment.startOffset() == 0)
+        );

Review Comment:
   ```
   when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
   for (int i = 0; i < 4; i++) {
       copyTask.run();
   }
   ```
   This portion will have created 4 failed `COPY_SEGMENT_STARTED` segments, and we have a successful run after, so in total we have 5 metadata entries referring to the first segment.
   All of those are meant for deletion under the `retention.bytes` policy, as we pushed a second segment after it; that is what causes the 4 failures + 1 success to be deleted.
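   
   To spell the accounting out, the metadata entries referring to the first segment end up looking like this (illustrative, not actual test output):
   ```
   1. COPY_SEGMENT_STARTED   (failed upload, run 1)
   2. COPY_SEGMENT_STARTED   (failed upload, run 2)
   3. COPY_SEGMENT_STARTED   (failed upload, run 3)
   4. COPY_SEGMENT_STARTED   (failed upload, run 4)
   5. COPY_SEGMENT_FINISHED  (successful upload)
   ```
   Once the second segment is pushed, all five fall under the `retention.bytes` cut, hence the `times(5)` on `deleteLogSegmentData` for `startOffset() == 0`.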



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.log.LogTestUtils;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager;
+import kafka.log.remote.quota.RLMQuotaManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Option;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogManagerInteractionTest {

Review Comment:
   The idea behind this class is to avoid mocking all calls to the UnifiedLog and the RLMM, as those mocks can easily become quite long (see [RemoteLogManagerTest](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java#L3035)). Since I was mostly testing subsequent calls between copy and cleanup, I think this makes the test clearer to read than having more mocks between each call.
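   
   For context, the core of the in-memory implementation is little more than a per-partition list of metadata entries; a minimal sketch of the assumed shape (the actual class implements more of the `RemoteLogMetadataManager` contract):
   ```java
   // Sketch: serve segment listings from memory so tests can drive the
   // copy/cleanup interaction without stubbing every RLMM call.
   private final Map<TopicIdPartition, List<RemoteLogSegmentMetadata>> segments = new ConcurrentHashMap<>();
   
   public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata metadata) {
       segments.computeIfAbsent(metadata.topicIdPartition(), tp -> new CopyOnWriteArrayList<>())
               .add(metadata);
       return CompletableFuture.completedFuture(null);
   }
   
   public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition partition, int leaderEpoch) {
       // The real class can also filter on the leader epoch; this sketch returns everything.
       return segments.getOrDefault(partition, Collections.emptyList()).iterator();
   }
   ```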



##########
core/src/test/java/kafka/log/remote/quota/InMemoryRemoteLogMetadataManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;

Review Comment:
   Good point, I am going to move it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

