IGNITE-10815 Fix coordinator failover in case of exchanges merge and 
non-affinity nodes - Fixes #5746.

Signed-off-by: Pavel Kovalenko <jokse...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d8acc5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d8acc5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d8acc5

Branch: refs/heads/ignite-601
Commit: 28d8acc596f54f95ce8bbf591d046b2b5d24d915
Parents: 4a8921e
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Thu Dec 27 11:32:12 2018 +0300
Committer: Pavel Kovalenko <jokse...@gmail.com>
Committed: Thu Dec 27 11:32:12 2018 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  46 +++--
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  32 +++-
 ...rtitionsExchangeCoordinatorFailoverTest.java | 176 ++++++++++++++++++-
 .../distributed/CacheExchangeMergeTest.java     |   8 +-
 5 files changed, 236 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 61d88c7..2c4a640 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -2003,33 +2003,53 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                         assert idx >= 0 && idx < exchFuts.size() - 1 : 
"Invalid exchange futures state [cur=" + idx +
                             ", total=" + exchFuts.size() + ']';
 
-                        final GridDhtPartitionsExchangeFuture prev = 
exchFuts.get(idx + 1);
+                        GridDhtPartitionsExchangeFuture futureToFetchAffinity 
= null;
 
-                        assert prev.isDone() && 
prev.topologyVersion().compareTo(topVer) < 0 : prev;
+                        for (int i = idx + 1; i < exchFuts.size(); i++) {
+                            GridDhtPartitionsExchangeFuture prev = 
exchFuts.get(i);
+
+                            assert prev.isDone() && 
prev.topologyVersion().compareTo(topVer) < 0;
+
+                            if (prev.isMerged())
+                                continue;
+
+                            futureToFetchAffinity = prev;
+
+                            break;
+                        }
+
+                        if (futureToFetchAffinity == null)
+                            throw new IgniteCheckedException("Failed to find 
completed exchange future to fetch affinity.");
 
                         if (log.isDebugEnabled()) {
                             log.debug("Need initialize affinity on coordinator 
[" +
                                 "cacheGrp=" + desc.cacheOrGroupName() +
-                                "prevAff=" + prev.topologyVersion() + ']');
+                                "prevAff=" + 
futureToFetchAffinity.topologyVersion() + ']');
                         }
 
-                        GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(cctx,
-                            desc.groupId(),
-                            prev.topologyVersion(),
-                            prev.events().discoveryCache());
+                        GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(
+                                cctx,
+                                desc.groupId(),
+                                futureToFetchAffinity.topologyVersion(),
+                                futureToFetchAffinity.events().discoveryCache()
+                        );
 
                         fetchFut.init(false);
 
                         final GridFutureAdapter<AffinityTopologyVersion> 
affFut = new GridFutureAdapter<>();
 
+                        final GridDhtPartitionsExchangeFuture 
futureToFetchAffinity0 = futureToFetchAffinity;
+
                         fetchFut.listen(new 
IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
                             @Override public void 
applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
-                                throws IgniteCheckedException {
-                                fetchAffinity(prev.topologyVersion(),
-                                    prev.events(),
-                                    prev.events().discoveryCache(),
-                                    aff,
-                                    (GridDhtAssignmentFetchFuture)fetchFut);
+                                    throws IgniteCheckedException {
+                                fetchAffinity(
+                                        
futureToFetchAffinity0.topologyVersion(),
+                                        futureToFetchAffinity0.events(),
+                                        
futureToFetchAffinity0.events().discoveryCache(),
+                                        aff,
+                                        (GridDhtAssignmentFetchFuture)fetchFut
+                                );
 
                                 aff.calculate(topVer, fut.events(), 
fut.events().discoveryCache());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 01c10aa..de1054b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2616,7 +2616,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         GridDhtPartitionsExchangeFuture fut0 = 
(GridDhtPartitionsExchangeFuture)task;
 
                         if (resVer.compareTo(fut0.initialVersion()) >= 0) {
-                            fut0.finishMerged();
+                            fut0.finishMerged(resVer);
 
                             futQ.remove(fut0);
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d4d89bc..b53fe99 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -51,6 +51,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -2027,8 +2029,23 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /**
      * Finish merged future to allow 
GridCachePartitionExchangeManager.ExchangeFutureSet cleanup.
      */
-    public void finishMerged() {
-        super.onDone(null, null);
+    public void finishMerged(AffinityTopologyVersion resVer) {
+        synchronized (mux) {
+            if (state == null) state = ExchangeLocalState.MERGED;
+        }
+
+        done.set(true);
+
+        super.onDone(resVer, null);
+    }
+
+    /**
+     * @return {@code True} if the local exchange state of this future is {@code MERGED} (the state is read under {@code mux}).
+     */
+    public boolean isMerged() {
+        synchronized (mux) {
+            return state == ExchangeLocalState.MERGED;
+        }
     }
 
     /**
@@ -4420,7 +4437,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
                                 cctx.kernalContext().closure().callLocal(new 
Callable<Void>() {
                                     @Override public Void call() throws 
Exception {
-                                        
newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
+                                        try {
+                                            
newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
+                                        }
+                                        catch (Throwable t) {
+                                            U.error(log, "Failed to initialize 
new coordinator future [topVer=" + initialVersion() + "]", t);
+
+                                            
cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, t));
+
+                                            throw t;
+                                        }
 
                                         newCrdFut.listen(new 
CI1<IgniteInternalFuture>() {
                                             @Override public void 
apply(IgniteInternalFuture fut) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index a001bd4..dc42b29 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -18,20 +18,32 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
@@ -44,23 +56,38 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class PartitionsExchangeCoordinatorFailoverTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
-        IgnitePredicate<ClusterNode> nodeFilter = node -> 
node.consistentId().equals(igniteInstanceName);
+        cfg.setCommunicationSpi(spiFactory.get());
 
         cfg.setCacheConfiguration(
-            new CacheConfiguration("cache-" + igniteInstanceName)
-                .setBackups(1)
-                .setNodeFilter(nodeFilter)
-                .setAffinity(new RendezvousAffinityFunction(false, 32))
+                new CacheConfiguration(CACHE_NAME)
+                    .setBackups(2)
+                    .setAffinity(new RendezvousAffinityFunction(false, 32))
         );
 
+        // Add cache that exists only on coordinator node.
+        if (igniteInstanceName.equals("crd")) {
+            IgnitePredicate<ClusterNode> nodeFilter = node -> 
node.consistentId().equals(igniteInstanceName);
+
+            cfg.setCacheConfiguration(
+                    new CacheConfiguration(CACHE_NAME + 0)
+                            .setBackups(2)
+                            .setNodeFilter(nodeFilter)
+                            .setAffinity(new RendezvousAffinityFunction(false, 
32))
+            );
+        }
+
         return cfg;
     }
 
@@ -81,6 +108,8 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
      */
     @Test
     public void testNewCoordinatorCompletedExchange() throws Exception {
+        spiFactory = TestRecordingCommunicationSpi::new;
+
         IgniteEx crd = (IgniteEx) startGrid("crd");
 
         IgniteEx newCrd = startGrid(1);
@@ -150,7 +179,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
 
         // Check that all caches are operable.
         for (Ignite grid : G.allGrids()) {
-            IgniteCache cache = grid.cache("cache-" + 
grid.cluster().localNode().consistentId());
+            IgniteCache cache = grid.cache(CACHE_NAME);
 
             Assert.assertNotNull(cache);
 
@@ -165,6 +194,8 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
      */
     @Test
     public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws 
Exception {
+        spiFactory = TestRecordingCommunicationSpi::new;
+
         IgniteEx crd = startGrid("crd");
 
         IgniteEx newCrd = startGrid(1);
@@ -199,6 +230,94 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     }
 
     /**
+     * Tests that the exchange coordinator is initialized correctly in case of 
an exchanges merge and caches without affinity nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCoordinatorChangeAfterExchangesMerge() throws Exception {
+        // Delay sending of demand messages (except for the utility cache) to suspend late affinity assignment.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMessage = 
(GridDhtPartitionDemandMessage) msg;
+
+                if (demandMessage.groupId() == 
GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+                    return 0;
+
+                return delay;
+            }
+
+            return 0;
+        });
+
+        final IgniteEx crd = startGrid("crd");
+
+        startGrid(1);
+
+        for (int k = 0; k < 1024; k++)
+            crd.cache(CACHE_NAME).put(k, k);
+
+        // Delay sending of exchange single messages by 1 second to ensure that the exchanges are merged.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 1_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null)
+                    return delay;
+            }
+
+            return 0;
+        });
+
+        // This should trigger exchanges merge.
+        startGridsMultiThreaded(2, 2);
+
+        // Delay sending the single message from the new node to have time to shut down the 
coordinator.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null)
+                    return delay;
+            }
+
+            return 0;
+        });
+
+        // Trigger the next exchange by starting one more node asynchronously.
+        IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> 
startGrid(4));
+
+        // Wait until the other nodes have sent their messages to the coordinator.
+        U.sleep(2_500);
+
+        // And then stop the coordinator node.
+        stopGrid("crd", true);
+
+        startNodeFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Check that the cache is operable on every remaining node: read back the initial values, then overwrite them.
+        for (Ignite grid : G.allGrids()) {
+            IgniteCache cache = grid.cache(CACHE_NAME);
+
+            Assert.assertNotNull(cache);
+
+            for (int k = 0; k < 1024; k++)
+                Assert.assertEquals(k, cache.get(k));
+
+            for (int k = 0; k < 1024; k++)
+                cache.put(k, k);
+        }
+    }
+
+    /**
      * Blocks sending full message from coordinator to non-coordinator node.
      * @param from Coordinator node.
      * @param to Non-coordinator node.
@@ -222,4 +341,45 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             return false;
         });
     }
+
+    /**
+     * Communication SPI that delays sending of a message by the number of milliseconds returned by a given function.
+     */
+    class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+        /** Function that returns the delay in milliseconds for a given message; non-positive values mean no delay. */
+        private final Function<Message, Integer> delayMessageFunc;
+
+        /** Creates an SPI that never delays messages. */
+        DynamicDelayingCommunicationSpi() {
+            this(msg -> 0);
+        }
+
+        /**
+         * @param delayMessageFunc Function that calculates the delay in milliseconds for a message.
+         */
+        DynamicDelayingCommunicationSpi(final Function<Message, Integer> 
delayMessageFunc) {
+            this.delayMessageFunc = delayMessageFunc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
+                throws IgniteSpiException {
+            try {
+                GridIoMessage ioMsg = (GridIoMessage)msg;
+
+                int delay = delayMessageFunc.apply(ioMsg.message());
+
+                if (delay > 0) {
+                    log.warning(String.format("Delay sending %s to %s", msg, 
node));
+
+                    U.sleep(delay);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException(e);
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 942bcd9..8305e66 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -72,6 +72,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -1487,11 +1488,12 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         for (int i = futs.size() - 1; i >= 0; i--) {
             GridDhtPartitionsExchangeFuture fut = futs.get(i);
 
-            if (fut.exchangeDone() && fut.firstEvent().type() != 
EVT_DISCOVERY_CUSTOM_EVT) {
+            if (!fut.isMerged() && fut.exchangeDone() && 
fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) {
                 AffinityTopologyVersion resVer = fut.topologyVersion();
 
-                if (resVer != null)
-                    doneVers.add(resVer);
+                Assert.assertNotNull(resVer);
+
+                doneVers.add(resVer);
             }
         }
 

Reply via email to