alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r744705988



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,7 +3485,7 @@ else if (task instanceof ForceRebalanceExchangeTask) {
                             if (task instanceof ForceRebalanceExchangeTask)
                                 forcedRebFut = 
((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                            for (CacheGroupContext grp : 
assignsSet.descendingSet()) {
+                        for (CacheGroupContext grp : 
assignsSet.descendingSet()) {

Review comment:
       Wrong indent

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -96,9 +115,12 @@
     private final GridKernalContext ctx;
 
     /** Cache group restore prepare phase. */
-    private final DistributedProcess<SnapshotOperationRequest, 
ArrayList<StoredCacheData>> prepareRestoreProc;
+    private final DistributedProcess<SnapshotOperationRequest, 
SnapshotRestoreOperationResponse> prepareRestoreProc;
 
-    /** Cache group restore cache start phase. */
+    /**Cache group restore cache start phase. */

Review comment:
       Missing space after `/**`.
   Also, this looks like the javadoc for `cacheStartProc`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+    /** Shared context. */
+    protected final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Node id which cause snapshot operation. */
+    protected final UUID srcNodeId;
+
+    /** Unique identifier of snapshot process. */
+    protected final String snpName;
+
+    /** Snapshot working directory on file system. */
+    protected final File tmpSnpWorkDir;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    protected final FileIOFactory ioFactory;
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    protected final SnapshotSender snpSndr;
+
+    /** Partition to be processed. */
+    protected final Map<Integer, Set<Integer>> parts;
+
+    /** An exception which has been occurred during snapshot processing. */
+    protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which cause snapshot task creation.
+     * @param snpName Unique identifier of snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory to working with snapshot files.
+     * @param snpSndr Factory which produces snapshot receiver instance.
+     * @param parts Partition to be processed.
+     */
+    protected AbstractSnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution 
tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not 
null.";
+
+        this.cctx = cctx;
+        this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+        this.srcNodeId = srcNodeId;
+        this.snpName = snpName;
+        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+        this.ioFactory = ioFactory;
+        this.snpSndr = snpSndr;
+        this.parts = parts;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Node id which triggers this operation.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * // TODO started and start methods can be merged.

Review comment:
       The TODO should include a link to the ticket.
   The method name is strange (taking into account the return value), and it is
strange to have it in the parent class.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> 
implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> 
value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.run();

Review comment:
       Runnable is not a good interface here, since someone might think that
the future completes when the `run()` method completes. Actually, the `run()`
method just initializes the task.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, 
ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : 
res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
+                    
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new 
ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = 
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions 
or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? 
Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot restore process has incorrect restore state: " 
+ reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + 
ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> 
null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache 
groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + 
']');
+            }
+
+            CompletableFuture<Void> metaFut = 
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = 
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = 
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, 
opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update 
operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : 
CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), 
opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = 
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new 
HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, 
PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new 
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = 
findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> 
p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : 
Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new 
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), 
dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = 
ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same 
cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" 
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, 
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new 
HashSet<>())
+                                .add(idxFut = new 
PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per 
each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = 
snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new 
PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> 
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + 
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + 
']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : 
snpAff.entrySet()) {
+                    if (m.getKey().equals(ctx.localNodeId()))
+                        continue;
+
+                    ctx.cache().context().snapshotMgr()
+                        .requestRemoteSnapshot(m.getKey(),
+                            opCtx0.snpName,
+                            m.getValue(),
+                            opCtx0.stopChecker,
+                            (snpFile, t) -> {
+                                if (opCtx0.stopChecker.getAsBoolean())
+                                    throw new 
TransmissionCancelledException("Snapshot remote operation request cancelled.");

Review comment:
       This type of exception looks strange here. The `preload` method knows
nothing about the internals of `requestRemoteSnapshot`, but it looks like this
exception is required by some workflow in `requestRemoteSnapshot`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, 
Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new 
ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete 
restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends 
IgniteCheckedException {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** @param cause Error. */
+        public RestoreCacheStartException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements 
Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private ArrayList<StoredCacheData> ccfgs;

Review comment:
       `ArrayList` -> `List`, final

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -315,28 +327,44 @@ protected void cleanup() throws IgniteCheckedException {
     }
 
     /**
-     * Check if the cache or group with the specified name is currently being 
restored from the snapshot.
-     *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param opCtx Restoring context.
+     * @return The request id of restoring snapshot operation.
+     */
+    private @Nullable UUID restoringId(@Nullable SnapshotRestoreContext opCtx) 
{

Review comment:
       Not used

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1440,13 +1504,46 @@ public static boolean 
isSnapshotOperation(DiscoveryEvent evt) {
         }
     }
 
+    /**
+     * @param parts Collection of pairs group and appropriate cache partition 
to be snapshot.
+     * @param rmtNodeId The remote node to connect to.
+     * @param partHnd Received partition handler.
+     */
+    public IgniteInternalFuture<Void> requestRemoteSnapshot(

Review comment:
       I don't like the naming. This method does not request to make a remote
snapshot; it requests remote snapshot files (already existing).
RemoteSnapshotHandler does not handle remote snapshots; it receives remote
snapshot files. SnapshotRequestMessage does not request to make a snapshot; it
requests already existing files. SnapshotResponseMessage is sent only in case of
an error when sending files, so it's better to have this information in the name
(SnapshotFilesFailureResponse, for example). Etc.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable 
Collection<ClusterNode> sndRe
 
                 if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) 
{
                     if (exchActions != null) {
-                        Set<String> caches = 
exchActions.cachesToResetLostPartitions();
+                    Set<String> caches = 
exchActions.cachesToResetLostPartitions();

Review comment:
       Wrong indent. Nothing changed in this class except indents.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessorBinarizable.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Not related to this fix

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new 
SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,

Review comment:
       The method returns `SnapshotFinishedFutureTask` in some cases, but the
result is force-cast to `SnapshotFutureTask`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new 
SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
+            ioFactory, snpSndr, parts, withMetaStorage, locBuff));
+    }
+
+    /**
+     * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on 
checkpoint to run.
+     */
+    private AbstractSnapshotFutureTask<?> registerSnapshotTask(String rqId, 
AbstractSnapshotFutureTask<?> task) {
         if (!busyLock.enterBusy()) {
-            return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping 
[locNodeId=" + cctx.localNodeId() + ']'));
+            return new SnapshotFinishedFutureTask(new 
IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+                cctx.localNodeId() + ']'));
         }
 
         try {
-            if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new 
IgniteCheckedException("Snapshot with requested name is already scheduled: " + 
snpName));
-
-            SnapshotFutureTask snpFutTask;
+            if (locSnpTasks.containsKey(rqId)) {
+                return new SnapshotFinishedFutureTask(new 
IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
+            }
 
-            SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
-                snpFutTask = new SnapshotFutureTask(cctx,
-                    srcNodeId,
-                    snpName,
-                    tmpWorkDir,
-                    ioFactory,
-                    snpSndr,
-                    parts,
-                    withMetaStorage,
-                    locBuff));
+            AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, 
task);
 
             if (prev != null)
-                return new SnapshotFutureTask(new 
IgniteCheckedException("Snapshot with requested name is already scheduled: " + 
snpName));
-
-            if (!withMetaStorage) {

Review comment:
       Why was this check removed?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> 
implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> 
value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));

Review comment:
       The race is still here: when a task is done, it first polls the queue 
(getting a null result) and then passes that result to the synchronized submit 
method. At the same time, another thread can already be inside the synchronized 
submit method, before it offers the next task to the queue.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, 
ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : 
res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
+                    
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new 
ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = 
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions 
or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? 
Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot restore process has incorrect restore state: " 
+ reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + 
ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> 
null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache 
groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + 
']');
+            }
+
+            CompletableFuture<Void> metaFut = 
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = 
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = 
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, 
opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update 
operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : 
CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), 
opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = 
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new 
HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, 
PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new 
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = 
findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> 
p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : 
Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new 
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), 
dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = 
ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same 
cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" 
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, 
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new 
HashSet<>())
+                                .add(idxFut = new 
PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per 
each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = 
snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new 
PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> 
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + 
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + 
']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : 
snpAff.entrySet()) {
+                    if (m.getKey().equals(ctx.localNodeId()))
+                        continue;

Review comment:
       Is this possible? It looks like only remote nodes should be in `snpAff`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, 
ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : 
res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), 
cacheData);
+                    
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new 
ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = 
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions 
or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? 
Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new 
IgniteCheckedException("Snapshot restore process has incorrect restore state: " 
+ reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + 
ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> 
null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache 
groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + 
']');
+            }
+
+            CompletableFuture<Void> metaFut = 
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = 
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = 
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, 
opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update 
operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : 
CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), 
opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = 
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new 
HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, 
PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new 
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = 
findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> 
p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : 
Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new 
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), 
dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = 
ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same 
cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" 
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, 
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new 
HashSet<>())
+                                .add(idxFut = new 
PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per 
each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = 
snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new 
PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> 
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + 
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + 
']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : 
snpAff.entrySet()) {

Review comment:
       As far as I understand, `snpAff` contains the remote partitions required 
by the current node. But there can be duplicates here (two nodes can contain the 
same partitions), and it looks like if two nodes contain the same partitions, we 
request them from both nodes.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -990,15 +1369,43 @@ private void finishRollback(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Except
         /** Snapshot name. */
         private final String snpName;
 
-        /** Baseline node IDs that must be alive to complete the operation. */
-        private final Set<UUID> nodes;
+        /** Baseline discovery cache for node IDs that must be alive to 
complete the operation.*/
+        private final DiscoCache discoCache;
 
-        /** List of restored cache group directories. */
-        private final Collection<File> dirs;
+        /** Operational node id. */
+        private final UUID opNodeId;
+
+        /**
+         * Set of restored cache groups path on local node. Collected when all 
cache configurations received
+         * from the <tt>prepare</tt> distributed process.
+         */
+        private final Set<File> dirs = new HashSet<>();
 
         /** The exception that led to the interruption of the process. */
         private final AtomicReference<Throwable> err = new AtomicReference<>();
 
+        /** Distribution of snapshot metadata files across the cluster. */
+        private final Map<UUID, ArrayList<SnapshotMetadata>> metasPerNode = 
new HashMap<>();

Review comment:
       `ArrayList` -> `List`?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1118,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, ArrayList<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : 
metas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new 
ArrayList<>())) {

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> 
implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> 
value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.run();
+            }
+            else if (next != null)
+                queue.offer(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotHandler> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new 
GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotHandler> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be 
already completed.
+         */
+        private Set<RemoteSnapshotHandler> activeTasks() {
+            Set<RemoteSnapshotHandler> futs = new HashSet<>();
+
+            RemoteSnapshotHandler curr = active;
+            RemoteSnapshotHandler changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotRequestMessage) {
+                    SnapshotRequestMessage reqMsg0 = 
(SnapshotRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = 
lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to 
the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due 
to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + 
']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = 
registerSnapshotTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a 
snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', 
f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new 
SnapshotResponseMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message 
with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + 
nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotResponseMessage) {
+                    SnapshotResponseMessage respMsg0 = 
(SnapshotResponseMessage)msg;
+
+                    RemoteSnapshotHandler task = active;
+
+                    if (task == null || 
!task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has 
been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + 
respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new 
IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + 
respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node 
fails with an error", e);
+
+                cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEnd(UUID nodeId) {
+            RemoteSnapshotHandler task = active;
+
+            if (task == null)
+                return;
+
+            assert task.partsLeft.get() == 0 : task;
+            assert task.rmtNodeId.equals(nodeId);
+
+            log.info("Requested snapshot from remote node has been fully 
received " +

Review comment:
       Missing `log.isInfoEnabled()` check around this `log.info` call.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -907,8 +1190,9 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, 
Set<UUID> respNodes) {
 
         synchronized (this) {
             opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+        }
 
-            try {
+        try {
                 
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {

Review comment:
       Wrong indent

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, 
Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new 
ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete 
restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends 
IgniteCheckedException {

Review comment:
       `RestoreCacheStartException` is not used.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -980,6 +1263,102 @@ private void finishRollback(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Except
         finishProcess(reqId, opCtx0.err.get());
     }
 
+    /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param srcDir Snapshot directory to copy from.
+     * @param targetDir Destination directory to copy to.
+     */
+    private static void copyLocalAsync(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File srcDir,
+        File targetDir,
+        PartitionRestoreFuture partFut
+    ) {
+        File snpFile = new File(srcDir, 
FilePageStoreManager.getPartitionFileName(partFut.partId));
+        Path partFile = Paths.get(targetDir.getAbsolutePath(), 
FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+        CompletableFuture.supplyAsync(() -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has 
been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been 
interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't 
exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + 
snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, 
partFile.toFile(), snpFile.length());
+
+                return partFile;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+            .whenComplete((r, t) -> {
+                if (t == null)
+                    partFut.complete(partFile);
+                else
+                    partFut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param affCache Affinity cache.
+     * @param node Cluster node to get assigned partitions.
+     * @return The set of partitions assigned to the given node.
+     */
+    private static <T> Set<T> nodeAffinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory
+    ) {
+        return IntStream.range(0, affCache.partitions())
+            .filter(p -> 
affCache.idealAssignment().assignment().get(p).contains(node))
+            .mapToObj(factory)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param col Collection of sets to complete.
+     * @param ex Exception to set.
+     */
+    private static void 
completeListExceptionally(Collection<Set<PartitionRestoreFuture>> col, 
Throwable ex) {
+        col.stream()
+            .flatMap(Collection::stream)
+            .forEach(f -> f.completeExceptionally(ex));
+    }
+
+    /**
+     * @param cacheStartFut The cache started future to wrap exception if need.
+     * @param <T> Result future type.
+     * @return Future which completes with wrapped exception if it occurred.
+     */
+    private static <T> IgniteInternalFuture<T> 
chainCacheStartException(IgniteInternalFuture<T> cacheStartFut) {

Review comment:
       `chainCacheStartException` is not used.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, 
Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, 
Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new 
ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete 
restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends 
IgniteCheckedException {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** @param cause Error. */
+        public RestoreCacheStartException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements 
Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private ArrayList<StoredCacheData> ccfgs;
+
+        /** Snapshot metadata files on local node. */
+        private ArrayList<SnapshotMetadata> metas;

Review comment:
       `ArrayList` -> `List`, final




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to