alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r747573383
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
##########
@@ -277,7 +280,8 @@ protected void ensureCacheAbsent(CacheConfiguration<?, ?>
ccfg) throws IgniteChe
protected <K, V> CacheConfiguration<K, V>
txCacheConfig(CacheConfiguration<K, V> ccfg) {
return ccfg.setCacheMode(CacheMode.PARTITIONED)
.setBackups(2)
- .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setAffinity(new RendezvousAffinityFunction(false, 4));
Review comment:
What's wrong with the default partition count? Only 4 partitions looks too
small, especially when there are more than 4 nodes in the cluster.
##########
File path:
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
##########
@@ -146,7 +147,12 @@ public void testClusterSnapshotRestoreOnBiggerTopology()
throws Exception {
awaitPartitionMapExchange();
- assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(),
CACHE_KEYS_RANGE);
+ // On one node the index will be copied right from snapshot, on the
other it will be rebuilt.
Review comment:
Why? We restore a 2-node snapshot on a 4-node cluster, so there can be any
distribution of partitions, not exactly 1:1.
Where do we check this statement? Here we just wait for index rebuild
futures, if any exist.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends
IgniteClusterSnapshotRestoreBaseTest {
+ /** */
+ private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+ /** */
+ private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+ /** {@code true} if snapshot parts has been initialized on test-class
startup. */
+ private static boolean inited;
+
+ /** Snapshot parts on dedicated cluster. Each part has its own local
directory. */
+ private static final Set<Path> snpParts = new HashSet<>();
+
+ /** */
+ private static final Function<String, BiFunction<Integer,
IgniteConfiguration, String>> CLUSTER_DIR =
+ new Function<String, BiFunction<Integer, IgniteConfiguration,
String>>() {
+ @Override public BiFunction<Integer, IgniteConfiguration, String>
apply(String prefix) {
+ return (id, cfg) ->
Paths.get(defaultWorkDirectory().toString(),
+ prefix +
U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+ }
+ };
+
+ /** Cache value builder. */
+ private final Function<Integer, Object> valBuilder = String::valueOf;
+
+ /** {@inheritDoc} */
+ @Override protected Function<Integer, Object> valueBuilder() {
+ return valBuilder;
+ }
Review comment:
Redundant
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static
org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends
IgniteClusterSnapshotRestoreBaseTest {
+ /** */
+ private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+ /** */
+ private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+ /** {@code true} if snapshot parts has been initialized on test-class
startup. */
+ private static boolean inited;
+
+ /** Snapshot parts on dedicated cluster. Each part has its own local
directory. */
+ private static final Set<Path> snpParts = new HashSet<>();
+
+ /** */
+ private static final Function<String, BiFunction<Integer,
IgniteConfiguration, String>> CLUSTER_DIR =
+ new Function<String, BiFunction<Integer, IgniteConfiguration,
String>>() {
+ @Override public BiFunction<Integer, IgniteConfiguration, String>
apply(String prefix) {
+ return (id, cfg) ->
Paths.get(defaultWorkDirectory().toString(),
+ prefix +
U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+ }
+ };
+
+ /** Cache value builder. */
+ private final Function<Integer, Object> valBuilder = String::valueOf;
+
+ /** {@inheritDoc} */
+ @Override protected Function<Integer, Object> valueBuilder() {
+ return valBuilder;
+ }
+
+ /** @throws Exception If fails. */
+ @Before
+ public void prepareDedicatedSnapshot() throws Exception {
+ if (!inited) {
+ cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+
+ CacheConfiguration<Integer, Object> cacheCfg1 =
+ txCacheConfig(new CacheConfiguration<Integer,
Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+ CacheConfiguration<Integer, Object> cacheCfg2 =
+ txCacheConfig(new CacheConfiguration<Integer,
Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+ IgniteEx ignite =
startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE,
valBuilder,
+ dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2);
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+ stopAllGrids();
+
+ snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX,
SNAPSHOT_NAME));
+
+ inited = true;
+ }
+
+ beforeTestSnapshot();
+ cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+ }
+
+ /** @throws Exception If fails. */
+ @After
+ public void afterSwitchSnapshot() throws Exception {
+ afterTestSnapshot();
+ cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+ }
+
+ /** */
+ @AfterClass
+ public static void cleanupSnapshot() {
+ snpParts.forEach(U::delete);
+ cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testRestoreAllGroups() throws Exception {
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ // Restore all cache groups.
+ grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+ awaitPartitionMapExchange(true, true, null, true);
+
+ assertCacheKeys(scc.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+ assertCacheKeys(scc.cache(CACHE1), CACHE_KEYS_RANGE);
+ assertCacheKeys(scc.cache(CACHE2), CACHE_KEYS_RANGE);
+
+ waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED,
EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws
Exception {
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+ IgniteSnapshotManager mgr = snp(grid(1));
+ mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID,
SnapshotSender>() {
+ @Override public SnapshotSender apply(String s, UUID uuid) {
+ return new DelegateSnapshotSender(log,
mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+ @Override public void sendPart0(File part, String
cacheDirName, GroupPartitionId pair, Long length) {
+ if (partId(part.getName()) > 0)
+ throw new IgniteException("Test exception.
Uploading partition file failed: " + pair);
+
+ super.sendPart0(part, cacheDirName, pair, length);
+ }
+ };
+ }
+ });
+
+ IgniteFuture<?> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> fut.get(TIMEOUT),
+ IgniteException.class,
+ "Test exception. Uploading partition file failed");
+ assertNull(scc.cache(DEFAULT_CACHE_NAME));
+ ensureCacheAbsent(dfltCacheCfg);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSnapshotCachesStoppedIfNodeCrashed() throws Exception {
+ CacheConfiguration<?, ?> ccfg0 = dfltCacheCfg;
+ dfltCacheCfg = null;
+
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 3);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ copyAndShuffle(snpParts, G.allGrids());
+
+ TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(grid(2));
+
+ IgniteFuture<Void> fut = waitForBlockOnRestore(spi,
RESTORE_CACHE_GROUP_SNAPSHOT_START, DEFAULT_CACHE_NAME);
+ IgniteInternalFuture<?> stopFut = runAsync(() -> stopGrid(2, true));
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> fut.get(TIMEOUT),
+ ClusterTopologyCheckedException.class,
+ "Required node has left the cluster"
+ );
+
+ stopFut.get(TIMEOUT);
+
+ awaitPartitionMapExchange();
+ ensureCacheAbsent(ccfg0);
+
+ Ignite g3 = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+ awaitPartitionMapExchange();
+ assertNull(g3.cache(DEFAULT_CACHE_NAME));
+ ensureCacheAbsent(ccfg0);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSnapshotRestoreDuringForceReassignment() throws Exception {
+ String locCacheName = "IgniteTestCache";
+ dfltCacheCfg = null;
+
+ IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+ scc.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, Object> cache = scc.getOrCreateCache(new
CacheConfiguration<Integer, Object>(locCacheName)
+ .setAffinity(new RendezvousAffinityFunction(false,
8)).setBackups(2));
+
+ for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+ cache.put(i, valBuilder.apply(i));
+
+ forceCheckpoint();
+ awaitPartitionMapExchange();
+
+ TestRecordingCommunicationSpi spi0 =
TestRecordingCommunicationSpi.spi(grid(0));
+ spi0.blockMessages((node, msg) -> msg instanceof
GridDhtPartitionSupplyMessage);
+
+ startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+ resetBaselineTopology();
+ spi0.waitForBlocked();
+
+ copyAndShuffle(snpParts, G.allGrids());
+ IgniteFuture<Void> fut = waitForBlockOnRestore(spi0,
RESTORE_CACHE_GROUP_SNAPSHOT_START, DEFAULT_CACHE_NAME);
+
+ IgniteFuture<?> forceRebFut = cache.rebalance();
Review comment:
This method will be removed soon. Also, AFAIK this method only triggers
rebalance when it's delayed. You will get the same result even without waiting
for this future.
Also, perhaps this test was left over from the old implementation; I don't
understand how the rebalance is related to the current implementation.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
##########
@@ -623,7 +615,7 @@ public void testNodeFailDuringFilesCopy() throws Exception {
IgniteFuture<Void> fut =
grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME,
Collections.singleton(DEFAULT_CACHE_NAME));
- stopFut.get(TIMEOUT);
+ stopFut.get();
Review comment:
Why was the timeout removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]