This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ddb17f  IGNITE-12981: Fix pme-free snapshot exchange if coordinator 
left the cluster (#7770)
6ddb17f is described below

commit 6ddb17fb9e346f085d7ef981442e8b62d09ee3fa
Author: Maxim Muzafarov <mmu...@apache.org>
AuthorDate: Wed May 6 19:52:48 2020 +0300

    IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the 
cluster (#7770)
---
 .../preloader/GridDhtPartitionsExchangeFuture.java |  41 ++++-
 .../snapshot/IgniteSnapshotManager.java            |   2 +-
 .../snapshot/IgniteClusterSnapshotSelfTest.java    | 186 +++++++++++++++++++++
 3 files changed, 222 insertions(+), 7 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index e907c20..a8fc478 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -121,6 +121,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.Nullable;
@@ -800,8 +801,18 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             ExchangeType exchange;
 
             if (exchCtx.exchangeFreeSwitch()) {
-                exchange = isSnapshotOperation(firstDiscoEvt) ? 
onCustomMessageNoAffinityChange() :
-                    onExchangeFreeSwitchNodeLeft();
+                if (isSnapshotOperation(firstDiscoEvt)) {
+                    // Keep if the cluster was rebalanced.
+                    if (wasRebalanced())
+                        markRebalanced();
+
+                    if (!forceAffReassignment)
+                        cctx.affinity().onCustomMessageNoAffinityChange(this, 
exchActions);
+
+                    exchange = cctx.kernalContext().clientNode() ? 
ExchangeType.NONE : ExchangeType.ALL;
+                }
+                else
+                    exchange = onExchangeFreeSwitchNodeLeft();
 
                 initCoordinatorCaches(newCrd);
             }
@@ -1445,6 +1456,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     private void clientOnlyExchange() throws IgniteCheckedException {
         if (crd != null) {
             assert !crd.isLocal() : crd;
+            assert !exchCtx.exchangeFreeSwitch() : this;
 
             cctx.exchange().exchangerBlockingSectionBegin();
 
@@ -2166,6 +2178,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * @param oldestNode Oldest node. Target node to send message to.
      */
     private void sendPartitions(ClusterNode oldestNode) {
+        assert !exchCtx.exchangeFreeSwitch() : this;
+
         try {
             sendLocalPartitions(oldestNode);
         }
@@ -2946,7 +2960,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         if (log.isDebugEnabled())
             log.debug("Single message will be handled on completion of 
exchange future: " + this);
 
-        listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+        listen(failureHandlerWrapper(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
             @Override public void 
apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                 if (cctx.kernalContext().isStopping())
                     return;
@@ -2963,8 +2977,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 }
 
                 if (finishState0 == null) {
-                    assert (firstDiscoEvt.type() == EVT_NODE_JOINED && 
firstDiscoEvt.eventNode().isClient())
-                        || isSnapshotOperation(firstDiscoEvt) : 
GridDhtPartitionsExchangeFuture.this;
+                    assert (firstDiscoEvt.type() == EVT_NODE_JOINED && 
firstDiscoEvt.eventNode().isClient()) :
+                        GridDhtPartitionsExchangeFuture.this;
 
                     ClusterNode node = cctx.node(nodeId);
 
@@ -2990,7 +3004,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 sendAllPartitionsToNode(finishState0, msg, nodeId);
             }
-        });
+        }));
+    }
+
+    /**
+     * Wraps a future listener so that an {@link Error} thrown while the listener runs is reported to the
+     * failure processor as {@link FailureType#CRITICAL_ERROR} and then rethrown.
+     *
+     * @param clsr Closure to wrap with failure handler.
+     * @return Wrapped closure.
+     */
+    private <T extends IgniteInternalFuture<?>> IgniteInClosure<T> failureHandlerWrapper(IgniteInClosure<T> clsr) {
+        // The try/catch must live inside the returned closure: the Error is raised when the listener is
+        // invoked on future completion, not when the closure object is created.
+        return (CI1<T>)fut -> {
+            try {
+                clsr.apply(fut);
+            }
+            catch (Error e) {
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+
+                throw e;
+            }
+        };
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index ae8203a..99c2de2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1171,7 +1171,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(SnapshotStartDiscoveryMessage.class, this);
+            return S.toString(SnapshotStartDiscoveryMessage.class, this, 
super.toString());
         }
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index 7fe818a..9fa1e58 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -21,9 +21,15 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.file.OpenOption;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -50,8 +56,10 @@ import 
org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
@@ -63,6 +71,7 @@ import 
org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.distributed.FullMessage;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
@@ -854,6 +863,212 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
         assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
     }
 
