brandboat commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625252312


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -197,333 +206,159 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
             for (Map.Entry<Integer, Long> entry : expectedEpochToHighestOffset.entrySet()) {
                 Integer epoch = entry.getKey();
                 Long expectedOffset = entry.getValue();
-                Optional<Long> offset = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
-                log.debug("Fetching highest offset for epoch: {} , returned: {} , expected: {}", epoch, offset, expectedOffset);
-                Assertions.assertEquals(Optional.of(expectedOffset), offset);
+                Optional<Long> offset = metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+                assertEquals(Optional.of(expectedOffset), offset);
             }
 
             // Search for non existing leader epoch
-            Optional<Long> highestOffsetForEpoch5 = remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
-            Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
+            Optional<Long> highestOffsetForEpoch5 = metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+            assertFalse(highestOffsetForEpoch5.isPresent());
         }
     }
 
-    private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager,
-                                                                  Map<Integer, Long> segmentLeaderEpochs,
-                                                                  long startOffset,
-                                                                  long endOffset,
-                                                                  RemoteLogSegmentState state)
-            throws RemoteStorageException {
+    private RemoteLogSegmentMetadata upsertSegmentState(RemoteLogMetadataManager metadataManager,
+                                                        Map<Integer, Long> segmentLeaderEpochs,
+                                                        long startOffset,
+                                                        long endOffset,
+                                                        RemoteLogSegmentState state)
+            throws RemoteStorageException, ExecutionException, InterruptedException {
         RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-                                                                                time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-        remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-                Optional.empty(),
-                state, BROKER_ID_1);
-        remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+                -1L, brokerId0, time.milliseconds(), segSize, segmentLeaderEpochs);
+        metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+        verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
+                time.milliseconds(), Optional.empty(), state, brokerId1);
+        metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+        verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
     }
 
-    private static class EpochOffset {
-        final int epoch;
-        final long offset;
-
-        private EpochOffset(int epoch,
-                            long offset) {
-            this.epoch = epoch;
-            this.offset = offset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            EpochOffset that = (EpochOffset) o;
-            return epoch == that.epoch && offset == that.offset;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(epoch, offset);
-        }
-
-        @Override
-        public String toString() {
-            return "EpochOffset{" +
-                    "epoch=" + epoch +
-                    ", offset=" + offset +
-                    '}';
-        }
-    }
-
-    private static Collection<Arguments> remoteLogSegmentLifecycleManagers() {
-        return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()),
-                             Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapper()));
-    }
-
-    private void checkListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager,
+    private void checkListSegments(RemoteLogMetadataManager metadataManager,
                                    int leaderEpoch,
-                                   RemoteLogSegmentMetadata expectedSegment)
+                                   RemoteLogSegmentMetadata expectedMetadata)
             throws RemoteStorageException {
         // cache.listRemoteLogSegments(leaderEpoch) should contain the above segment.
-        Iterator<RemoteLogSegmentMetadata> segmentsIter = remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
-        Assertions.assertTrue(segmentsIter.hasNext());
-        Assertions.assertEquals(expectedSegment, segmentsIter.next());
+        Iterator<RemoteLogSegmentMetadata> metadataIter =
+                metadataManager.listRemoteLogSegments(topicIdPartition, leaderEpoch);
+        assertTrue(metadataIter.hasNext());
+        assertEquals(expectedMetadata, metadataIter.next());
 
         // cache.listAllRemoteLogSegments() should contain the above segment.
-        Iterator<RemoteLogSegmentMetadata> allSegmentsIter = remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
-        Assertions.assertTrue(allSegmentsIter.hasNext());
-        Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
+        Iterator<RemoteLogSegmentMetadata> allSegmentsIter = metadataManager.listRemoteLogSegments(topicIdPartition);
+        assertTrue(allSegmentsIter.hasNext());
+        assertEquals(expectedMetadata, allSegmentsIter.next());
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
-
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)
+    public void testCacheSegmentWithCopySegmentStartedState() throws Exception {
+        try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
             // Create a segment with state COPY_SEGMENT_STARTED, and check for searching that segment and listing the
             // segments.
             RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
-            RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
-                                                                                    time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-            remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
+            RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L,
+                    -1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L));
+            metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+            verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
 
             // This segment should not be available as the state is not reached to COPY_SEGMENT_FINISHED.
-            Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 0);
-            Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent());
+            Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 =
+                    metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 0);
+            assertFalse(segMetadataForOffset0Epoch0.isPresent());
 
             // cache.listRemoteLogSegments APIs should contain the above segment.
-            checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
+            checkListSegments(metadataManager, 0, segmentMetadata);
         }
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void testCacheSegmentWithCopySegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)
+    public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
+        try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
             // Create a segment and move it to state COPY_SEGMENT_FINISHED. and check for searching that segment and
             // listing the segments.
-            RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-                                                                                    Collections.singletonMap(0, 101L),
-                                                                                    101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+            RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+                    metadataManager, Collections.singletonMap(0, 101L), 101L, 200L, COPY_SEGMENT_FINISHED);
 
             // Search should return the above segment.
-            Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 150);
-            Assertions.assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150);
+            Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 =
+                    metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 150);
+            assertEquals(Optional.of(segmentMetadata), segMetadataForOffset150);
 
             // cache.listRemoteLogSegments should contain the above segments.
-            checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
+            checkListSegments(metadataManager, 0, segmentMetadata);
         }
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void testCacheSegmentWithDeleteSegmentStartedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)
+    public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
+        try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
             // Create a segment and move it to state DELETE_SEGMENT_STARTED, and check for searching that segment and
             // listing the segments.
-            RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-                                                                                    Collections.singletonMap(0, 201L),
-                                                                                    201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+            RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+                    metadataManager, Collections.singletonMap(0, 201L), 201L, 300L, DELETE_SEGMENT_STARTED);
 
             // Search should not return the above segment as their leader epoch state is cleared.
-            Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 = remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 250);
-            Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
-
-            checkListSegments(remoteLogSegmentLifecycleManager, 0, segmentMetadata);
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
+            Optional<RemoteLogSegmentMetadata> segmentMetadataForOffset250Epoch0 =
+                    metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 250);
+            assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
+            checkListSegments(metadataManager, 0, segmentMetadata);
         }
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void testCacheSegmentsWithDeleteSegmentFinishedState(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)
+    public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
+        try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
             // Create a segment and move it to state DELETE_SEGMENT_FINISHED, and check for searching that segment and
             // listing the segments.
-            RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-                                                                                    Collections.singletonMap(0, 301L),
-                                                                                    301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+            RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+                    metadataManager, Collections.singletonMap(0, 301L), 301L, 400L, DELETE_SEGMENT_STARTED);
 
             // Search should not return the above segment as their leader epoch state is cleared.
-            Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 350).isPresent());
+            assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0, 350).isPresent());
 
-            RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-                                                                                                      time.milliseconds(),
-                                                                                                      Optional.empty(),
-                                                                                                      RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
-                                                                                                      BROKER_ID_1);
-            remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+            RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                    segmentMetadata.remoteLogSegmentId(), time.milliseconds(), Optional.empty(),
+                    DELETE_SEGMENT_FINISHED, brokerId1);
+            metadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
+            verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segmentMetadataUpdate);
 
             // listRemoteLogSegments(0) and listRemoteLogSegments() should not contain the above segment.
-            Assertions.assertFalse(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0).hasNext());
-            Assertions.assertFalse(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments().hasNext());
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
+            assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition, 0).hasNext());
+            assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition).hasNext());
         }
     }
 
-    @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
-    @MethodSource("remoteLogSegmentLifecycleManagers")
-    public void testCacheListSegments(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
-        try {
-            remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+    @ClusterTest(brokers = 3)
+    public void testCacheListSegments() throws Exception {
+        try (RemoteLogMetadataManager metadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition), Collections.emptySet());
             // Create a few segments and add them to the cache.
-            RemoteLogSegmentMetadata segment0 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 0L), 0,
-                                                                             100,
-                                                                             RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
-            RemoteLogSegmentMetadata segment1 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, Collections.singletonMap(0, 101L), 101,
-                                                                             200,
-                                                                             RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
-            Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
-            segment2LeaderEpochs.put(0, 201L);
-            segment2LeaderEpochs.put(1, 301L);
-            RemoteLogSegmentMetadata segment2 = createSegmentUpdateWithState(remoteLogSegmentLifecycleManager, segment2LeaderEpochs, 201, 400,
-                                                                             RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+            RemoteLogSegmentMetadata segment0 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 0L),
+                    0, 100, COPY_SEGMENT_FINISHED);
+            RemoteLogSegmentMetadata segment1 = upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L),
+                    101, 200, COPY_SEGMENT_FINISHED);
+            Map<Integer, Long> leaderEpochSegment2 = new HashMap<>();
+            leaderEpochSegment2.put(0, 201L);
+            leaderEpochSegment2.put(1, 301L);
+            RemoteLogSegmentMetadata segment2 = upsertSegmentState(metadataManager, leaderEpochSegment2,
+                    201, 400, COPY_SEGMENT_FINISHED);
 
             // listRemoteLogSegments(0) and listAllRemoteLogSegments() should contain all the above segments.
             List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
-            Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0),
                                                                  expectedSegmentsForEpoch0.iterator()));
-            Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(),
                                                                     expectedSegmentsForEpoch0.iterator()));
+            assertTrue(TestUtils.sameElementsWithOrder(
+                    expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition, 0)));
+            assertTrue(TestUtils.sameElementsWithoutOrder(
+                    expectedSegmentsForEpoch0.iterator(), metadataManager.listRemoteLogSegments(topicIdPartition)));
 
             // listRemoteLogSegments(1) should contain only segment2.
             List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = Collections.singletonList(segment2);
-            Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(1),
                                                                  expectedSegmentsForEpoch1.iterator()));
-        } finally {
-            Utils.closeQuietly(remoteLogSegmentLifecycleManager, "RemoteLogSegmentLifecycleManager");
-        }
-    }
-
-    /**
-     * This is a wrapper with {@link TopicBasedRemoteLogMetadataManager} implementing {@link RemoteLogSegmentLifecycleManager}.
-     * This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
-     * {@code RemoteLogMetadataCache} for several lifecycle operations.
-     * <p>
-     * This starts a Kafka cluster with {@link #initialize(Set, boolean)} )} with {@link #brokerCount()} no of servers. It also
-     * creates the remote log metadata topic required for {@code TopicBasedRemoteLogMetadataManager}. This cluster will
-     * be stopped by invoking {@link #close()}.
-     */
-    static class TopicBasedRemoteLogMetadataManagerWrapper extends TopicBasedRemoteLogMetadataManagerHarness implements RemoteLogSegmentLifecycleManager {
-
-        private TopicIdPartition topicIdPartition;
-
-        @Override
-        public synchronized void initialize(TopicIdPartition topicIdPartition) {
-            this.topicIdPartition = topicIdPartition;
-            super.initialize(Collections.singleton(topicIdPartition), true);
-        }
-
-        @Override
-        public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException {
-            try {
-                // Wait until the segment is added successfully.
-                remoteLogMetadataManager().addRemoteLogSegmentMetadata(segmentMetadata).get();
-            } catch (Exception e) {
-                throw new RemoteStorageException(e);
-            }
-        }
-
-        @Override
-        public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException {
-            try {
-                // Wait until the segment is updated successfully.
-                remoteLogMetadataManager().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
-            } catch (Exception e) {
-                throw new RemoteStorageException(e);
-            }
-        }
-
-        @Override
-        public Optional<Long> highestOffsetForEpoch(int leaderEpoch) throws RemoteStorageException {
-            return remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
-        }
-
-        @Override
-        public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch,
                                                                           long offset) throws RemoteStorageException {
-            return remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, leaderEpoch, offset);
-        }
-
-        @Override
-        public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException {
-            return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch);
-        }
-
-        @Override
-        public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() throws RemoteStorageException {
-            return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
-        }
-
-        @Override
-        public void close() throws IOException {
-            super.close();
-        }
-
-    }
-
-    /**
-     * This is a wrapper with {@link RemoteLogMetadataCache} implementing {@link RemoteLogSegmentLifecycleManager}.
-     * This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
-     * {@code RemoteLogMetadataCache} for several lifecycle operations.
-     */
-    static class RemoteLogMetadataCacheWrapper implements RemoteLogSegmentLifecycleManager {

Review Comment:
   Looks like this PR removed all of the RemoteLogMetadataCache tests. Does that mean we already have related coverage somewhere else, or is there another reason for dropping them?
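
   If we do still want direct unit coverage of `RemoteLogMetadataCache` (without bringing up a cluster), a plain test along these lines might be enough. This is only a rough sketch: the cache constructor and method names used below (`addCopyInProgressSegment`, `updateRemoteLogSegmentMetadata`, `remoteLogSegmentMetadata`, `listAllRemoteLogSegments`) are from my memory of that class and may need adjusting, and `topicIdPartition`, `brokerId0`, `brokerId1`, `segSize` and `time` are the existing test fields:

   ```java
   // Rough sketch only: exercises RemoteLogMetadataCache directly, no cluster needed.
   // The cache API calls below are assumptions, please double-check against the current class.
   @Test
   public void testCacheCopySegmentLifecycle() throws Exception {
       RemoteLogMetadataCache cache = new RemoteLogMetadataCache();
       RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
       RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L,
               -1L, brokerId0, time.milliseconds(), segSize, Collections.singletonMap(0, 0L));

       // While the copy is still in progress, the segment should not be returned by offset lookups.
       cache.addCopyInProgressSegment(segmentMetadata);
       assertFalse(cache.remoteLogSegmentMetadata(0, 50).isPresent());

       // Once the state moves to COPY_SEGMENT_FINISHED, the segment becomes visible.
       cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(segmentId,
               time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId1));
       assertTrue(cache.remoteLogSegmentMetadata(0, 50).isPresent());
       assertTrue(cache.listAllRemoteLogSegments().hasNext());
   }
   ```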



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
