IGNITE-10815 Fix coordinator failover in case of exchanges merge and non-affinity nodes - Fixes #5746.
Signed-off-by: Pavel Kovalenko <jokse...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28d8acc5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28d8acc5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28d8acc5 Branch: refs/heads/ignite-601 Commit: 28d8acc596f54f95ce8bbf591d046b2b5d24d915 Parents: 4a8921e Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Thu Dec 27 11:32:12 2018 +0300 Committer: Pavel Kovalenko <jokse...@gmail.com> Committed: Thu Dec 27 11:32:12 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 46 +++-- .../GridCachePartitionExchangeManager.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 32 +++- ...rtitionsExchangeCoordinatorFailoverTest.java | 176 ++++++++++++++++++- .../distributed/CacheExchangeMergeTest.java | 8 +- 5 files changed, 236 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 61d88c7..2c4a640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -2003,33 +2003,53 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx + ", total=" + exchFuts.size() + ']'; - final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1); + GridDhtPartitionsExchangeFuture futureToFetchAffinity = null; - assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0 : prev; + for (int i = idx + 1; i < exchFuts.size(); i++) { + GridDhtPartitionsExchangeFuture prev = exchFuts.get(i); + + assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0; + + if (prev.isMerged()) + continue; + + futureToFetchAffinity = prev; + + break; + } + + if (futureToFetchAffinity == null) + throw new IgniteCheckedException("Failed to find completed exchange future to fetch affinity."); if (log.isDebugEnabled()) { log.debug("Need initialize affinity on coordinator [" + "cacheGrp=" + desc.cacheOrGroupName() + - "prevAff=" + prev.topologyVersion() + ']'); + "prevAff=" + futureToFetchAffinity.topologyVersion() + ']'); } - GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - desc.groupId(), - prev.topologyVersion(), - prev.events().discoveryCache()); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture( + cctx, + desc.groupId(), + futureToFetchAffinity.topologyVersion(), + futureToFetchAffinity.events().discoveryCache() + ); fetchFut.init(false); final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>(); + final GridDhtPartitionsExchangeFuture futureToFetchAffinity0 = futureToFetchAffinity; + fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() { @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut) - throws IgniteCheckedException { - fetchAffinity(prev.topologyVersion(), - prev.events(), - prev.events().discoveryCache(), - aff, - (GridDhtAssignmentFetchFuture)fetchFut); + throws IgniteCheckedException { + fetchAffinity( + futureToFetchAffinity0.topologyVersion(), + futureToFetchAffinity0.events(), + futureToFetchAffinity0.events().discoveryCache(), + aff, + (GridDhtAssignmentFetchFuture)fetchFut + ); aff.calculate(topVer, fut.events(), fut.events().discoveryCache()); http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 01c10aa..de1054b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2616,7 +2616,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task; if (resVer.compareTo(fut0.initialVersion()) >= 0) { - fut0.finishMerged(); + fut0.finishMerged(resVer); futQ.remove(fut0); } http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d4d89bc..b53fe99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -51,6 +51,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; @@ -2027,8 +2029,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet cleanup. */ - public void finishMerged() { - super.onDone(null, null); + public void finishMerged(AffinityTopologyVersion resVer) { + synchronized (mux) { + if (state == null) state = ExchangeLocalState.MERGED; + } + + done.set(true); + + super.onDone(resVer, null); + } + + /** + * @return {@code True} if future was merged. + */ + public boolean isMerged() { + synchronized (mux) { + return state == ExchangeLocalState.MERGED; + } } /** @@ -4420,7 +4437,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.kernalContext().closure().callLocal(new Callable<Void>() { @Override public Void call() throws Exception { - newCrdFut.init(GridDhtPartitionsExchangeFuture.this); + try { + newCrdFut.init(GridDhtPartitionsExchangeFuture.this); + } + catch (Throwable t) { + U.error(log, "Failed to initialize new coordinator future [topVer=" + initialVersion() + "]", t); + + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, t)); + + throw t; + } newCrdFut.listen(new CI1<IgniteInternalFuture>() { @Override public void apply(IgniteInternalFuture fut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java index a001bd4..dc42b29 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java @@ -18,20 +18,32 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.CountDownLatch; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; @@ -44,23 +56,38 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setConsistentId(igniteInstanceName); - cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); - - IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName); + cfg.setCommunicationSpi(spiFactory.get()); cfg.setCacheConfiguration( - new CacheConfiguration("cache-" + igniteInstanceName) - .setBackups(1) - .setNodeFilter(nodeFilter) - .setAffinity(new RendezvousAffinityFunction(false, 32)) + new CacheConfiguration(CACHE_NAME) + .setBackups(2) + .setAffinity(new RendezvousAffinityFunction(false, 32)) ); + // Add cache that exists only on coordinator node. + if (igniteInstanceName.equals("crd")) { + IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName); + + cfg.setCacheConfiguration( + new CacheConfiguration(CACHE_NAME + 0) + .setBackups(2) + .setNodeFilter(nodeFilter) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + ); + } + return cfg; } @@ -81,6 +108,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac */ @Test public void testNewCoordinatorCompletedExchange() throws Exception { + spiFactory = TestRecordingCommunicationSpi::new; + IgniteEx crd = (IgniteEx) startGrid("crd"); IgniteEx newCrd = startGrid(1); @@ -150,7 +179,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac // Check that all caches are operable. for (Ignite grid : G.allGrids()) { - IgniteCache cache = grid.cache("cache-" + grid.cluster().localNode().consistentId()); + IgniteCache cache = grid.cache(CACHE_NAME); Assert.assertNotNull(cache); @@ -165,6 +194,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac */ @Test public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception { + spiFactory = TestRecordingCommunicationSpi::new; + IgniteEx crd = startGrid("crd"); IgniteEx newCrd = startGrid(1); @@ -199,6 +230,94 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac } /** + * Test that exchange coordinator initialized correctly in case of exchanges merge and caches without affinity nodes. + * + * @throws Exception If failed. + */ + @Test + public void testCoordinatorChangeAfterExchangesMerge() throws Exception { + // Delay demand messages sending to suspend late affinity assignment. + spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> { + final int delay = 5_000; + + if (msg instanceof GridDhtPartitionDemandMessage) { + GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage) msg; + + if (demandMessage.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME)) + return 0; + + return delay; + } + + return 0; + }); + + final IgniteEx crd = startGrid("crd"); + + startGrid(1); + + for (int k = 0; k < 1024; k++) + crd.cache(CACHE_NAME).put(k, k); + + // Delay sending single messages to ensure exchanges are merged. + spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> { + final int delay = 1_000; + + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg; + + if (singleMsg.exchangeId() != null) + return delay; + } + + return 0; + }); + + // This should trigger exchanges merge. + startGridsMultiThreaded(2, 2); + + // Delay sending single message from new node to have time to shutdown coordinator. + spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> { + final int delay = 5_000; + + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg; + + if (singleMsg.exchangeId() != null) + return delay; + } + + return 0; + }); + + // Trigger next exchange. + IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> startGrid(4)); + + // Wait till other nodes will send their messages to coordinator. + U.sleep(2_500); + + // And then stop coordinator node. + stopGrid("crd", true); + + startNodeFut.get(); + + awaitPartitionMapExchange(); + + // Check that all caches are operable. + for (Ignite grid : G.allGrids()) { + IgniteCache cache = grid.cache(CACHE_NAME); + + Assert.assertNotNull(cache); + + for (int k = 0; k < 1024; k++) + Assert.assertEquals(k, cache.get(k)); + + for (int k = 0; k < 1024; k++) + cache.put(k, k); + } + } + + /** * Blocks sending full message from coordinator to non-coordinator node. * @param from Coordinator node. * @param to Non-coordinator node. @@ -222,4 +341,45 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac return false; }); } + + /** + * Communication SPI that allows to delay sending message by predicate. + */ + class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi { + /** Function that returns delay in milliseconds for given message. */ + private final Function<Message, Integer> delayMessageFunc; + + /** */ + DynamicDelayingCommunicationSpi() { + this(msg -> 0); + } + + /** + * @param delayMessageFunc Function to calculate delay for message. + */ + DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMessageFunc) { + this.delayMessageFunc = delayMessageFunc; + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) + throws IgniteSpiException { + try { + GridIoMessage ioMsg = (GridIoMessage)msg; + + int delay = delayMessageFunc.apply(ioMsg.message()); + + if (delay > 0) { + log.warning(String.format("Delay sending %s to %s", msg, node)); + + U.sleep(delay); + } + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException(e); + } + + super.sendMessage(node, msg, ackC); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/28d8acc5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 942bcd9..8305e66 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -72,6 +72,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -1487,11 +1488,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { for (int i = futs.size() - 1; i >= 0; i--) { GridDhtPartitionsExchangeFuture fut = futs.get(i); - if (fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) { + if (!fut.isMerged() && fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) { AffinityTopologyVersion resVer = fut.topologyVersion(); - if (resVer != null) - doneVers.add(resVer); + Assert.assertNotNull(resVer); + + doneVers.add(resVer); } }