alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r752259824



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotFilesRecevier extends 
GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = 
(RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void 
submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new 
GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be 
already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);

Review comment:
       I see no reason to overcomplicate this. The do-while loop provides the 
same guarantees as just `new HashSet<>(queue)` + `active`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,7 +3485,7 @@ else if (task instanceof ForceRebalanceExchangeTask) {
                             if (task instanceof ForceRebalanceExchangeTask)
                                 forcedRebFut = 
((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                            for (CacheGroupContext grp : 
assignsSet.descendingSet()) {
+                        for (CacheGroupContext grp : 
assignsSet.descendingSet()) {

Review comment:
       The indentation here is still not fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable 
Collection<ClusterNode> sndRe
 
                 if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) 
{
                     if (exchActions != null) {
-                        Set<String> caches = 
exchActions.cachesToResetLostPartitions();
+                    Set<String> caches = 
exchActions.cachesToResetLostPartitions();

Review comment:
       The indentation is not fixed. All other changes to this file should be reverted too.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotFilesRecevier extends 
GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = 
(RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void 
submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)

Review comment:
       Let's process the `stopping` flag at the beginning of the method and 
complete all futures in the queue in a loop, to avoid a recursive call and a 
possible stack overflow.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, 
ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission 
result. */
+    private static class RemoteSnapshotFilesRecevier extends 
GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + 
U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote 
request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + 
']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote 
node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + 
reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled 
prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, 
@Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = 
(RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from 
a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements 
TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new 
ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void 
submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new 
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new 
GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new 
ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be 
already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = 
(SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = 
lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to 
the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due 
to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + 
']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a 
snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', 
f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new 
SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message 
with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + 
nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = 
(SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || 
!task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has 
been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + 
respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new 
IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + 
respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node 
fails with an error", e);
+
+                cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       Should we really shut down the node in this case? It looks like this 
failure does not lead to node inconsistency. Why not just send a 
`SnapshotFilesFailureMessage`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to