ibessonov commented on code in PR #3207: URL: https://github.com/apache/ignite-3/pull/3207#discussion_r1489115706
########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + 
+/** + * Replica safeTime propagation tests. + */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private int port = BASE_PORT; + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers. 
+ { + cluster = Set.of("node1", "node2", "node3").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client + Optional<PartialNode> aliveNode = cluster.values().stream().filter(node -> !node.nodeName.equals(nodeTopStop.nodeName)).findFirst(); + + assertTrue(aliveNode.isPresent()); + + RaftGroupService anotherClient = aliveNode.get().raftClient; + + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clock), false); + } + + /** + * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart. + * <ol> + * <li>Start two and a raft group with two peer.</li> + * <li>Send command with safe time X.</li> + * <li>Restart the cluster.</li> + * <li>Send command with safe time less than previously applied to the leader before the restart + * and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderRestart() throws Exception { + // Start two node and a raft group with two peer. 
+ { + cluster = Set.of("node1", "node2").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop all nodes + for (PartialNode node : cluster.values() + ) { + node.stop(); + } + + // And restart. + startCluster(cluster); + + // Send command with safe time less than previously applied to the leader before the restart + // and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(someNode.raftClient, calculateSafeTime(someNode.clock), false); + } + + private void startCluster(Map<String, PartialNode> cluster) throws Exception { + Collection<CompletableFuture<Void>> startingFutures = new ArrayList<>(cluster.size()); + for (PartialNode node : cluster.values()) { + startingFutures.add(node.start()); + } + + CompletableFuture<Void> clusterReadyFuture = CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new)); + + assertThat(clusterReadyFuture, willCompleteSuccessfully()); + } + + private static void sendSafeTimeSyncCommand( + RaftGroupService raftClient, + long safeTime, + boolean expectSafeTimeReorderException + ) { + CompletableFuture<Object> safeTimeCommandFuture = raftClient.run( + REPLICA_MESSAGES_FACTORY + .safeTimeSyncCommand() + .safeTimeLong(safeTime) + .build() + ); + + if (expectSafeTimeReorderException) { + assertThat(safeTimeCommandFuture, CompletableFutureExceptionMatcher.willThrow(SafeTimeReorderException.class)); Review Comment: Please import this static method :) ########## 
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + 
+/** + * Replica safeTime propagation tests. + */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private int port = BASE_PORT; + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers. 
+ { + cluster = Set.of("node1", "node2", "node3").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client Review Comment: ```suggestion // Select alive raft client. ``` ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import 
org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Replica safeTime propagation tests. 
+ */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private int port = BASE_PORT; + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers. 
+ { + cluster = Set.of("node1", "node2", "node3").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client + Optional<PartialNode> aliveNode = cluster.values().stream().filter(node -> !node.nodeName.equals(nodeTopStop.nodeName)).findFirst(); + + assertTrue(aliveNode.isPresent()); + + RaftGroupService anotherClient = aliveNode.get().raftClient; + + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clock), false); + } + + /** + * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart. + * <ol> + * <li>Start two and a raft group with two peer.</li> + * <li>Send command with safe time X.</li> + * <li>Restart the cluster.</li> + * <li>Send command with safe time less than previously applied to the leader before the restart + * and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderRestart() throws Exception { + // Start two node and a raft group with two peer. 
+ { + cluster = Set.of("node1", "node2").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop all nodes + for (PartialNode node : cluster.values() + ) { + node.stop(); + } + + // And restart. + startCluster(cluster); + + // Send command with safe time less than previously applied to the leader before the restart + // and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(someNode.raftClient, calculateSafeTime(someNode.clock), false); + } + + private void startCluster(Map<String, PartialNode> cluster) throws Exception { + Collection<CompletableFuture<Void>> startingFutures = new ArrayList<>(cluster.size()); + for (PartialNode node : cluster.values()) { + startingFutures.add(node.start()); + } + + CompletableFuture<Void> clusterReadyFuture = CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new)); + + assertThat(clusterReadyFuture, willCompleteSuccessfully()); + } + + private static void sendSafeTimeSyncCommand( + RaftGroupService raftClient, + long safeTime, + boolean expectSafeTimeReorderException + ) { + CompletableFuture<Object> safeTimeCommandFuture = raftClient.run( + REPLICA_MESSAGES_FACTORY + .safeTimeSyncCommand() + .safeTimeLong(safeTime) + .build() + ); + + if (expectSafeTimeReorderException) { + assertThat(safeTimeCommandFuture, CompletableFutureExceptionMatcher.willThrow(SafeTimeReorderException.class)); + } else { + assertThat(safeTimeCommandFuture, willCompleteSuccessfully()); + } + } + + private static long 
calculateSafeTime(HybridClock clock) { + return clock.now().addPhysicalTime(CLOCK_SKEW).longValue(); + } + + private class PartialNode { + private final String nodeName; + + private final HybridClock clock; + + private ClusterService clusterService; + + private Loza raftManager; + + private RaftGroupService raftClient; + + PartialNode(String nodeName) { + this.nodeName = nodeName; + this.clock = new HybridClockImpl(); + } + + CompletableFuture<Void> start() throws Exception { + clusterService = ClusterServiceTestUtils.clusterService(nodeName, port++, NODE_FINDER); Review Comment: There's a chance that you wanted to call `start()` in parallel. If that's true then `port++` is unsafe ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import 
org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Replica safeTime propagation tests. 
+ */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private int port = BASE_PORT; + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers. 
+ { + cluster = Set.of("node1", "node2", "node3").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client + Optional<PartialNode> aliveNode = cluster.values().stream().filter(node -> !node.nodeName.equals(nodeTopStop.nodeName)).findFirst(); + + assertTrue(aliveNode.isPresent()); + + RaftGroupService anotherClient = aliveNode.get().raftClient; + + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clock), false); + } + + /** + * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart. + * <ol> + * <li>Start two and a raft group with two peer.</li> + * <li>Send command with safe time X.</li> + * <li>Restart the cluster.</li> + * <li>Send command with safe time less than previously applied to the leader before the restart + * and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderRestart() throws Exception { + // Start two node and a raft group with two peer. 
+ { + cluster = Set.of("node1", "node2").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); Review Comment: I don't think that invoking a constructor requires a different thread; what was your motivation? ########## modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; +import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.lang.SafeTimeReorderException; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftGroupEventsListener; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.RaftGroupOptions; +import 
org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TestReplicationGroupId; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.storage.state.TxStateStorage; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Replica safeTime propagation tests. 
+ */ +@ExtendWith(ConfigurationExtension.class) +public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { + @InjectConfiguration("mock: { fsync: false }") + private RaftConfiguration raftConfiguration; + + private static final int BASE_PORT = 1234; + + private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1"); + + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder( + IntStream.range(BASE_PORT, BASE_PORT + 5) + .mapToObj(p -> new NetworkAddress("localhost", p)) + .collect(Collectors.toList()) + ); + + private int port = BASE_PORT; + + private Map<String, PartialNode> cluster; + + @AfterEach + public void after() throws Exception { + for (PartialNode partialNode : cluster.values()) { + try { + partialNode.stop(); + } catch (NodeStoppingException ignored) { + // No-op, multiple stop. + } + } + } + + /** + * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader. + * <ol> + * <li>Start three nodes and a raft group with three peers.</li> + * <li>Send command with safe time X.</li> + * <li>Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply.</li> + * <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderReElection() throws Exception { + // Start three nodes and a raft group with three peers. 
+ { + cluster = Set.of("node1", "node2", "node3").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop the leader - the only node that actually do safeTime watermark validation within onBeforeApply. + assertNotNull(raftClient.leader()); + + PartialNode nodeTopStop = cluster.get(raftClient.leader().consistentId()); + + assertNotNull(nodeTopStop); + + nodeTopStop.stop(); + + // Select alive raft client + Optional<PartialNode> aliveNode = cluster.values().stream().filter(node -> !node.nodeName.equals(nodeTopStop.nodeName)).findFirst(); + + assertTrue(aliveNode.isPresent()); + + RaftGroupService anotherClient = aliveNode.get().raftClient; + + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. + sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true); + + sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clock), false); + } + + /** + * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart. + * <ol> + * <li>Start two and a raft group with two peer.</li> + * <li>Send command with safe time X.</li> + * <li>Restart the cluster.</li> + * <li>Send command with safe time less than previously applied to the leader before the restart + * and verify that SafeTimeReorderException is thrown.</li> + * </ol> + */ + @Test + public void testSafeTimeReorderingOnLeaderRestart() throws Exception { + // Start two node and a raft group with two peer. 
+ { + cluster = Set.of("node1", "node2").parallelStream().collect(Collectors.toMap(Function.identity(), PartialNode::new)); + + startCluster(cluster); + } + + PartialNode someNode = cluster.values().iterator().next(); + + RaftGroupService raftClient = someNode.raftClient; + + assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + + long firstSafeTime = calculateSafeTime(someNode.clock); + + // Send command with safe time X. + sendSafeTimeSyncCommand(raftClient, firstSafeTime, false); + + // Stop all nodes Review Comment: ```suggestion // Stop all nodes. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
