alex-plekhanov commented on a change in pull request #7607: IGNITE-11073: 
Create consistent partitions copy on each cluster node
URL: https://github.com/apache/ignite/pull/7607#discussion_r408971404
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 ##########
 @@ -0,0 +1,1986 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import 
org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
+import static 
org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
+import static 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static 
org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * There are two major actions available:
+ * <ul>
+ *     <li>Create snapshot of the whole cluster cache groups by triggering PME 
to achieve consistency.</li>
+ *     <li>Create local snapshot of requested cache groups and send it to the 
node which request this operation.
+ *     Cache groups will be transmitted using internal API for transferring 
files. See {@link TransmissionHandler}.</li>
+ * </ul>
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+    implements IgniteSnapshot, PartitionsExchangeAware, 
MetastorageLifecycleListener {
+    /** File with delta pages suffix. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template for partition delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + 
DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + 
DELTA_SUFFIX;
+
+    /** Reason text for a checkpoint forced to start a snapshot operation. */
+    public static final String CP_SNAPSHOT_REASON = "Checkpoint started to 
enforce snapshot operation: %s";
+
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
+    /** Default snapshot directory for loading remote snapshots. */
+    public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+    /** Timeout in milliseconds for snapshot operations. */
+    public static final long DFLT_SNAPSHOT_TIMEOUT = 15_000L;
+
+    /** Snapshot in progress error message. */
+    public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected 
due to the snapshot operation in progress.";
+
+    /** Error message used to finalize snapshot tasks when the local node is stopping. */
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been 
cancelled due to the local node " +
+        "is stopping";
+
+    /** Metastorage key to save the currently running snapshot under. */
+    public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+    /** Snapshot metrics prefix. */
+    public static final String SNAPSHOT_METRICS = "snapshot";
+
+    /** Name prefix for snapshot worker threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = 
"snapshot-runner";
+
+    /** Total number of threads used to perform local snapshot operations. */
+    private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+    /** Default snapshot topic to receive snapshots from a remote node. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = 
GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of the node-sender directory path with its 
consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of the cache directory which is currently 
sending its partitions. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String SNP_NAME_PARAM = "snpName";
+
+    /** Total snapshot files count which the receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
+    /**
+     * Local buffer to perform copy-on-write operations with pages for {@code 
SnapshotFutureTask.PageStoreSerialWriter}s.
+     * It is important to have only one buffer per thread (instead of creating a buffer per
+     * each {@code SnapshotFutureTask.PageStoreSerialWriter}) since that would be 
redundant and can lead to OOM errors. A direct buffer
+     * is deallocated only when its ByteBuffer is garbage collected, so off-heap memory may 
run out before that happens.
+     */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** Map of registered cache snapshot processes and their corresponding 
contexts. */
+    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new 
ConcurrentHashMap<>();
+
+    /** Busy lock to protect the resources currently in use. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Snapshot requested from a remote node. */
+    private final AtomicReference<RemoteSnapshotFuture> rmtSnpReq = new 
AtomicReference<>();
+
+    /** Mutex used to order cluster snapshot operation progress. */
+    private final Object snpOpMux = new Object();
+
+    /** Take snapshot operation procedure. */
+    private final DistributedProcess<SnapshotOperationRequest, 
SnapshotOperationResponse> startSnpProc;
+
+    /** Checks a previously performed snapshot operation and deletes uncompleted 
files if needed. */
+    private final DistributedProcess<SnapshotOperationRequest, 
SnapshotOperationResponse> endSnpProc;
+
+    /** Resolved persistent data storage settings. */
+    private volatile PdsFolderSettings pdsSettings;
+
+    /** Fully initialized metastorage. */
+    private volatile ReadWriteMetastorage metaStorage;
+
+    /** Local snapshot sender factory. */
+    private Function<String, SnapshotSender> locSndrFactory = 
localSnapshotSenderFactory();
+
+    /** Main snapshot directory to save created snapshots. */
+    private volatile File locSnpDir;
+
+    /**
+     * Working directory for snapshots loaded from remote nodes and for storing
+     * temporary partition delta-files of a locally started snapshot process.
+     */
+    private File tmpWorkDir;
+
+    /** Factory to create file I/O for working with delta file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create a page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> 
storeFactory;
+
+    /** Snapshot thread pool to perform local partition snapshots. */
+    private ExecutorService snpRunner;
+
+    /** System discovery message listener. */
+    private DiscoveryEventListener discoLsnr;
+
+    /** Cluster snapshot operation requested by user. */
+    private ClusterSnapshotFuture clusterSnpFut;
+
+    /** Current snapshot operation request on the local node. */
+    private volatile SnapshotOperationRequest clusterSnpRq;
+
+    /** {@code true} if a recovery process occurred for the snapshot. */
+    private volatile boolean recovered;
+
+    /** Last seen cluster snapshot operation. */
+    private volatile ClusterSnapshotFuture lastSeenSnpFut = new 
ClusterSnapshotFuture();
+
+    /**
+     * Creates the snapshot manager and wires up the distributed processes used
+     * to coordinate the snapshot start and end stages across the cluster.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        // One direct, page-sized buffer per thread, in native byte order
+        // (see the locBuff field javadoc for the rationale).
+        locBuff = ThreadLocal.withInitial(() ->
+            
ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize())
+                .order(ByteOrder.nativeOrder()));
+
+        // Distributed process driving the first (start) stage of a cluster snapshot.
+        startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, 
this::initLocalSnapshotStartStage,
+            this::processLocalSnapshotStartStageResult);
+
+        // Distributed process driving the final (end) stage of a cluster snapshot.
+        endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, 
this::initLocalSnapshotEndStage,
+            this::processLocalSnapshotEndStageResult);
+    }
+
+    /**
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File partDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, partDeltaFileName(partId));
+    }
+
+    /**
+     * @param partId Partition id.
+     * @return File name of delta partition pages.
+     */
+    public static String partDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : 
String.format(PART_DELTA_TEMPLATE, partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(ctx.config()))
+            return;
+
+        snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THREAD_POOL_SIZE,
+            SNAPSHOT_THREAD_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            new OomExceptionHandler(ctx));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        pdsSettings = 
cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+        locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+        tmpWorkDir = Paths.get(storeMgr.workDir().getAbsolutePath(), 
DFLT_SNAPSHOT_TMP_DIR).toFile();
+
+        U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+        U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", 
log);
+
+        MetricRegistry mreg = 
cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+        mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+            "The system time approximated by 10 ms when the last cluster 
snapshot operation has been started.");
+        mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+            "The system time approximated by 10 ms when the last cluster 
snapshot operation has been finished.");
+        mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, 
String.class,
+            "The name of last started cluster snapshot operation.");
+        mreg.register("LastSnapshotErrorMessage",
+            () -> lastSeenSnpFut.error() == null ? null : 
lastSeenSnpFut.error().getMessage(),
+            String.class,
+            "The error message of last started cluster snapshot operation 
which fail. This value will be 'null' " +
+                "if last snapshot operation completed successfully.");
+        mreg.register("localSnapshotList", this::getSnapshots, List.class,
+            "The list of all known snapshots currently saved on the local node 
with respect to " +
+                "configured via IgniteConfiguration a snapshot path.");
+
+        storeFactory = storeMgr::getPageStoreFactory;
+
+        cctx.exchange().registerExchangeAwareComponent(this);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+        // Receive remote snapshots requests.
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, new 
GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) 
{
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    if (msg instanceof SnapshotRequestMessage) {
+                        SnapshotRequestMessage reqMsg0 = 
(SnapshotRequestMessage)msg;
+                        String snpName = reqMsg0.snapshotName();
+
+                        synchronized (this) {
+                            SnapshotFutureTask task = 
lastScheduledRemoteSnapshotTask(nodeId);
+
+                            if (task != null) {
+                                // Task will also be removed from local map 
due to the listener on future done.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled 
due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + 
reqMsg0 + ']');
+                            }
+                        }
+
+                        SnapshotFutureTask task = registerSnapshotTask(snpName,
+                            nodeId,
+                            reqMsg0.parts(),
+                            remoteSnapshotSender(snpName, nodeId));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process request of 
creating a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', 
f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new 
SnapshotResponseMessage(reqMsg0.snapshotName(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Fail to send the response 
message with processing snapshot request " +
+                                    "error [request=" + reqMsg0 + ", nodeId=" 
+ nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    else if (msg instanceof SnapshotResponseMessage) {
+                        SnapshotResponseMessage respMsg0 = 
(SnapshotResponseMessage)msg;
+
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || 
!fut0.snpName.equals(respMsg0.snapshotName())) {
+                            if (log.isInfoEnabled()) {
+                                log.info("A stale snapshot response message 
has been received. Will be ignored " +
+                                    "[fromNodeId=" + nodeId + ", response=" + 
respMsg0 + ']');
+                            }
+
+                            return;
+                        }
+
+                        if (respMsg0.errorMessage() != null) {
+                            fut0.onDone(new IgniteCheckedException("Request 
cancelled. The snapshot operation stopped " +
+                                "on the remote node with an error: " + 
respMsg0.errorMessage()));
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, "Processing snapshot request from remote node 
fails with an error", e);
+
+                    cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, 
discoCache) -> {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                UUID leftNodeId = evt.eventNode().id();
+
+                if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                    DiscoveryCustomEvent evt0 = (DiscoveryCustomEvent)evt;
+
+                    if (evt0.customMessage() instanceof InitMessage) {
+                        InitMessage<?> msg = 
(InitMessage<?>)evt0.customMessage();
+
+                        // This happens when #takeSnapshot() method already 
invoked and distributed process
+                        // starts its action.
+                        if (msg.type() == START_SNAPSHOT.ordinal()) {
+                            assert clusterSnpRq != null ||
+                                !CU.baselineNode(cctx.localNode(), 
cctx.kernalContext().state().clusterState()) : evt;
+
+                            DiscoveryCustomEvent customEvt = new 
DiscoveryCustomEvent();
+
+                            customEvt.node(evt0.node());
+                            customEvt.eventNode(evt0.eventNode());
+                            
customEvt.affinityTopologyVersion(evt0.affinityTopologyVersion());
+                            customEvt.customMessage(new 
SnapshotStartDiscoveryMessage(discoCache, msg.processId()));
+
+                            // Handle new event inside discovery thread, so no 
guarantees will be violated.
+                            cctx.exchange().onDiscoveryEvent(customEvt, 
discoCache);
+                        }
+                    }
+                }
+                else if (evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED) {
+                    SnapshotOperationRequest snpRq = clusterSnpRq;
+
+                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                        if (sctx.sourceNodeId().equals(leftNodeId) ||
+                            (snpRq != null &&
+                                snpRq.snpName.equals(sctx.snapshotName()) &&
+                                snpRq.bltNodes.contains(leftNodeId))) {
+                            sctx.acceptException(new 
ClusterTopologyCheckedException("The node which requested snapshot " +
+                                "creation has left the grid"));
+                        }
+                    }
+
+                    RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                    if (snpTrFut != null && 
snpTrFut.rmtNodeId.equals(leftNodeId)) {
+                        snpTrFut.onDone(new 
ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                            "requested left the grid"));
+                    }
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT);
+
+        // Remote snapshot handler.
+        
cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, 
new TransmissionHandler() {
+            /** {@inheritDoc} */
+            @Override public void onEnd(UUID nodeId) {
+                // NOTE(review): assumes an active remote snapshot request exists here;
+                // a null result from rmtSnpReq.get() would throw NPE - confirm invariant.
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                // By the end of the transmission every page store must be recovered
+                // and removed, and all expected partitions must have been received.
+                assert snpTrFut.stores.isEmpty() : snpTrFut.stores.entrySet();
+                assert snpTrFut.partsLeft == 0 : snpTrFut;
+
+                snpTrFut.onDone();
+
+                log.info("Requested snapshot from remote node has been fully 
received " +
+                    "[snpName=" + snpTrFut.snpName + ", snpTrans=" + snpTrFut 
+ ']');
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onException(UUID nodeId, Throwable err) {
+                RemoteSnapshotFuture fut = rmtSnpReq.get();
+
+                if (fut == null)
+                    return;
+
+                if (fut.rmtNodeId.equals(nodeId))
+                    fut.onDone(err);
+            }
+
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta 
fileMeta) {
+                // Transmission parameters describing where the received partition belongs.
+                Integer partId = 
(Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+                String rmtDbNodePath = 
(String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+                String cacheDirName = 
(String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+                RemoteSnapshotFuture transFut = resolve(nodeId, fileMeta);
+
+                try {
+                    // The received file is placed at:
+                    // <tmpWorkDir>/<snapshotName>/<senderDbNodePath>/<cacheDirName>/<partitionFile>.
+                    File cacheDir = 
U.resolveWorkDirectory(tmpWorkDir.getAbsolutePath(),
+                        Paths.get(transFut.snpName, rmtDbNodePath, 
cacheDirName).toString(),
+                        false);
+
+                    return new File(cacheDir, 
getPartitionFileName(partId)).getAbsolutePath();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            /**
+             * @param nodeId Remote node id.
+             * @param meta Transmission meta.
+             * @return Resolved transmission future.
+             */
+            private RemoteSnapshotFuture resolve(UUID nodeId, TransmissionMeta 
meta) {
+                String snpName = (String)meta.params().get(SNP_NAME_PARAM);
+                Integer partsCnt = 
(Integer)meta.params().get(SNP_PARTITIONS_CNT);
+
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                if (snpTrFut == null || !snpTrFut.snpName.equals(snpName)) {
+                    throw new TransmissionCancelledException("Stale snapshot 
transmission will be ignored " +
+                        "[snpName=" + snpName + ", meta=" + meta + ", 
snpTrFut=" + snpTrFut + ']');
+                }
+
+                assert snpTrFut.snpName.equals(snpName) && 
snpTrFut.rmtNodeId.equals(nodeId) :
+                    "Another transmission in progress [snpTrFut=" + snpTrFut + 
", nodeId=" + snpName + ']';
+
+                if (snpTrFut.partsLeft == -1)
+                    snpTrFut.partsLeft = partsCnt;
+
+                return snpTrFut;
+            }
+
+            /**
+             * Completes recovery of a single received partition: finalizes its
+             * page store, hands the partition file to the registered consumer and
+             * decrements the count of partitions left to receive.
+             *
+             * @param snpTrans Current snapshot transmission.
+             * @param grpPartId Pair of group id and its partition id.
+             */
+            private void finishRecover(RemoteSnapshotFuture snpTrans, 
GroupPartitionId grpPartId) {
+                FilePageStore pageStore = null;
+
+                try {
+                    // NOTE(review): remove() may return null if recovery for this
+                    // partition was already finished - confirm this cannot happen here.
+                    pageStore = snpTrans.stores.remove(grpPartId);
+
+                    pageStore.finishRecover();
+
+                    snpTrans.partConsumer.accept(new 
File(pageStore.getFileAbsolutePath()), grpPartId);
+
+                    snpTrans.partsLeft--;
+                }
+                catch (StorageException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    // Close the store quietly even if recovery failed.
+                    U.closeQuiet(pageStore);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, 
TransmissionMeta initMeta) {
+                Integer grpId = 
(Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = 
(Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+
+                RemoteSnapshotFuture snpTrFut = resolve(nodeId, initMeta);
+
+                GroupPartitionId grpPartId = new GroupPartitionId(grpId, 
partId);
+                FilePageStore pageStore = snpTrFut.stores.get(grpPartId);
+
+                // Delta pages can only be applied on top of a partition whose page
+                // store was registered earlier for this transmission.
+                if (pageStore == null) {
+                    throw new IgniteException("Partition must be loaded before 
applying snapshot delta pages " +
+                        "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + 
", partId=" + partId + ']');
+                }
+
+                pageStore.beginRecover();
+
+                // No snapshot delta pages received. Finalize recovery.
+                if (initMeta.count() == 0)
+                    finishRecover(snpTrFut, grpPartId);
+
+                // Consumer invoked for each received chunk of delta pages.
+                return new Consumer<ByteBuffer>() {
+                    // Bytes applied so far; recovery finishes once it reaches initMeta.count().
+                    final LongAdder transferred = new LongAdder();
+
+                    @Override public void accept(ByteBuffer buff) {
+                        try {
+                            assert initMeta.count() != 0 : initMeta;
+
+                            RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                            // Stop applying pages if the local request has been
+                            // cancelled or replaced by another one.
+                            if (fut0 == null || !fut0.equals(snpTrFut) || 
fut0.isCancelled()) {
+                                throw new 
TransmissionCancelledException("Snapshot request is cancelled " +
+                                    "[snpName=" + snpTrFut.snpName + ", 
grpId=" + grpId + ", partId=" + partId + ']');
+                            }
+
+                            pageStore.write(PageIO.getPageId(buff), buff, 0, 
false);
+
+                            transferred.add(buff.capacity());
+
+                            // Last chunk applied - finalize partition recovery.
+                            if (transferred.longValue() == initMeta.count())
+                                finishRecover(snpTrFut, grpPartId);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                };
+            }
+
            /** {@inheritDoc} */
            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
                String snpName = (String)initMeta.params().get(SNP_NAME_PARAM);

                assert grpId != null;
                assert partId != null;
                assert snpName != null;
                assert storeFactory != null;

                RemoteSnapshotFuture transFut = rmtSnpReq.get();

                if (transFut == null) {
                    throw new IgniteException("Snapshot transmission with given name doesn't exists " +
                        "[snpName=" + snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
                }

                return new Consumer<File>() {
                    @Override public void accept(File file) {
                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();

                        // Abort if the active remote request changed or was cancelled meanwhile.
                        if (fut0 == null || !fut0.equals(transFut) || fut0.isCancelled()) {
                            throw new TransmissionCancelledException("Snapshot request is cancelled [snpName=" + snpName +
                                ", grpId=" + grpId + ", partId=" + partId + ']');
                        }

                        // Guard store creation against concurrent manager stop.
                        busyLock.enterBusy();

                        try {
                            // Wrap the received file into a page store; delta pages will be applied by chunkHandler.
                            FilePageStore pageStore = (FilePageStore)storeFactory
                                .apply(grpId, false)
                                .createPageStore(getFlagByPartId(partId),
                                    file::toPath,
                                    new LongAdderMetric("NO_OP", null));

                            transFut.stores.put(new GroupPartitionId(grpId, partId), pageStore);

                            pageStore.init();
                        }
                        catch (IgniteCheckedException e) {
                            throw new IgniteException(e);
                        }
                        finally {
                            busyLock.leaveBusy();
                        }
                    }
                };
            }
+        });
+    }
+
    /** {@inheritDoc} */
    @Override protected void stop0(boolean cancel) {
        // Block the busy lock first so no new snapshot activity can start.
        busyLock.block();

        try {
            // Try stop all snapshot processing if not yet.
            for (SnapshotFutureTask sctx : locSnpTasks.values())
                sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));

            locSnpTasks.clear();

            // Cancel an in-flight remote snapshot request, if any.
            RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();

            if (snpTrFut != null)
                snpTrFut.cancel();

            // Fail the cluster-wide snapshot future so waiters are released.
            synchronized (snpOpMux) {
                if (clusterSnpFut != null) {
                    clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));

                    clusterSnpFut = null;
                }
            }

            if (snpRunner != null)
                snpRunner.shutdownNow();

            // Deregister communication and discovery hooks installed on start.
            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);

            if (discoLsnr != null)
                cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);

            cctx.exchange().unregisterExchangeAwareComponent(this);
        }
        finally {
            busyLock.unblock();
        }
    }
+
+    /**
+     * @param snpDir Snapshot dire
+     * @param folderName Local node folder name (see {@link U#maskForFileName} 
with consistent id).
+     */
+    public static void deleteSnapshot(File snpDir, String folderName) {
+        if (!snpDir.exists())
+            return;
+
+        assert snpDir.isDirectory() : snpDir;
+
+        try {
+            File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), 
folderName);
+            File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), 
databaseRelativePath(folderName), false);
+
+            U.delete(binDir);
+            U.delete(dbDir);
+
+            File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+            // Concurrently traverse the snapshot marshaller directory and 
delete all files.
+            Files.walkFileTree(marshDir.toPath(), new 
SimpleFileVisitor<Path>() {
+                @Override public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
+                    U.delete(file);
+
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override public FileVisitResult visitFileFailed(Path file, 
IOException exc) {
+                    // Skip files which can be concurrently removed from 
FileTree.
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+
+            File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+            if (!db.exists() || db.list().length == 0)
+                U.delete(snpDir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return Local snapshot directory for snapshot with given name.
+     */
+    public File snapshotLocalDir(String snpName) {
+        assert locSnpDir != null;
+
+        return new File(locSnpDir, snpName);
+    }
+
+    /**
+     * @return Node snapshot working directory.
+     */
+    public File snapshotTmpDir() {
+        assert tmpWorkDir != null;
+
+        return tmpWorkDir;
+    }
+
    /**
     * First (start) stage of the distributed snapshot operation, executed on each node.
     *
     * @param req Request on snapshot creation.
     * @return Future which will be completed when a snapshot has been started.
     */
    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartStage(SnapshotOperationRequest req) {
        // Client and non-baseline nodes hold no persistent data to snapshot.
        if (cctx.kernalContext().clientNode() ||
            !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
            return new GridFinishedFuture<>();

        // Executed inside discovery notifier thread, prior to firing discovery custom event,
        // so it is safe to set new snapshot task inside this method without synchronization.
        if (clusterSnpRq != null) {
            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " +
                "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpRq + ']'));
        }

        // Collection of pairs group and appropriate cache partition to be snapshot.
        // NOTE(review): a null partition set appears to mean "all partitions of the group" — confirm against SnapshotFutureTask.
        Map<Integer, Set<Integer>> parts = new HashMap<>();

        for (Integer grpId : req.grpIds)
            parts.put(grpId, null);

        SnapshotFutureTask task0 = registerSnapshotTask(req.snpName,
            req.srcNodeId,
            parts,
            locSndrFactory.apply(req.snpName));

        clusterSnpRq = req;

        return task0.chain(f -> new SnapshotOperationResponse());
    }
+
    /**
     * Handles the cluster-wide result of the snapshot start stage. The coordinator
     * initiates the end stage, marking the request as failed if any node reported an
     * error or did not respond.
     *
     * @param id Request id.
     * @param res Results.
     * @param err Errors.
     */
    private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
        SnapshotOperationRequest snpRq = clusterSnpRq;

        if (snpRq == null)
            return;

        // The result belongs to a different (stale) request — fail the matching local future, if any.
        if (!snpRq.rqId.equals(id)) {
            synchronized (snpOpMux) {
                if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot operation failed due to another snapshot " +
                        "operation in progress: " + snpRq.snpName));

                    clusterSnpFut = null;
                }

                return;
            }
        }

        // Only the coordinator drives the transition to the end stage.
        if (isLocalNodeCoordinator(cctx.discovery())) {
            // Baseline nodes which neither responded nor reported an error (e.g. left the cluster).
            Set<UUID> missed = new HashSet<>(snpRq.bltNodes);
            missed.removeAll(res.keySet());
            missed.removeAll(err.keySet());

            snpRq.hasErr = !F.isEmpty(err) || !missed.isEmpty();

            if (snpRq.hasErr) {
                U.warn(log, "Execution of local snapshot tasks fails or them haven't been executed " +
                    "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
                    "[err=" + err + ", missed=" + missed + ']');
            }

            endSnpProc.start(UUID.randomUUID(), snpRq);
        }
    }
+
    /**
     * Second (end) stage of the distributed snapshot operation: cleans up a failed
     * snapshot and clears the recovery marker from the metastorage.
     *
     * @param req Request on snapshot creation.
     * @return Future which will be completed when the snapshot will be finalized.
     */
    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
        // No start-stage request on this node — nothing to finalize.
        if (clusterSnpRq == null)
            return new GridFinishedFuture<>(new SnapshotOperationResponse());

        try {
            // A failed snapshot must not be left half-written on disk.
            if (req.hasErr)
                deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());

            removeLastMetaStorageKey();
        }
        catch (Exception e) {
            return new GridFinishedFuture<>(e);
        }

        return new GridFinishedFuture<>(new SnapshotOperationResponse());
    }
+
+    /**
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, 
SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        SnapshotOperationRequest snpRq = clusterSnpRq;
+
+        if (snpRq == null)
+            return;
+
+        Set<UUID> endFail = new HashSet<>(snpRq.bltNodes);
+        endFail.removeAll(res.keySet());
+
+        clusterSnpRq = null;
+
+        synchronized (snpOpMux) {
+            if (clusterSnpFut != null) {
+                if (endFail.isEmpty() && !snpRq.hasErr) {
+                    clusterSnpFut.onDone();
+
+                    if (log.isInfoEnabled())
+                        log.info("Cluster-wide snapshot operation finished 
successfully [req=" + snpRq + ']');
+                }
+                else {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot 
creation has been finished with an error. " +
+                        "Local snapshot tasks may not finished completely or 
finalizing results fails " +
+                        "[hasErr" + snpRq.hasErr + ", fail=" + endFail + ']'));
+                }
+
+                clusterSnpFut = null;
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if snapshot operation is in progress.
+     */
+    public boolean isSnapshotCreating() {
+        if (clusterSnpRq != null)
+            return true;
+
+        synchronized (snpOpMux) {
+            return clusterSnpRq != null || clusterSnpFut != null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> getSnapshots() {
+        if (cctx.kernalContext().clientNode())
+            throw new UnsupportedOperationException("Client and daemon nodes 
can not perform this operation.");
+
+        synchronized (snpOpMux) {
+            return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+                .map(File::getName)
+                .collect(Collectors.toList());
+        }
+    }
+
    /** {@inheritDoc} */
    @Override public IgniteFuture<Void> createSnapshot(String name) {
        A.notNullOrEmpty(name, "name");

        try {
            // Snapshots may only be initiated from server nodes of an active, baseline-configured cluster.
            if (cctx.kernalContext().clientNode())
                throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");

            if (!IgniteFeatures.allNodesSupports(cctx.discovery().allNodes(), PERSISTENCE_CACHE_SNAPSHOT))
                throw new IgniteException("Not all nodes in the cluster support a snapshot operation.");

            if (!active(cctx.kernalContext().state().clusterState().state()))
                throw new IgniteException("Snapshot operation has been rejected. The cluster is inactive.");

            DiscoveryDataClusterState clusterState = cctx.kernalContext().state().clusterState();

            if (!clusterState.hasBaselineTopology())
                throw new IgniteException("Snapshot operation has been rejected. The baseline topology is not configured for cluster.");

            ClusterSnapshotFuture snpFut0;

            // Reject concurrent snapshot starts on this node and reserve the future under the mutex.
            synchronized (snpOpMux) {
                if (clusterSnpFut != null && !clusterSnpFut.isDone())
                    throw new IgniteException("Create snapshot request has been rejected. The previous snapshot operation was not completed.");

                if (clusterSnpRq != null)
                    throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.");

                if (getSnapshots().contains(name))
                    throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists.");

                snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name);

                clusterSnpFut = snpFut0;
                lastSeenSnpFut = snpFut0;
            }

            // Only persistent user cache groups without encryption are included into the snapshot.
            List<Integer> grps = cctx.cache().persistentGroups().stream()
                .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER)
                .filter(g -> !g.config().isEncryptionEnabled())
                .map(CacheGroupDescriptor::groupId)
                .collect(Collectors.toList());

            List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);

            // Kick off the distributed two-stage snapshot process over all baseline server nodes.
            startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest(snpFut0.rqId,
                cctx.localNodeId(),
                name,
                grps,
                new HashSet<>(F.viewReadOnly(srvNodes,
                    F.node2id(),
                    (node) -> CU.baselineNode(node, clusterState)))));

            if (log.isInfoEnabled())
                log.info("Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']');

            return new IgniteFutureImpl<>(snpFut0);
        }
        catch (Exception e) {
            U.error(log, "Start snapshot operation failed", e);

            // Keep the failed attempt visible for introspection.
            lastSeenSnpFut = new ClusterSnapshotFuture(name, e);

            return new IgniteFinishedFutureImpl<>(e);
        }
    }
+
    /** {@inheritDoc} */
    @Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException {
        synchronized (snpOpMux) {
            this.metaStorage = metaStorage;

            // A stale snapshot was cleaned up during recovery (see onReadyForRead);
            // drop its marker now that the metastorage is writable.
            if (recovered)
                removeLastMetaStorageKey();

            recovered = false;
        }
    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) 
throws IgniteCheckedException {
+        // Snapshot which has not been completed due to the local node crashed 
must be deleted.
+        String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
+
+        if (snpName == null)
+            return;
+
+        recovered = true;
+
+        for (File tmp : snapshotTmpDir().listFiles())
+            U.delete(tmp);
+
+        deleteSnapshot(snapshotLocalDir(snpName), pdsSettings.folderName());
+
+        if (log.isInfoEnabled()) {
+            log.info("Previous attempt to create snapshot fail due to the 
local node crash. All resources " +
+                "related to snapshot operation have been deleted: " + snpName);
+        }
+    }
+
+    /**
+     * @param evt Discovery event to check.
+     * @return {@code true} if exchange started by snapshot operation.
+     */
+    public static boolean isSnapshotOperation(DiscoveryEvent evt) {
+        return !evt.eventNode().isClient() &&
+            evt.type() == EVT_DISCOVERY_CUSTOM_EVT &&
+            ((DiscoveryCustomEvent)evt).customMessage() instanceof 
SnapshotStartDiscoveryMessage;
+    }
+
    /** {@inheritDoc} */
    @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        if (clusterSnpRq == null || cctx.kernalContext().clientNode())
            return;

        SnapshotOperationRequest snpRq = clusterSnpRq;

        SnapshotFutureTask task = locSnpTasks.get(snpRq.snpName);

        if (task == null)
            return;

        // Start the local snapshot task; a forced checkpoint binds the snapshot to a consistent state.
        if (task.start()) {
            cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpRq.snpName));

            // schedule task on checkpoint and wait when it starts
            try {
                task.awaitStarted();
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
            }
        }
    }
