chia7712 commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1624880657


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -197,333 +206,159 @@ public void 
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
             for (Map.Entry<Integer, Long> entry : 
expectedEpochToHighestOffset.entrySet()) {
                 Integer epoch = entry.getKey();
                 Long expectedOffset = entry.getValue();
-                Optional<Long> offset = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
-                log.debug("Fetching highest offset for epoch: {} , returned: 
{} , expected: {}", epoch, offset, expectedOffset);
-                Assertions.assertEquals(Optional.of(expectedOffset), offset);
+                Optional<Long> offset = 
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                assertEquals(Optional.of(expectedOffset), offset);
             }
 
             // Search for non existing leader epoch
-            Optional<Long> highestOffsetForEpoch5 = 
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
-            Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, 
"RemoteLogSegmentLifecycleManager");
+            Optional<Long> highestOffsetForEpoch5 = 
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+            assertFalse(highestOffsetForEpoch5.isPresent());
         }
     }
 
-    private RemoteLogSegmentMetadata 
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager,
-                                                                  Map<Integer, 
Long> segmentLeaderEpochs,
-                                                                  long 
startOffset,
-                                                                  long 
endOffset,
-                                                                  
RemoteLogSegmentState state)
-            throws RemoteStorageException {
+    private RemoteLogSegmentMetadata 
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+                                                        Map<Integer, Long> 
segmentLeaderEpochs,
+                                                        long startOffset,
+                                                        long endOffset,
+                                                        RemoteLogSegmentState 
state)
+            throws RemoteStorageException, ExecutionException, 
InterruptedException {
         RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-                                                                               
 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-        
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-                Optional.empty(),
-                state, BROKER_ID_1);
-        
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+        RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+                -1L, brokerId0, time.milliseconds(), segSize, 
segmentLeaderEpochs);
+        metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+        
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new 
RemoteLogSegmentMetadataUpdate(segmentId,
+                time.milliseconds(), Optional.empty(), state, brokerId1);
+        
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+        
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
     }
 
-    private static class EpochOffset {
-        final int epoch;
-        final long offset;
-
-        private EpochOffset(int epoch,
-                            long offset) {
-            this.epoch = epoch;
-            this.offset = offset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            EpochOffset that = (EpochOffset) o;
-            return epoch == that.epoch && offset == that.offset;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(epoch, offset);
-        }
-
-        @Override
-        public String toString() {
-            return "EpochOffset{" +
-                    "epoch=" + epoch +
-                    ", offset=" + offset +
-                    '}';
-        }
-    }
-
-    private static Collection<Arguments> remoteLogSegmentLifecycleManagers() {
-        return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()),
-                             Arguments.of(new 
TopicBasedRemoteLogMetadataManagerWrapper()));
-    }
-
-    private void checkListSegments(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager,
+    private void checkListSegments(RemoteLogMetadataManager metadataManager,
                                    int leaderEpoch,
-                                   RemoteLogSegmentMetadata expectedSegment)
+                                   RemoteLogSegmentMetadata expectedMetadata)
             throws RemoteStorageException {
         // cache.listRemoteLogSegments(leaderEpoch) should contain the above 
segment.
-        Iterator<RemoteLogSegmentMetadata> segmentsIter = 
remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
-        Assertions.assertTrue(segmentsIter.hasNext());
-        Assertions.assertEquals(expectedSegment, segmentsIter.next());
+        Iterator<RemoteLogSegmentMetadata> metadataIter =
+                metadataManager.listRemoteLogSegments(topicIdPartition, 
leaderEpoch);
+        assertTrue(metadataIter.hasNext());
+        assertEquals(expectedMetadata, metadataIter.next());
 
         // cache.listAllRemoteLogSegments() should contain the above segment.
-        Iterator<RemoteLogSegmentMetadata> allSegmentsIter = 
remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
-        Assertions.assertTrue(allSegmentsIter.hasNext());
-        Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
+        Iterator<RemoteLogSegmentMetadata> allSegmentsIter = 
metadataManager.listRemoteLogSegments(topicIdPartition);
+        assertTrue(allSegmentsIter.hasNext());
+        assertEquals(expectedMetadata, allSegmentsIter.next());
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void 
testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager 
remoteLogSegmentLifecycleManager) throws Exception {
-
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)

Review Comment:
   It seems all the test cases require `brokers = 3`. Maybe we can define it 
once via `@ClusterTestDefaults(brokers = 3)`?



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -16,172 +16,181 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_STARTED;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_FINISHED;
 
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
 public class RemoteLogSegmentLifecycleTest {
-    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class);
 
-    private static final int SEG_SIZE = 1024 * 1024;
-    private static final int BROKER_ID_0 = 0;
-    private static final int BROKER_ID_1 = 1;
+    int segSize = 1048576;

Review Comment:
   Could you please add `private final` to those variables? Otherwise, we will 
see a minor follow-up PR to fix them later :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to