chia7712 commented on code in PR #16180:
URL: https://github.com/apache/kafka/pull/16180#discussion_r1625637333
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -16,172 +16,183 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
+import java.util.concurrent.ExecutionException;
-public class RemoteLogSegmentLifecycleTest {
- private static final Logger log = LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class);
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
- private static final int SEG_SIZE = 1024 * 1024;
- private static final int BROKER_ID_0 = 0;
- private static final int BROKER_ID_1 = 1;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_STARTED;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_FINISHED;
- private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+@ClusterTestDefaults(brokers = 3)
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class RemoteLogSegmentLifecycleTest {
+
+ private final int segSize = 1048576;
+ private final int brokerId0 = 0;
+ private final int brokerId1 = 1;
+ private final Uuid topicId = Uuid.randomUuid();
+ private final TopicPartition tp = new TopicPartition("foo", 0);
+ private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
private final Time time = new MockTime(1);
Review Comment:
Does the test need to adjust the time in order to complete? If not, maybe we can use `SystemTime` instead.
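For illustration, a minimal sketch of what that suggestion would look like, assuming the test never has to advance the clock by hand (if it does, `MockTime` should stay). `Time.SYSTEM` is the shared `SystemTime` instance exposed by `org.apache.kafka.common.utils.Time`:

```java
import org.apache.kafka.common.utils.Time;

public class RemoteLogSegmentLifecycleTest {
    // Sketch of the reviewer's suggestion: if the test never advances the clock,
    // the real clock can replace the auto-ticking MockTime(1).
    private final Time time = Time.SYSTEM;
}
```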
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java:
##########
@@ -16,172 +16,183 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
-import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
+import java.util.concurrent.ExecutionException;
-public class RemoteLogSegmentLifecycleTest {
- private static final Logger log = LoggerFactory.getLogger(RemoteLogSegmentLifecycleTest.class);
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
- private static final int SEG_SIZE = 1024 * 1024;
- private static final int BROKER_ID_0 = 0;
- private static final int BROKER_ID_1 = 1;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_STARTED;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.DELETE_SEGMENT_FINISHED;
- private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+@ClusterTestDefaults(brokers = 3)
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class RemoteLogSegmentLifecycleTest {
+
+ private final int segSize = 1048576;
+ private final int brokerId0 = 0;
+ private final int brokerId1 = 1;
+ private final Uuid topicId = Uuid.randomUuid();
+ private final TopicPartition tp = new TopicPartition("foo", 0);
+ private final TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
private final Time time = new MockTime(1);
+ private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore());
+ private final ClusterInstance clusterInstance;
- @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
- @MethodSource("remoteLogSegmentLifecycleManagers")
- public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remoteLogSegmentLifecycleManager) throws Exception {
- try {
- remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
+ RemoteLogSegmentLifecycleTest(ClusterInstance clusterInstance) { // Constructor injections
+ this.clusterInstance = clusterInstance;
+ }
+
+ private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
+ return RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .startConsumerThread(true)
+ .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
Review Comment:
nit: this line is redundant
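As a sketch only, the builder chain without that line could look like the following; this assumes `RemoteLogMetadataManagerTestUtils.builder()` already defaults to `RemoteLogMetadataTopicPartitioner::new`, and the trailing `.build()` call plus the remainder of the chain from the PR are assumed rather than quoted:

```java
private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
    // Same builder chain as in the PR, minus the redundant partitioner setter;
    // relies on the builder defaulting to RemoteLogMetadataTopicPartitioner::new.
    return RemoteLogMetadataManagerTestUtils.builder()
            .bootstrapServers(clusterInstance.bootstrapServers())
            .startConsumerThread(true)
            .build(); // terminal build() call assumed here
}
```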
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]