+
+    /**
+     * @param parts Collection of pairs group and appropriate cache partition 
to be snapshot.
+     * @param rmtNodeId The remote node to connect to.
+     * @param partConsumer Received partition handler.
+     * @return Future which will be completed when requested snapshot fully 
received.
+     */
+    public IgniteInternalFuture<Void> createRemoteSnapshot(
+        UUID rmtNodeId,
+        Map<Integer, Set<Integer>> parts,
+        BiConsumer<File, GroupPartitionId> partConsumer
+    ) {
+        assert partConsumer != null;
+
+        ClusterNode rmtNode = cctx.discovery().node(rmtNodeId);
+
+        if (!nodeSupports(rmtNode, PERSISTENCE_CACHE_SNAPSHOT))
+            return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot on remote node is not supported: " + 
rmtNode.id()));
+
+        if (rmtNode == null) {
+            return new GridFinishedFuture<>(new 
ClusterTopologyCheckedException("Snapshot request cannot be performed. " +
+                "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']'));
+        }
+
+        String snpName = RMT_SNAPSHOT_PREFIX + UUID.randomUUID().toString();
+
+        RemoteSnapshotFuture snpTransFut = new RemoteSnapshotFuture(rmtNodeId, 
snpName, partConsumer);
+
+        busyLock.enterBusy();
+        SnapshotRequestMessage msg0;
+
+        try {
+            msg0 = new SnapshotRequestMessage(snpName, parts);
+
+            RemoteSnapshotFuture fut = rmtSnpReq.get();
+
+            try {
+                if (fut != null)
+                    fut.get(DFLT_SNAPSHOT_TIMEOUT, TimeUnit.MILLISECONDS);
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isInfoEnabled())
+                    log.info("The previous snapshot request finished with an 
exception:" + e.getMessage());
+            }
+
+            try {
+                if (rmtSnpReq.compareAndSet(null, snpTransFut)) {
+                    cctx.gridIO().sendOrderedMessage(rmtNode, 
DFLT_INITIAL_SNAPSHOT_TOPIC, msg0, SYSTEM_POOL,
+                        Long.MAX_VALUE, true);
+                }
+                else
+                    return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot request has been concurrently interrupted."));
+
+            }
+            catch (IgniteCheckedException e) {
+                rmtSnpReq.compareAndSet(snpTransFut, null);
+
+                return new GridFinishedFuture<>(e);
+            }
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Snapshot request is sent to the remote node [rmtNodeId=" 
+ rmtNodeId +
+                ", msg0=" + msg0 + ", snpTransFut=" + snpTransFut +
+                ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+        }
+
+        return snpTransFut;
+    }
+
+    /**
+     * @param grps List of cache groups which will be destroyed.
+     */
+    public void onCacheGroupsStopped(List<Integer> grps) {
+        for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+            Set<Integer> retain = new HashSet<>(grps);
+            retain.retainAll(sctx.affectedCacheGroups());
+
+            if (!retain.isEmpty()) {
+                sctx.acceptException(new IgniteCheckedException("Snapshot has 
been interrupted due to some of the required " +
+                    "cache groups stopped: " + retain));
+            }
+        }
+    }
+
+    /**
+     * @param snpName Unique snapshot name.
+     * @param srcNodeId Node id which cause snapshot operation.
+     * @param parts Collection of pairs group and appropriate cache partition 
to be snapshot.
+     * @param snpSndr Factory which produces snapshot receiver instance.
+     * @return Snapshot operation task which should be registered on 
checkpoint to run.
+     */
+    SnapshotFutureTask registerSnapshotTask(
+        String snpName,
+        UUID srcNodeId,
+        Map<Integer, Set<Integer>> parts,
+        SnapshotSender snpSndr
+    ) {
+        if (!busyLock.enterBusy())
+            return new SnapshotFutureTask(new IgniteCheckedException("Snapshot 
manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+
+        try {
+            if (locSnpTasks.containsKey(snpName))
+                return new SnapshotFutureTask(new 
IgniteCheckedException("Snapshot with requested name is already scheduled: " + 
snpName));
+
+            SnapshotFutureTask snpFutTask;
+
+            SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
+                snpFutTask = new SnapshotFutureTask(cctx,
+                    srcNodeId,
+                    snpName,
+                    tmpWorkDir,
+                    ioFactory,
+                    snpSndr,
+                    parts,
+                    locBuff));
+
+            if (prev != null)
+                return new SnapshotFutureTask(new 
IgniteCheckedException("Snapshot with requested name is already scheduled: " + 
snpName));
+
+            if (log.isInfoEnabled()) {
+                log.info("Snapshot task has been registered on local node 
[sctx=" + this +
+                    ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+            }
+
+            snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+
+            return snpFutTask;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param factory Factory which produces {@link LocalSnapshotSender} 
implementation.
+     */
+    void setLocalSnapshotSenderFactory(Function<String, SnapshotSender> 
factory) {
+        locSndrFactory = factory;
+    }
+
+    /**
+     * @return Factory which produces {@link LocalSnapshotSender} 
implementation.
+     */
+    Function<String, SnapshotSender> localSnapshotSenderFactory() {
+        return LocalSnapshotSender::new;
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param rmtNodeId Remote node id to send snapshot to.
+     * @return Snapshot sender instance.
+     */
+    SnapshotSender remoteSnapshotSender(String snpName, UUID rmtNodeId) {
+        // Remote snapshots can be send only by single threaded executor since 
only one transmissionSender created.
+        return new RemoteSnapshotSender(log,
+            new SequentialExecutorWrapper(log, snpRunner),
+            () -> databaseRelativePath(pdsSettings.folderName()),
+            cctx.gridIO().openTransmissionSender(rmtNodeId, 
DFLT_INITIAL_SNAPSHOT_TOPIC),
+            snpName);
+    }
+
    /** Snapshot finished successfully or already restored. Key can be removed. */
    private void removeLastMetaStorageKey() throws IgniteCheckedException {
        // Metastorage mutation must happen under the checkpoint read lock.
        cctx.database().checkpointReadLock();

        try {
            metaStorage.remove(SNP_RUNNING_KEY);
        }
        finally {
            cctx.database().checkpointReadUnlock();
        }
    }
+
+    /**
+     * @return The executor service used to run snapshot tasks.
+     */
+    ExecutorService snapshotExecutorService() {
+        assert snpRunner != null;
+
+        return snpRunner;
+    }
+
+    /**
+     * @param ioFactory Factory to create IO interface over a page stores.
+     */
+    void ioFactory(FileIOFactory ioFactory) {
+        this.ioFactory = ioFactory;
+    }
+
+    /**
+     * @param nodeId Remote node id on which requests has been registered.
+     * @return Snapshot future related to given node id.
+     */
+    SnapshotFutureTask lastScheduledRemoteSnapshotTask(UUID nodeId) {
+        return locSnpTasks.values().stream()
+            .filter(t -> t.type() == RemoteSnapshotSender.class && 
t.sourceNodeId().equals(nodeId))
+            .findFirst()
+            .orElse(null);
+    }
+
+    /**
+     * @return Relative configured path of persistence data storage directory 
for the local node.
+     * Example: {@code snapshotWorkDir/db/IgniteNodeName0}
+     */
+    static String databaseRelativePath(String folderName) {
+        return Paths.get(DB_DEFAULT_FOLDER, folderName).toString();
+    }
+
+    /**
+     * @param cfg Ignite configuration.
+     * @return Snapshot work path.
+     */
+    static File resolveSnapshotWorkDirectory(IgniteConfiguration cfg) {
+        try {
+            return cfg.getSnapshotPath() == null ?
+                U.resolveWorkDirectory(cfg.getWorkDirectory(), 
DFLT_SNAPSHOT_DIRECTORY, false) :
+                U.resolveWorkDirectory(cfg.getWorkDirectory(), 
cfg.getSnapshotPath(), false);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
    /** Remote snapshot future which tracks remote snapshot transmission result. */
    private class RemoteSnapshotFuture extends GridFutureAdapter<Void> {
        /** Snapshot name to create. */
        private final String snpName;

        /** Remote node id to request snapshot from. */
        private final UUID rmtNodeId;

        /** Collection of partition to be received. */
        private final Map<GroupPartitionId, FilePageStore> stores = new ConcurrentHashMap<>();

        /** Partition handler given by request initiator. */
        private final BiConsumer<File, GroupPartitionId> partConsumer;

        /** Counter which show how many partitions left to be received. */
        // -1 until initialized from the first transmission meta (see resolve()).
        private int partsLeft = -1;

        /**
         * @param rmtNodeId Remote node id to request snapshot from.
         * @param snpName Snapshot name to create.
         * @param partConsumer Received partition handler.
         */
        public RemoteSnapshotFuture(UUID rmtNodeId, String snpName, BiConsumer<File, GroupPartitionId> partConsumer) {
            this.snpName = snpName;
            this.rmtNodeId = rmtNodeId;
            this.partConsumer = partConsumer;
        }

        /** {@inheritDoc} */
        @Override public boolean cancel() {
            // Cleanup of stores and temp files happens in onDone(..., cancel=true).
            return onCancelled();
        }

        /** {@inheritDoc} */
        @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
            assert err != null || cancel || stores.isEmpty() : "Not all file storage processed: " + stores;

            // Release the single active-request slot held by this future.
            rmtSnpReq.compareAndSet(this, null);

            if (err != null || cancel) {
                // Close non finished file storage.
                for (Map.Entry<GroupPartitionId, FilePageStore> entry : stores.entrySet()) {
                    FilePageStore store = entry.getValue();

                    try {
                        store.stop(true);
                    }
                    catch (StorageException e) {
                        log.warning("Error stopping received file page store", e);
                    }
                }
            }

            // Remove the temporary directory used by this transmission.
            U.delete(Paths.get(tmpWorkDir.getAbsolutePath(), snpName));

            return super.onDone(res, err, cancel);
        }

        /** {@inheritDoc} */
        // Identity of a transmission is the (remote node, snapshot name) pair.
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            RemoteSnapshotFuture fut = (RemoteSnapshotFuture)o;

            return rmtNodeId.equals(fut.rmtNodeId) &&
                snpName.equals(fut.snpName);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return Objects.hash(rmtNodeId, snpName);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(RemoteSnapshotFuture.class, this);
        }
    }
+
    /**
     * Such an executor can executes tasks not in a single thread, but executes them
     * on different threads sequentially. It's important for some {@link SnapshotSender}'s
     * to process sub-task sequentially due to all these sub-tasks may share a single socket
     * channel to send data to.
     */
    private static class SequentialExecutorWrapper implements Executor {
        /** Ignite logger. */
        private final IgniteLogger log;

        /** Queue of task to execute. */
        // Guarded by 'this' (all access happens in synchronized methods).
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        /** Delegate executor. */
        private final Executor executor;

        /** Currently running task. */
        private volatile Runnable active;

        /** If wrapped executor is shutting down. */
        private volatile boolean stopping;

        /**
         * @param log Ignite logger.
         * @param executor Executor to run tasks on.
         */
        public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
            this.log = log.getLogger(SequentialExecutorWrapper.class);
            this.executor = executor;
        }

        /** {@inheritDoc} */
        @Override public synchronized void execute(final Runnable r) {
            assert !stopping : "Task must be cancelled prior to the wrapped executor is shutting down.";

            // Wrap each task so the next queued task is scheduled when it finishes,
            // guaranteeing at most one task of this wrapper runs at a time.
            tasks.offer(() -> {
                try {
                    r.run();
                }
                finally {
                    scheduleNext();
                }
            });

            if (active == null)
                scheduleNext();
        }

        /** Submits the next queued task, if any, to the delegate executor. */
        protected synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                try {
                    executor.execute(active);
                }
                catch (RejectedExecutionException e) {
                    // Delegate is shutting down: drop all pending work and stop accepting tasks.
                    tasks.clear();

                    stopping = true;

                    log.warning("Task is outdated. Wrapped executor is shutting down.", e);
                }
            }
        }
    }
+
+    /**
+     *
+     */
+    private static class RemoteSnapshotSender extends SnapshotSender {
+        /** The sender which sends files to remote node. */
+        private final GridIoManager.TransmissionSender sndr;
+
+        /** Relative node path initializer. */
+        private final Supplier<String> initPath;
+
+        /** Snapshot name */
+        private final String snpName;
+
+        /** Local node persistent directory with consistent id. */
+        private String relativeNodePath;
+
+        /** The number of cache partition files expected to be processed. */
+        private int partsCnt;
+
+        /**
+         * @param log Ignite logger.
+         * @param sndr File sender instance.
+         * @param snpName Snapshot name.
+         */
+        public RemoteSnapshotSender(
+            IgniteLogger log,
+            Executor exec,
+            Supplier<String> initPath,
+            GridIoManager.TransmissionSender sndr,
+            String snpName
+        ) {
+            super(log, exec);
+
+            this.sndr = sndr;
+            this.snpName = snpName;
+            this.initPath = initPath;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            this.partsCnt = partsCnt;
+
+            relativeNodePath = initPath.get();
+
+            if (relativeNodePath == null)
+                throw new IgniteException("Relative node path cannot be 
empty.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File part, String cacheDirName, 
GroupPartitionId pair, Long len) {
+            try {
+                assert part.exists();
+                assert len > 0 : "Requested partitions has incorrect file 
length " +
+                    "[pair=" + pair + ", cacheDirName=" + cacheDirName + ']';
+
+                sndr.send(part, 0, len, transmissionParams(snpName, 
cacheDirName, pair), TransmissionPolicy.FILE);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Partition file has been send [part=" + 
part.getName() + ", pair=" + pair +
+                        ", length=" + len + ']');
+                }
+            }
+            catch (TransmissionCancelledException e) {
+                if (log.isInfoEnabled()) {
+                    log.info("Transmission partition file has been interrupted 
[part=" + part.getName() +
+                        ", pair=" + pair + ']');
+                }
+            }
+            catch (IgniteCheckedException | InterruptedException | IOException 
e) {
+                U.error(log, "Error sending partition file [part=" + 
part.getName() + ", pair=" + pair +
+                    ", length=" + len + ']', e);
+
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File delta, String cacheDirName, 
GroupPartitionId pair) {
+            try {
+                sndr.send(delta, transmissionParams(snpName, cacheDirName, 
pair), TransmissionPolicy.CHUNK);
+
+                if (log.isInfoEnabled())
+                    log.info("Delta pages storage has been send [part=" + 
delta.getName() + ", pair=" + pair + ']');
+            }
+            catch (TransmissionCancelledException e) {
+                if (log.isInfoEnabled()) {
+                    log.info("Transmission delta pages has been interrupted 
[part=" + delta.getName() +
+                        ", pair=" + pair + ']');
+                }
+            }
+            catch (IgniteCheckedException | InterruptedException | IOException 
e) {
+                U.error(log, "Error sending delta file  [part=" + 
delta.getName() + ", pair=" + pair + ']', e);
+
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param cacheDirName Cache directory name.
+         * @param pair Cache group id with corresponding partition id.
+         * @return Map of params.
+         */
+        private Map<String, Serializable> transmissionParams(String snpName, 
String cacheDirName,
+            GroupPartitionId pair) {
+            Map<String, Serializable> params = new HashMap<>();
+
+            params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
+            params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
+            params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
+            params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
+            params.put(SNP_NAME_PARAM, snpName);
+            params.put(SNP_PARTITIONS_CNT, partsCnt);
+
+            return params;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close0(@Nullable Throwable th) {
+            U.closeQuiet(sndr);
+
+            if (th == null) {
+                if (log.isInfoEnabled())
+                    log.info("The remote snapshot sender closed normally 
[snpName=" + snpName + ']');
+            }
+            else {
+                U.warn(log, "The remote snapshot sender closed due to an error 
occurred while processing " +
+                    "snapshot operation [snpName=" + snpName + ']', th);
+            }
+        }
+    }
+
+    /**
+     * Snapshot sender which writes all data to local directory.
+     */
+    private class LocalSnapshotSender extends SnapshotSender {
+        /** Snapshot name. */
+        private final String snpName;
+
+        /** Local snapshot directory. */
+        private final File snpLocDir;
+
+        /** Local node snapshot directory calculated on snapshot directory. */
+        private File dbDir;
+
+        /** Size of page. */
+        private final int pageSize;
+
+        /**
+         * @param snpName Snapshot name.
+         */
+        public LocalSnapshotSender(String snpName) {
+            super(IgniteSnapshotManager.this.log, snpRunner);
+
+            this.snpName = snpName;
+            snpLocDir = snapshotLocalDir(snpName);
+            pageSize = 
cctx.kernalContext().config().getDataStorageConfiguration().getPageSize();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            dbDir = new File (snpLocDir, 
databaseRelativePath(pdsSettings.folderName()));
+
+            if (dbDir.exists()) {
+                throw new IgniteException("Snapshot with given name already 
exists " +
+                    "[snpName=" + snpName + ", absPath=" + 
dbDir.getAbsolutePath() + ']');
+            }
+
+            cctx.database().checkpointReadLock();
+
+            try {
+                assert metaStorage != null && 
metaStorage.read(SNP_RUNNING_KEY) == null :
+                    "The previous snapshot hasn't been completed correctly";
+
+                metaStorage.write(SNP_RUNNING_KEY, snpName);
+
+                U.ensureDirectory(dbDir, "snapshot work directory", log);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                cctx.database().checkpointReadUnlock();
+            }
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendCacheConfig0(File ccfg, String cacheDirName) 
{
+            assert dbDir != null;
+
+            try {
+                File cacheDir = 
U.resolveWorkDirectory(dbDir.getAbsolutePath(), cacheDirName, false);
+
+                copy(ccfg, new File(cacheDir, ccfg.getName()), ccfg.length());
+            }
+            catch (IgniteCheckedException | IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMarshallerMeta0(List<Map<Integer, 
MappedName>> mappings) {
+            if (mappings == null)
+                return;
+
+            saveMappings(cctx.kernalContext(), mappings, snpLocDir);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendBinaryMeta0(Collection<BinaryType> types) {
+            if (types == null)
+                return;
+
+            cctx.kernalContext().cacheObjects().saveMetadata(types, snpLocDir);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File part, String cacheDirName, 
GroupPartitionId pair, Long len) {
+            try {
+                if (len == 0)
+                    return;
+
+                File cacheDir = 
U.resolveWorkDirectory(dbDir.getAbsolutePath(), cacheDirName, false);
+
+                File snpPart = new File(cacheDir, part.getName());
+
+                if (!snpPart.exists() || snpPart.delete())
+                    snpPart.createNewFile();
+
+                copy(part, snpPart, len);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Partition has been snapshot [snapshotDir=" + 
dbDir.getAbsolutePath() +
+                        ", cacheDirName=" + cacheDirName + ", part=" + 
part.getName() +
+                        ", length=" + part.length() + ", snapshot=" + 
snpPart.getName() + ']');
+                }
+            }
+            catch (IOException | IgniteCheckedException ex) {
+                throw new IgniteException(ex);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File delta, String cacheDirName, 
GroupPartitionId pair) {
+            File snpPart = getPartitionFile(dbDir, cacheDirName, 
pair.getPartitionId());
+
+            if (log.isInfoEnabled()) {
+                log.info("Start partition snapshot recovery with the given 
delta page file [part=" + snpPart +
+                    ", delta=" + delta + ']');
+            }
+
+            try (FileIO fileIo = ioFactory.create(delta, READ);
+                 FilePageStore pageStore = (FilePageStore)storeFactory
+                     .apply(pair.getGroupId(), false)
+                     .createPageStore(getFlagByPartId(pair.getPartitionId()),
+                         snpPart::toPath,
+                         new LongAdderMetric("NO_OP", null))
+            ) {
+                ByteBuffer pageBuf = ByteBuffer.allocate(pageSize)
+                    .order(ByteOrder.nativeOrder());
+
+                long totalBytes = fileIo.size();
+
+                assert totalBytes % pageSize == 0 : "Given file with delta 
pages has incorrect size: " + fileIo.size();
+
+                pageStore.beginRecover();
+
+                for (long pos = 0; pos < totalBytes; pos += pageSize) {
+                    long read = fileIo.readFully(pageBuf, pos);
+
+                    assert read == pageBuf.capacity();
+
+                    pageBuf.flip();
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Read page given delta file [path=" + 
delta.getName() +
+                            ", pageId=" + PageIO.getPageId(pageBuf) + ", pos=" 
+ pos + ", pages=" + (totalBytes / pageSize) +
+                            ", crcBuff=" + FastCrc.calcCrc(pageBuf, 
pageBuf.limit()) + ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
+
+                        pageBuf.rewind();
+                    }
+
+                    pageStore.write(PageIO.getPageId(pageBuf), pageBuf, 0, 
false);
+
+                    pageBuf.flip();
+                }
+
+                pageStore.finishRecover();
+            }
+            catch (IOException | IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void close0(@Nullable Throwable th) {
+            if (th == null) {
+                if (log.isInfoEnabled())
+                    log.info("Local snapshot sender closed, resources released 
[dbNodeSnpDir=" + dbDir + ']');
+            }
+            else {
+                deleteSnapshot(snpLocDir, pdsSettings.folderName());
+
+                U.warn(log, "Local snapshot sender closed due to an error 
occurred", th);
+            }
+        }
+
+        /**
+         * @param from Copy from file.
+         * @param to Copy data to file.
+         * @param length Number of bytes to copy from beginning.
+         * @throws IOException If fails.
+         */
+        private void copy(File from, File to, long length) throws IOException {
+            try (FileIO src = ioFactory.create(from, READ);
+                 FileChannel dest = new FileOutputStream(to).getChannel()) {
+                if (src.size() < length) {
+                    throw new IgniteException("The source file to copy has to 
enough length " +
+                        "[expected=" + length + ", actual=" + src.size() + 
']');
+                }
+
+                src.position(0);
+
+                long written = 0;
+
+                while (written < length)
+                    written += src.transferTo(written, length - written, dest);
+            }
+        }
+    }
+
+    /** Snapshot start request for {@link DistributedProcess} initiate 
message. */
+    private static class SnapshotOperationRequest implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Unique snapshot request id. */
+        private final UUID rqId;
+
+        /** Source node id which trigger request. */
+        private final UUID srcNodeId;
+
+        /** Snapshot name. */
+        private final String snpName;
+
+        @GridToStringInclude
+        /** The list of cache groups to include into snapshot. */
+        private final List<Integer> grpIds;
+
+        @GridToStringInclude
+        /** The list of affected by snapshot operation baseline nodes. */
+        private final Set<UUID> bltNodes;
+
+        /** {@code true} if an execution of local snapshot tasks failed with 
an error. */
+        private volatile boolean hasErr;
+
+        /**
+         * @param snpName Snapshot name.
+         * @param grpIds Cache groups to include into snapshot.
+         */
+        public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String 
snpName, List<Integer> grpIds, Set<UUID> bltNodes) {
+            this.rqId = rqId;
+            this.srcNodeId = srcNodeId;
+            this.snpName = snpName;
+            this.grpIds = grpIds;
+            this.bltNodes = bltNodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SnapshotOperationRequest.class, this);
+        }
+    }
+
    /** Snapshot operation response; carries no payload. */
    private static class SnapshotOperationResponse implements Serializable {
        /** Serial version uid. */
        private static final long serialVersionUID = 0L;
    }
+
+    /** Snapshot operation start message. */
+    private static class SnapshotStartDiscoveryMessage implements 
SnapshotDiscoveryMessage {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Discovery cache. */
+        private final DiscoCache discoCache;
+
+        /** Snapshot request id */
+        private final IgniteUuid id;
+
+        /**
+         * @param discoCache Discovery cache.
+         * @param id Snapshot request id.
+         */
+        public SnapshotStartDiscoveryMessage(DiscoCache discoCache, UUID id) {
+            this.discoCache = discoCache;
+            this.id = new IgniteUuid(id, 0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needExchange() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needAssignPartitions() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable DiscoveryCustomMessage ackMessage() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isMutable() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
+            DiscoCache discoCache) {
+            return this.discoCache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SnapshotStartDiscoveryMessage message = 
(SnapshotStartDiscoveryMessage)o;
+
+            return id.equals(message.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SnapshotStartDiscoveryMessage.class, this);
+        }
+    }
+
+    /** */
+    private static class ClusterSnapshotFuture extends GridFutureAdapter<Void> 
{
+        /** Unique snapshot request id. */
+        private final UUID rqId;
+
+        /** Snapshot name */
 
 Review comment:
   Point

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to