sanpwc commented on code in PR #3207:
URL: https://github.com/apache/ignite-3/pull/3207#discussion_r1489127659


##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
+import static 
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lang.SafeTimeReorderException;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Replica safeTime propagation tests.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest {
+    @InjectConfiguration("mock: { fsync: false }")
+    private RaftConfiguration raftConfiguration;
+
+    private static final int BASE_PORT = 1234;
+
+    private static final TestReplicationGroupId GROUP_ID = new 
TestReplicationGroupId("group_1");
+
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
+    private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder(
+            IntStream.range(BASE_PORT, BASE_PORT + 5)
+                    .mapToObj(p -> new NetworkAddress("localhost", p))
+                    .collect(Collectors.toList())
+    );
+
+    private int port = BASE_PORT;
+
+    private Map<String, PartialNode> cluster;
+
+    @AfterEach
+    public void after() throws Exception {
+        for (PartialNode partialNode : cluster.values()) {
+            try {
+                partialNode.stop();
+            } catch (NodeStoppingException ignored) {
+                // No-op, multiple stop.
+            }
+        }
+    }
+
+    /**
+     * Test verifies that a new leader will reject a command with safeTime 
less than previously applied within old leader.
+     * <ol>
+     *     <li>Start three nodes and a raft group with three peers.</li>
+     *     <li>Send command with safe time X.</li>
+     *     <li>Stop the leader - the only node that actually do safeTime 
watermark validation within onBeforeApply.</li>
+     *     <li>Send command with safe time less than X to the new leader and 
verify that SafeTimeReorderException is thrown.</li>
+     * </ol>
+     */
+    @Test
+    public void testSafeTimeReorderingOnLeaderReElection() throws Exception {
+        // Start three nodes and a raft group with three peers.
+        {
+            cluster = Set.of("node1", "node2", 
"node3").parallelStream().collect(Collectors.toMap(Function.identity(), 
PartialNode::new));
+
+            startCluster(cluster);
+        }
+
+        PartialNode someNode = cluster.values().iterator().next();
+
+        RaftGroupService raftClient = someNode.raftClient;
+
+        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());
+
+        long firstSafeTime = calculateSafeTime(someNode.clock);
+
+        // Send command with safe time X.
+        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);
+
+        // Stop the leader - the only node that actually do safeTime watermark 
validation within onBeforeApply.
+        assertNotNull(raftClient.leader());
+
+        PartialNode nodeTopStop = 
cluster.get(raftClient.leader().consistentId());
+
+        assertNotNull(nodeTopStop);
+
+        nodeTopStop.stop();
+
+        // Select alive raft client
+        Optional<PartialNode> aliveNode = 
cluster.values().stream().filter(node -> 
!node.nodeName.equals(nodeTopStop.nodeName)).findFirst();
+
+        assertTrue(aliveNode.isPresent());
+
+        RaftGroupService anotherClient = aliveNode.get().raftClient;
+
+        // Send command with safe time less than previously applied to the new 
leader and verify that SafeTimeReorderException is thrown.
+        sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true);
+
+        sendSafeTimeSyncCommand(anotherClient, 
calculateSafeTime(aliveNode.get().clock), false);
+    }
+
+    /**
+     * Test verifies that a leader will reject a command with safeTime less 
than previously applied within leader restart.
+     * <ol>
+     *     <li>Start two and a raft group with two peer.</li>
+     *     <li>Send command with safe time X.</li>
+     *     <li>Restart the cluster.</li>
+     *     <li>Send command with safe time less than previously applied to the 
leader before the restart
+     *     and verify that SafeTimeReorderException is thrown.</li>
+     * </ol>
+     */
+    @Test
+    public void testSafeTimeReorderingOnLeaderRestart() throws Exception {
+        // Start two node and a raft group with two peer.
+        {
+            cluster = Set.of("node1", 
"node2").parallelStream().collect(Collectors.toMap(Function.identity(), 
PartialNode::new));
+
+            startCluster(cluster);
+        }
+
+        PartialNode someNode = cluster.values().iterator().next();
+
+        RaftGroupService raftClient = someNode.raftClient;
+
+        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());
+
+        long firstSafeTime = calculateSafeTime(someNode.clock);
+
+        // Send command with safe time X.
+        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);
+
+        // Stop all nodes
+        for (PartialNode node : cluster.values()
+        ) {
+            node.stop();
+        }
+
+        // And restart.
+        startCluster(cluster);
+
+        // Send command with safe time less than previously applied to the 
leader before the restart
+        // and verify that SafeTimeReorderException is thrown.
+        sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true);
+
+        sendSafeTimeSyncCommand(someNode.raftClient, 
calculateSafeTime(someNode.clock), false);
+    }
+
+    private void startCluster(Map<String, PartialNode> cluster) throws 
Exception {
+        Collection<CompletableFuture<Void>> startingFutures = new 
ArrayList<>(cluster.size());
+        for (PartialNode node : cluster.values()) {
+            startingFutures.add(node.start());
+        }
+
+        CompletableFuture<Void> clusterReadyFuture = 
CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new));
+
+        assertThat(clusterReadyFuture, willCompleteSuccessfully());
+    }
+
+    private static void sendSafeTimeSyncCommand(
+            RaftGroupService raftClient,
+            long safeTime,
+            boolean expectSafeTimeReorderException
+    ) {
+        CompletableFuture<Object> safeTimeCommandFuture = raftClient.run(
+                REPLICA_MESSAGES_FACTORY
+                        .safeTimeSyncCommand()
+                        .safeTimeLong(safeTime)
+                        .build()
+        );
+
+        if (expectSafeTimeReorderException) {
+            assertThat(safeTimeCommandFuture, 
CompletableFutureExceptionMatcher.willThrow(SafeTimeReorderException.class));

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to