Mmuzaf commented on a change in pull request #7607: IGNITE-11073: Create consistent partitions copy on each cluster node
URL: https://github.com/apache/ignite/pull/7607#discussion_r409741526
 
 

 ##########
 File path: 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
 ##########
 @@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+
+/**
+ * Default snapshot manager test.
+ */
+public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotLocalPartitions() throws Exception {
+        // Start grid node with data before each test.
+        IgniteEx ig = startGridWithCache(dfltCacheCfg, 2048);
+
+        // The following data will be included into checkpoint.
+        for (int i = 2048; i < 4096; i++)
+            ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i));
+
+        for (int i = 4096; i < 8192; i++) {
+            ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i) {
+                @Override public String toString() {
+                    return "_" + super.toString();
+                }
+            });
+        }
+
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+        IgniteSnapshotManager mgr = snp(ig);
+
+        // Collection of pairs group and appropriate cache partition to be 
snapshot.
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
+
+        snpFut.get();
+
+        File cacheWorkDir = ((FilePageStoreManager)ig.context()
+            .cache()
+            .context()
+            .pageStore())
+            .cacheWorkDir(dfltCacheCfg);
+
+        // Checkpoint forces on cluster deactivation (currently only single 
node in cluster),
+        // so we must have the same data in snapshot partitions and those 
which left
+        // after node stop.
+        stopGrid(ig.name());
+
+        // Calculate CRCs.
+        IgniteConfiguration cfg = ig.context().config();
+        PdsFolderSettings settings = 
ig.context().pdsFolderResolver().resolveFolders();
+        String nodePath = databaseRelativePath(settings.folderName());
+        File binWorkDir = resolveBinaryWorkDir(cfg.getWorkDirectory(), 
settings.folderName());
+        File marshWorkDir = 
mappingFileStoreWorkDir(U.workDirectory(cfg.getWorkDirectory(), 
cfg.getIgniteHome()));
+        File snpBinWorkDir = 
resolveBinaryWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath(), 
settings.folderName());
+        File snpMarshWorkDir = 
mappingFileStoreWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath());
+
+        final Map<String, Integer> origPartCRCs = 
calculateCRC32Partitions(cacheWorkDir);
+        final Map<String, Integer> snpPartCRCs = calculateCRC32Partitions(
+            
FilePageStoreManager.cacheWorkDir(U.resolveWorkDirectory(mgr.snapshotLocalDir(SNAPSHOT_NAME)
+                    .getAbsolutePath(),
+                nodePath,
+                false),
+                cacheDirName(dfltCacheCfg)));
+
+        assertEquals("Partitions must have the same CRC after file copying and 
merging partition delta files",
+            origPartCRCs, snpPartCRCs);
+        assertEquals("Binary object mappings must be the same for local node 
and created snapshot",
+            calculateCRC32Partitions(binWorkDir), 
calculateCRC32Partitions(snpBinWorkDir));
+        assertEquals("Marshaller meta mast be the same for local node and 
created snapshot",
+            calculateCRC32Partitions(marshWorkDir), 
calculateCRC32Partitions(snpMarshWorkDir));
+
+        File snpWorkDir = mgr.snapshotTmpDir();
+
+        assertEquals("Snapshot working directory must be cleaned after usage", 
0, snpWorkDir.listFiles().length);
+    }
+
+    /**
+     * Test that all partitions are copied successfully even after multiple 
checkpoints occur during
+     * the long copy of cache partition files.
+     *
+     * Data consistency checked through a test node started right from 
snapshot directory and all values
+     * read successes.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitionMultiCpWithLoad() throws Exception {
+        int valMultiplier = 2;
+        CountDownLatch slowCopy = new CountDownLatch(1);
+
+        // Start grid node with data before each test.
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().baselineAutoAdjustEnabled(false);
+        ig.cluster().state(ClusterState.ACTIVE);
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i));
+
+        forceCheckpoint(ig);
+
+        AtomicInteger cntr = new AtomicInteger();
+        CountDownLatch ldrLatch = new CountDownLatch(1);
+        IgniteSnapshotManager mgr = snp(ig);
+        GridCacheDatabaseSharedManager db = 
(GridCacheDatabaseSharedManager)cctx.database();
+
+        IgniteInternalFuture<?> loadFut = 
GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(ldrLatch);
+
+                while (!Thread.currentThread().isInterrupted())
+                    ig.cache(DEFAULT_CACHE_NAME).put(cntr.incrementAndGet(),
+                        new TestOrderItem(cntr.incrementAndGet(), 
cntr.incrementAndGet()));
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                log.warning("Loader has been interrupted", e);
+            }
+        }, 5, "cache-loader-");
+
+        // Register task but not schedule it on the checkpoint.
+        SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+            cctx.localNodeId(),
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), 
mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String 
cacheDirName, GroupPartitionId pair, Long length) {
+                    try {
+                        U.await(slowCopy);
+
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            });
+
+        db.addCheckpointListener(new DbCheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onCheckpointBegin(Context ctx) {
+                Map<Integer, Set<Integer>> processed = 
GridTestUtils.getFieldValue(snpFutTask,
+                    SnapshotFutureTask.class,
+                    "processed");
+
+                if (!processed.isEmpty())
+                    ldrLatch.countDown();
+            }
+        });
+
+        try {
+            snpFutTask.start();
+
+            // Change data before snapshot creation which must be included 
into it witch correct value multiplier.
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to