kamalcph commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625298782
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -197,333 +206,159 @@ public void
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
for (Map.Entry<Integer, Long> entry :
expectedEpochToHighestOffset.entrySet()) {
Integer epoch = entry.getKey();
Long expectedOffset = entry.getValue();
- Optional<Long> offset =
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
- log.debug("Fetching highest offset for epoch: {} , returned:
{} , expected: {}", epoch, offset, expectedOffset);
- Assertions.assertEquals(Optional.of(expectedOffset), offset);
+ Optional<Long> offset =
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+ assertEquals(Optional.of(expectedOffset), offset);
}
// Search for non existing leader epoch
- Optional<Long> highestOffsetForEpoch5 =
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
- Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+ Optional<Long> highestOffsetForEpoch5 =
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+ assertFalse(highestOffsetForEpoch5.isPresent());
}
}
- private RemoteLogSegmentMetadata
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager,
- Map<Integer,
Long> segmentLeaderEpochs,
- long
startOffset,
- long
endOffset,
-
RemoteLogSegmentState state)
- throws RemoteStorageException {
+ private RemoteLogSegmentMetadata
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+ Map<Integer, Long>
segmentLeaderEpochs,
+ long startOffset,
+ long endOffset,
+ RemoteLogSegmentState
state)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
- RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
- Optional.empty(),
- state, BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+ RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+ -1L, brokerId0, time.milliseconds(), segSize,
segmentLeaderEpochs);
+ metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId,
+ time.milliseconds(), Optional.empty(), state, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
}
- private static class EpochOffset {
- final int epoch;
- final long offset;
-
- private EpochOffset(int epoch,
- long offset) {
- this.epoch = epoch;
- this.offset = offset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- EpochOffset that = (EpochOffset) o;
- return epoch == that.epoch && offset == that.offset;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(epoch, offset);
- }
-
- @Override
- public String toString() {
- return "EpochOffset{" +
- "epoch=" + epoch +
- ", offset=" + offset +
- '}';
- }
- }
-
- private static Collection<Arguments> remoteLogSegmentLifecycleManagers() {
- return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()),
- Arguments.of(new
TopicBasedRemoteLogMetadataManagerWrapper()));
- }
-
- private void checkListSegments(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager,
+ private void checkListSegments(RemoteLogMetadataManager metadataManager,
int leaderEpoch,
- RemoteLogSegmentMetadata expectedSegment)
+ RemoteLogSegmentMetadata expectedMetadata)
throws RemoteStorageException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above
segment.
- Iterator<RemoteLogSegmentMetadata> segmentsIter =
remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
- Assertions.assertTrue(segmentsIter.hasNext());
- Assertions.assertEquals(expectedSegment, segmentsIter.next());
+ Iterator<RemoteLogSegmentMetadata> metadataIter =
+ metadataManager.listRemoteLogSegments(topicIdPartition,
leaderEpoch);
+ assertTrue(metadataIter.hasNext());
+ assertEquals(expectedMetadata, metadataIter.next());
// cache.listAllRemoteLogSegments() should contain the above segment.
- Iterator<RemoteLogSegmentMetadata> allSegmentsIter =
remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
- Assertions.assertTrue(allSegmentsIter.hasNext());
- Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
+ Iterator<RemoteLogSegmentMetadata> allSegmentsIter =
metadataManager.listRemoteLogSegments(topicIdPartition);
+ assertTrue(allSegmentsIter.hasNext());
+ assertEquals(expectedMetadata, allSegmentsIter.next());
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void
testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
-
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
+ public void testCacheSegmentWithCopySegmentStartedState() throws Exception
{
+ try (RemoteLogMetadataManager metadataManager =
createTopicBasedRemoteLogMetadataManager()) {
+
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition),
Collections.emptySet());
// Create a segment with state COPY_SEGMENT_STARTED, and check for
searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
- RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
-
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
+ RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 50L,
+ -1L, brokerId0, time.milliseconds(), segSize,
Collections.singletonMap(0, 0L));
+ metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
// This segment should not be available as the state is not
reached to COPY_SEGMENT_FINISHED.
- Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 =
remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 0);
- Assertions.assertFalse(segMetadataForOffset0Epoch0.isPresent());
+ Optional<RemoteLogSegmentMetadata> segMetadataForOffset0Epoch0 =
+ metadataManager.remoteLogSegmentMetadata(topicIdPartition,
0, 0);
+ assertFalse(segMetadataForOffset0Epoch0.isPresent());
// cache.listRemoteLogSegments APIs should contain the above
segment.
- checkListSegments(remoteLogSegmentLifecycleManager, 0,
segmentMetadata);
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+ checkListSegments(metadataManager, 0, segmentMetadata);
}
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void
testCacheSegmentWithCopySegmentFinishedState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
+ public void testCacheSegmentWithCopySegmentFinishedState() throws
Exception {
+ try (RemoteLogMetadataManager metadataManager =
createTopicBasedRemoteLogMetadataManager()) {
+
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition),
Collections.emptySet());
// Create a segment and move it to state COPY_SEGMENT_FINISHED.
and check for searching that segment and
// listing the segments.
- RemoteLogSegmentMetadata segmentMetadata =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-
Collections.singletonMap(0, 101L),
-
101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+ RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+ metadataManager, Collections.singletonMap(0, 101L), 101L,
200L, COPY_SEGMENT_FINISHED);
// Search should return the above segment.
- Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 =
remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 150);
- Assertions.assertEquals(Optional.of(segmentMetadata),
segMetadataForOffset150);
+ Optional<RemoteLogSegmentMetadata> segMetadataForOffset150 =
+ metadataManager.remoteLogSegmentMetadata(topicIdPartition,
0, 150);
+ assertEquals(Optional.of(segmentMetadata),
segMetadataForOffset150);
// cache.listRemoteLogSegments should contain the above segments.
- checkListSegments(remoteLogSegmentLifecycleManager, 0,
segmentMetadata);
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+ checkListSegments(metadataManager, 0, segmentMetadata);
}
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void
testCacheSegmentWithDeleteSegmentStartedState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
+ public void testCacheSegmentWithDeleteSegmentStartedState() throws
Exception {
+ try (RemoteLogMetadataManager metadataManager =
createTopicBasedRemoteLogMetadataManager()) {
+
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition),
Collections.emptySet());
// Create a segment and move it to state DELETE_SEGMENT_STARTED,
and check for searching that segment and
// listing the segments.
- RemoteLogSegmentMetadata segmentMetadata =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-
Collections.singletonMap(0, 201L),
-
201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+ RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+ metadataManager, Collections.singletonMap(0, 201L), 201L,
300L, DELETE_SEGMENT_STARTED);
// Search should not return the above segment as their leader
epoch state is cleared.
- Optional<RemoteLogSegmentMetadata>
segmentMetadataForOffset250Epoch0 =
remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 250);
-
Assertions.assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
-
- checkListSegments(remoteLogSegmentLifecycleManager, 0,
segmentMetadata);
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+ Optional<RemoteLogSegmentMetadata>
segmentMetadataForOffset250Epoch0 =
+ metadataManager.remoteLogSegmentMetadata(topicIdPartition,
0, 250);
+ assertFalse(segmentMetadataForOffset250Epoch0.isPresent());
+ checkListSegments(metadataManager, 0, segmentMetadata);
}
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void
testCacheSegmentsWithDeleteSegmentFinishedState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
+ public void testCacheSegmentsWithDeleteSegmentFinishedState() throws
Exception {
+ try (RemoteLogMetadataManager metadataManager =
createTopicBasedRemoteLogMetadataManager()) {
+
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition),
Collections.emptySet());
// Create a segment and move it to state DELETE_SEGMENT_FINISHED,
and check for searching that segment and
// listing the segments.
- RemoteLogSegmentMetadata segmentMetadata =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
-
Collections.singletonMap(0, 301L),
-
301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
+ RemoteLogSegmentMetadata segmentMetadata = upsertSegmentState(
+ metadataManager, Collections.singletonMap(0, 301L), 301L,
400L, DELETE_SEGMENT_STARTED);
// Search should not return the above segment as their leader
epoch state is cleared.
-
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0,
350).isPresent());
+
assertFalse(metadataManager.remoteLogSegmentMetadata(topicIdPartition, 0,
350).isPresent());
- RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-
time.milliseconds(),
-
Optional.empty(),
-
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
-
BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
+ RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(
+ segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
Optional.empty(),
+ DELETE_SEGMENT_FINISHED, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segmentMetadataUpdate);
// listRemoteLogSegments(0) and listRemoteLogSegments() should not
contain the above segment.
-
Assertions.assertFalse(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0).hasNext());
-
Assertions.assertFalse(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments().hasNext());
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+
assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition,
0).hasNext());
+
assertFalse(metadataManager.listRemoteLogSegments(topicIdPartition).hasNext());
}
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void testCacheListSegments(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
+ public void testCacheListSegments() throws Exception {
+ try (RemoteLogMetadataManager metadataManager =
createTopicBasedRemoteLogMetadataManager()) {
+
metadataManager.onPartitionLeadershipChanges(Collections.singleton(topicIdPartition),
Collections.emptySet());
// Create a few segments and add them to the cache.
- RemoteLogSegmentMetadata segment0 =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
Collections.singletonMap(0, 0L), 0,
-
100,
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
- RemoteLogSegmentMetadata segment1 =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
Collections.singletonMap(0, 101L), 101,
-
200,
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
- Map<Integer, Long> segment2LeaderEpochs = new HashMap<>();
- segment2LeaderEpochs.put(0, 201L);
- segment2LeaderEpochs.put(1, 301L);
- RemoteLogSegmentMetadata segment2 =
createSegmentUpdateWithState(remoteLogSegmentLifecycleManager,
segment2LeaderEpochs, 201, 400,
-
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+ RemoteLogSegmentMetadata segment0 =
upsertSegmentState(metadataManager, Collections.singletonMap(0, 0L),
+ 0, 100, COPY_SEGMENT_FINISHED);
+ RemoteLogSegmentMetadata segment1 =
upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L),
+ 101, 200, COPY_SEGMENT_FINISHED);
+ Map<Integer, Long> leaderEpochSegment2 = new HashMap<>();
+ leaderEpochSegment2.put(0, 201L);
+ leaderEpochSegment2.put(1, 301L);
+ RemoteLogSegmentMetadata segment2 =
upsertSegmentState(metadataManager, leaderEpochSegment2,
+ 201, 400, COPY_SEGMENT_FINISHED);
// listRemoteLogSegments(0) and listAllRemoteLogSegments() should
contain all the above segments.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 =
Arrays.asList(segment0, segment1, segment2);
-
Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(0),
-
expectedSegmentsForEpoch0.iterator()));
-
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(remoteLogSegmentLifecycleManager.listAllRemoteLogSegments(),
-
expectedSegmentsForEpoch0.iterator()));
+ assertTrue(TestUtils.sameElementsWithOrder(
+ expectedSegmentsForEpoch0.iterator(),
metadataManager.listRemoteLogSegments(topicIdPartition, 0)));
+ assertTrue(TestUtils.sameElementsWithoutOrder(
+ expectedSegmentsForEpoch0.iterator(),
metadataManager.listRemoteLogSegments(topicIdPartition)));
// listRemoteLogSegments(1) should contain only segment2.
List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 =
Collections.singletonList(segment2);
-
Assertions.assertTrue(TestUtils.sameElementsWithOrder(remoteLogSegmentLifecycleManager.listRemoteLogSegments(1),
-
expectedSegmentsForEpoch1.iterator()));
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
- }
- }
-
- /**
- * This is a wrapper with {@link TopicBasedRemoteLogMetadataManager}
implementing {@link RemoteLogSegmentLifecycleManager}.
- * This is passed to {@link
#testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
- * {@code RemoteLogMetadataCache} for several lifecycle operations.
- * <p>
- * This starts a Kafka cluster with {@link #initialize(Set, boolean)} )}
with {@link #brokerCount()} no of servers. It also
- * creates the remote log metadata topic required for {@code
TopicBasedRemoteLogMetadataManager}. This cluster will
- * be stopped by invoking {@link #close()}.
- */
- static class TopicBasedRemoteLogMetadataManagerWrapper extends
TopicBasedRemoteLogMetadataManagerHarness implements
RemoteLogSegmentLifecycleManager {
-
- private TopicIdPartition topicIdPartition;
-
- @Override
- public synchronized void initialize(TopicIdPartition topicIdPartition)
{
- this.topicIdPartition = topicIdPartition;
- super.initialize(Collections.singleton(topicIdPartition), true);
- }
-
- @Override
- public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
segmentMetadata) throws RemoteStorageException {
- try {
- // Wait until the segment is added successfully.
-
remoteLogMetadataManager().addRemoteLogSegmentMetadata(segmentMetadata).get();
- } catch (Exception e) {
- throw new RemoteStorageException(e);
- }
- }
-
- @Override
- public void
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
segmentMetadataUpdate) throws RemoteStorageException {
- try {
- // Wait until the segment is updated successfully.
-
remoteLogMetadataManager().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
- } catch (Exception e) {
- throw new RemoteStorageException(e);
- }
- }
-
- @Override
- public Optional<Long> highestOffsetForEpoch(int leaderEpoch) throws
RemoteStorageException {
- return
remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
- }
-
- @Override
- public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch,
-
long offset) throws RemoteStorageException {
- return
remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition,
leaderEpoch, offset);
- }
-
- @Override
- public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int
leaderEpoch) throws RemoteStorageException {
- return
remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch);
- }
-
- @Override
- public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments()
throws RemoteStorageException {
- return
remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
-
- }
-
- /**
- * This is a wrapper with {@link RemoteLogMetadataCache} implementing
{@link RemoteLogSegmentLifecycleManager}.
- * This is passed to {@link
#testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
- * {@code RemoteLogMetadataCache} for several lifecycle operations.
- */
- static class RemoteLogMetadataCacheWrapper implements
RemoteLogSegmentLifecycleManager {
Review Comment:
The TopicBasedRemoteLogMetadataManager uses `RemotePartitionMetadataStore`
which internally uses `RemoteLogMetadataCache` to cache the entries and serve
it for the read query.
Mocking RemoteLogMetadataCache as another implementation for
RemoteLogMetadataManager using the `RemoteLogSegmentLifecycleManager` looks
in-correct to me. We are not asserting/verifying/validating the end-to-end
behavior, so integration tests is not required for this. Instead, we can cover
all/existing cases as unit tests for `RemoteLogMetadataCache`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]