+    /**
+     * Checks that a cluster snapshot fails when the coordinator leaves during the snapshot exchange, and that
+     * neither the snapshot exchange nor the following PME-free node-left exchange sends
+     * {@link GridDhtPartitionsSingleMessage}s.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotCoordinatorStopped() throws Exception {
+        CountDownLatch block = new CountDownLatch(1);
+
+        startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+        startClientGrid(3);
+
+        awaitPartitionMapExchange();
+
+        // Block exchange initialization on grid(1) and grid(2) until grid(0) (the coordinator) is stopped.
+        for (Ignite grid : Arrays.asList(grid(1), grid(2))) {
+            ((IgniteEx)grid).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            block.await();
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+
+                            fail("Must not catch exception here: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+        }
+
+        IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        stopGrid(0);
+
+        block.countDown();
+
+        // Two exchanges happen: the snapshot one and the node-left one (PME-free).
+        // Neither of them requires sending messages between nodes.
+        assertFalse("Pme-free switch doesn't expect messaging exchanging between nodes",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, 5_000));
+
+        assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class);
+
+        List<GridDhtPartitionsExchangeFuture> exchFuts =
+            grid(1).context().cache().context().exchange().exchangeFutures();
+
+        assertFalse("Exchanges cannot be empty due to snapshot and node left happened",
+            exchFuts.isEmpty());
+
+        for (GridDhtPartitionsExchangeFuture exch : exchFuts) {
+            assertTrue("Snapshot and node left events must keep `rebalanced` state: " + exch,
+                exch.rebalanced());
+        }
+    }
+
+    /**
+     * Checks that a cluster snapshot fails when the coordinator leaves while rebalancing to a new baseline node
+     * is blocked, and that the exchanges observed on every node are not reported as `rebalanced`.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception {
+        startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // Block supply messages so that rebalancing to the new baseline node cannot complete.
+        for (Ignite grid : G.allGrids()) {
+            TestRecordingCommunicationSpi.spi(grid)
+                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+        }
+
+        Ignite ignite = startGrid(2);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        TestRecordingCommunicationSpi.spi(grid(0))
+            .waitForBlocked();
+
+        // Stop grid(0) only after every node has entered its next exchange (expected to be the snapshot one).
+        CountDownLatch latch = new CountDownLatch(G.allGrids().size());
+
+        IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> {
+            try {
+                U.await(latch);
+
+                stopGrid(0);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                fail("Must not fail here: " + e.getMessage());
+            }
+        });
+
+        Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>();
+
+        for (Ignite ig : G.allGrids()) {
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .registerExchangeAwareComponent(new PartitionsExchangeAware() {
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        try {
+                            exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced()));
+                            latch.countDown();
+
+                            stopFut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.log(log, "Interrupted on coordinator: " + e.getMessage());
+                        }
+                    }
+                });
+        }
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        stopFut.get();
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size());
+
+        for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts)
+            assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2());
+    }
+
+    /**
+     * Checks that {@link PartitionsExchangeAware} callbacks fire in order (init before/after topology lock,
+     * then done before/after topology unlock) on every node during a cluster snapshot, that all nodes
+     * survive it, and that the snapshot can be restored.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotPartitionExchangeAwareOrder() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        Map<UUID, PartitionsExchangeAware> comps = new HashMap<>();
+
+        for (Ignite ig : G.allGrids()) {
+            PartitionsExchangeAware comp;
+
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .registerExchangeAwareComponent(comp = new PartitionsExchangeAware() {
+                    /** Index of the next expected callback within the current exchange. */
+                    private final AtomicInteger order = new AtomicInteger();
+
+                    /** {@inheritDoc} */
+                    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 0, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 1, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 2, order.getAndIncrement());
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+                        assertEquals("Exchange order violated: " + fut.firstEvent(), 3, order.getAndSet(0));
+                    }
+                });
+
+            comps.put(((IgniteEx)ig).localNode().id(), comp);
+        }
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        for (Ignite ig : G.allGrids()) {
+            ((IgniteEx)ig).context().cache().context().exchange()
+                .unregisterExchangeAwareComponent(comps.get(((IgniteEx)ig).localNode().id()));
+        }
+
+        awaitPartitionMapExchange();
+
+        assertEquals("Some of ignite instances failed during snapshot", 3, G.allGrids().size());
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
     /**
      * @param ignite Ignite instance.
      * @param started Latch will be released when delta partition processing starts.

Reply via email to