chia7712 commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1624880657
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -197,333 +206,159 @@ public void
testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
for (Map.Entry<Integer, Long> entry :
expectedEpochToHighestOffset.entrySet()) {
Integer epoch = entry.getKey();
Long expectedOffset = entry.getValue();
- Optional<Long> offset =
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(epoch);
- log.debug("Fetching highest offset for epoch: {} , returned:
{} , expected: {}", epoch, offset, expectedOffset);
- Assertions.assertEquals(Optional.of(expectedOffset), offset);
+ Optional<Long> offset =
metadataManager.highestOffsetForEpoch(topicIdPartition, epoch);
+ assertEquals(Optional.of(expectedOffset), offset);
}
// Search for non existing leader epoch
- Optional<Long> highestOffsetForEpoch5 =
remoteLogSegmentLifecycleManager.highestOffsetForEpoch(5);
- Assertions.assertFalse(highestOffsetForEpoch5.isPresent());
- } finally {
- Utils.closeQuietly(remoteLogSegmentLifecycleManager,
"RemoteLogSegmentLifecycleManager");
+ Optional<Long> highestOffsetForEpoch5 =
metadataManager.highestOffsetForEpoch(topicIdPartition, 5);
+ assertFalse(highestOffsetForEpoch5.isPresent());
}
}
- private RemoteLogSegmentMetadata
createSegmentUpdateWithState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager,
- Map<Integer,
Long> segmentLeaderEpochs,
- long
startOffset,
- long
endOffset,
-
RemoteLogSegmentState state)
- throws RemoteStorageException {
+ private RemoteLogSegmentMetadata
upsertSegmentState(RemoteLogMetadataManager metadataManager,
+ Map<Integer, Long>
segmentLeaderEpochs,
+ long startOffset,
+ long endOffset,
+ RemoteLogSegmentState
state)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
- RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, BROKER_ID_0,
-
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
-
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
-
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
- Optional.empty(),
- state, BROKER_ID_1);
-
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
-
+ RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+ -1L, brokerId0, time.milliseconds(), segSize,
segmentLeaderEpochs);
+ metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(segmentMetadata);
+
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new
RemoteLogSegmentMetadataUpdate(segmentId,
+ time.milliseconds(), Optional.empty(), state, brokerId1);
+
metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
+
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
}
- private static class EpochOffset {
- final int epoch;
- final long offset;
-
- private EpochOffset(int epoch,
- long offset) {
- this.epoch = epoch;
- this.offset = offset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- EpochOffset that = (EpochOffset) o;
- return epoch == that.epoch && offset == that.offset;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(epoch, offset);
- }
-
- @Override
- public String toString() {
- return "EpochOffset{" +
- "epoch=" + epoch +
- ", offset=" + offset +
- '}';
- }
- }
-
- private static Collection<Arguments> remoteLogSegmentLifecycleManagers() {
- return Arrays.asList(Arguments.of(new RemoteLogMetadataCacheWrapper()),
- Arguments.of(new
TopicBasedRemoteLogMetadataManagerWrapper()));
- }
-
- private void checkListSegments(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager,
+ private void checkListSegments(RemoteLogMetadataManager metadataManager,
int leaderEpoch,
- RemoteLogSegmentMetadata expectedSegment)
+ RemoteLogSegmentMetadata expectedMetadata)
throws RemoteStorageException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above
segment.
- Iterator<RemoteLogSegmentMetadata> segmentsIter =
remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
- Assertions.assertTrue(segmentsIter.hasNext());
- Assertions.assertEquals(expectedSegment, segmentsIter.next());
+ Iterator<RemoteLogSegmentMetadata> metadataIter =
+ metadataManager.listRemoteLogSegments(topicIdPartition,
leaderEpoch);
+ assertTrue(metadataIter.hasNext());
+ assertEquals(expectedMetadata, metadataIter.next());
// cache.listAllRemoteLogSegments() should contain the above segment.
- Iterator<RemoteLogSegmentMetadata> allSegmentsIter =
remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
- Assertions.assertTrue(allSegmentsIter.hasNext());
- Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
+ Iterator<RemoteLogSegmentMetadata> allSegmentsIter =
metadataManager.listRemoteLogSegments(topicIdPartition);
+ assertTrue(allSegmentsIter.hasNext());
+ assertEquals(expectedMetadata, allSegmentsIter.next());
}
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void
testCacheSegmentWithCopySegmentStartedState(RemoteLogSegmentLifecycleManager
remoteLogSegmentLifecycleManager) throws Exception {
-
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
-
+ @ClusterTest(brokers = 3)
Review Comment:
It seems all test cases require `brokers = 3`. Maybe we can declare it once at
the class level with `@ClusterTestDefaults(brokers = 3)`?
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -16,172 +16,181 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import static
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_STARTED;
+import static
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_FINISHED;
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
public class RemoteLogSegmentLifecycleTest {
- private static final Logger log =
LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class);
- private static final int SEG_SIZE = 1024 * 1024;
- private static final int BROKER_ID_0 = 0;
- private static final int BROKER_ID_1 = 1;
+ int segSize = 1048576;
Review Comment:
Could you please add `private final` to these fields? Otherwise, we will
see a minor follow-up PR to fix them later :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]