Mmuzaf commented on a change in pull request #7607: IGNITE-11073: Create consistent partitions copy on each cluster node URL: https://github.com/apache/ignite/pull/7607#discussion_r409741526
########## File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java ########## @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.file.OpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir; +import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; + +/** + * Default snapshot manager test. + */ +public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest { + /** @throws Exception If fails. */ + @Test + public void testSnapshotLocalPartitions() throws Exception { + // Start grid node with data before each test. + IgniteEx ig = startGridWithCache(dfltCacheCfg, 2048); + + // The following data will be included into checkpoint. + for (int i = 2048; i < 4096; i++) + ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i)); + + for (int i = 4096; i < 8192; i++) { + ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i) { + @Override public String toString() { + return "_" + super.toString(); + } + }); + } + + GridCacheSharedContext<?, ?> cctx = ig.context().cache().context(); + IgniteSnapshotManager mgr = snp(ig); + + // Collection of pairs group and appropriate cache partition to be snapshot. + IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx, + SNAPSHOT_NAME, + F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null), + mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)); + + snpFut.get(); + + File cacheWorkDir = ((FilePageStoreManager)ig.context() + .cache() + .context() + .pageStore()) + .cacheWorkDir(dfltCacheCfg); + + // Checkpoint forces on cluster deactivation (currently only single node in cluster), + // so we must have the same data in snapshot partitions and those which left + // after node stop. + stopGrid(ig.name()); + + // Calculate CRCs. + IgniteConfiguration cfg = ig.context().config(); + PdsFolderSettings settings = ig.context().pdsFolderResolver().resolveFolders(); + String nodePath = databaseRelativePath(settings.folderName()); + File binWorkDir = resolveBinaryWorkDir(cfg.getWorkDirectory(), settings.folderName()); + File marshWorkDir = mappingFileStoreWorkDir(U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome())); + File snpBinWorkDir = resolveBinaryWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath(), settings.folderName()); + File snpMarshWorkDir = mappingFileStoreWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath()); + + final Map<String, Integer> origPartCRCs = calculateCRC32Partitions(cacheWorkDir); + final Map<String, Integer> snpPartCRCs = calculateCRC32Partitions( + FilePageStoreManager.cacheWorkDir(U.resolveWorkDirectory(mgr.snapshotLocalDir(SNAPSHOT_NAME) + .getAbsolutePath(), + nodePath, + false), + cacheDirName(dfltCacheCfg))); + + assertEquals("Partitions must have the same CRC after file copying and merging partition delta files", + origPartCRCs, snpPartCRCs); + assertEquals("Binary object mappings must be the same for local node and created snapshot", + calculateCRC32Partitions(binWorkDir), calculateCRC32Partitions(snpBinWorkDir)); + assertEquals("Marshaller meta mast be the same for local node and created snapshot", + calculateCRC32Partitions(marshWorkDir), calculateCRC32Partitions(snpMarshWorkDir)); + + File snpWorkDir = mgr.snapshotTmpDir(); + + assertEquals("Snapshot working directory must be cleaned after usage", 0, snpWorkDir.listFiles().length); + } + + /** + * Test that all partitions are copied successfully even after multiple checkpoints occur during + * the long copy of cache partition files. + * + * Data consistency checked through a test node started right from snapshot directory and all values + * read successes. + * + * @throws Exception If fails. + */ + @Test + public void testSnapshotLocalPartitionMultiCpWithLoad() throws Exception { + int valMultiplier = 2; + CountDownLatch slowCopy = new CountDownLatch(1); + + // Start grid node with data before each test. + IgniteEx ig = startGrid(0); + + ig.cluster().baselineAutoAdjustEnabled(false); + ig.cluster().state(ClusterState.ACTIVE); + GridCacheSharedContext<?, ?> cctx = ig.context().cache().context(); + + for (int i = 0; i < CACHE_KEYS_RANGE; i++) + ig.cache(DEFAULT_CACHE_NAME).put(i, new TestOrderItem(i, i)); + + forceCheckpoint(ig); + + AtomicInteger cntr = new AtomicInteger(); + CountDownLatch ldrLatch = new CountDownLatch(1); + IgniteSnapshotManager mgr = snp(ig); + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database(); + + IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> { + try { + U.await(ldrLatch); + + while (!Thread.currentThread().isInterrupted()) + ig.cache(DEFAULT_CACHE_NAME).put(cntr.incrementAndGet(), + new TestOrderItem(cntr.incrementAndGet(), cntr.incrementAndGet())); + } + catch (IgniteInterruptedCheckedException e) { + log.warning("Loader has been interrupted", e); + } + }, 5, "cache-loader-"); + + // Register task but not schedule it on the checkpoint. + SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME, + cctx.localNodeId(), + F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null), + new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) { + @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { + try { + U.await(slowCopy); + + delegate.sendPart0(part, cacheDirName, pair, length); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + } + }); + + db.addCheckpointListener(new DbCheckpointListener() { + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context ctx) { + Map<Integer, Set<Integer>> processed = GridTestUtils.getFieldValue(snpFutTask, + SnapshotFutureTask.class, + "processed"); + + if (!processed.isEmpty()) + ldrLatch.countDown(); + } + }); + + try { + snpFutTask.start(); + + // Change data before snapshot creation which must be included into it witch correct value multiplier. Review comment: Fixed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services