This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6ddb17f IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the cluster (#7770) 6ddb17f is described below commit 6ddb17fb9e346f085d7ef981442e8b62d09ee3fa Author: Maxim Muzafarov <mmu...@apache.org> AuthorDate: Wed May 6 19:52:48 2020 +0300 IGNITE-12981: Fix pme-free snapshot exchange if coordinator left the cluster (#7770) --- .../preloader/GridDhtPartitionsExchangeFuture.java | 41 ++++- .../snapshot/IgniteSnapshotManager.java | 2 +- .../snapshot/IgniteClusterSnapshotSelfTest.java | 186 +++++++++++++++++++++ 3 files changed, 222 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e907c20..a8fc478 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.jetbrains.annotations.Nullable; @@ -800,8 +801,18 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ExchangeType exchange; if (exchCtx.exchangeFreeSwitch()) { - exchange = isSnapshotOperation(firstDiscoEvt) ? onCustomMessageNoAffinityChange() : - onExchangeFreeSwitchNodeLeft(); + if (isSnapshotOperation(firstDiscoEvt)) { + // Keep if the cluster was rebalanced. + if (wasRebalanced()) + markRebalanced(); + + if (!forceAffReassignment) + cctx.affinity().onCustomMessageNoAffinityChange(this, exchActions); + + exchange = cctx.kernalContext().clientNode() ? ExchangeType.NONE : ExchangeType.ALL; + } + else + exchange = onExchangeFreeSwitchNodeLeft(); initCoordinatorCaches(newCrd); } @@ -1445,6 +1456,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void clientOnlyExchange() throws IgniteCheckedException { if (crd != null) { assert !crd.isLocal() : crd; + assert !exchCtx.exchangeFreeSwitch() : this; cctx.exchange().exchangerBlockingSectionBegin(); @@ -2166,6 +2178,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param oldestNode Oldest node. Target node to send message to. */ private void sendPartitions(ClusterNode oldestNode) { + assert !exchCtx.exchangeFreeSwitch() : this; + try { sendLocalPartitions(oldestNode); } @@ -2946,7 +2960,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isDebugEnabled()) log.debug("Single message will be handled on completion of exchange future: " + this); - listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + listen(failureHandlerWrapper(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { if (cctx.kernalContext().isStopping()) return; @@ -2963,8 +2977,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (finishState0 == null) { - assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient()) - || isSnapshotOperation(firstDiscoEvt) : GridDhtPartitionsExchangeFuture.this; + assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient()) : + GridDhtPartitionsExchangeFuture.this; ClusterNode node = cctx.node(nodeId); @@ -2990,7 +3004,22 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte sendAllPartitionsToNode(finishState0, msg, nodeId); } - }); + })); + } + + /** + * @param clsr Closure to wrap with failure handler. + * @return Wrapped closure. + */ + private <T extends IgniteInternalFuture<?>> IgniteInClosure<T> failureHandlerWrapper(IgniteInClosure<T> clsr) { + try { + return (CI1<T>)clsr::apply; + } + catch (Error e) { + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + throw e; + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index ae8203a..99c2de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -1171,7 +1171,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** {@inheritDoc} */ @Override public String toString() { - return S.toString(SnapshotStartDiscoveryMessage.class, this); + return S.toString(SnapshotStartDiscoveryMessage.class, this, super.toString()); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java index 7fe818a..9fa1e58 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java @@ -21,9 +21,15 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.nio.file.OpenOption; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -50,8 +56,10 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; @@ -63,6 +71,7 @@ import org.apache.ignite.internal.util.distributed.DistributedProcess; import org.apache.ignite.internal.util.distributed.FullMessage; import org.apache.ignite.internal.util.distributed.SingleNodeMessage; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -854,6 +863,183 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest { assertSnapshotCacheKeys(snp.cache(ccfg2.getName())); } + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotCoordinatorStopped() throws Exception { + CountDownLatch block = new CountDownLatch(1); + startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE); + startClientGrid(3); + + awaitPartitionMapExchange(); + + for (Ignite grid : Arrays.asList(grid(1), grid(2))) { + ((IgniteEx)grid).context().cache().context().exchange() + .registerExchangeAwareComponent(new PartitionsExchangeAware() { + /** {@inheritDoc} */ + @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) { + try { + block.await(); + } + catch (InterruptedException e) { + fail("Must not catch exception here: " + e.getMessage()); + } + } + }); + } + + for (Ignite grid : G.allGrids()) { + TestRecordingCommunicationSpi.spi(grid) + .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage); + } + + IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME); + + stopGrid(0); + + block.countDown(); + + // There are two exchanges happen: snapshot, node left (with pme-free). + // Both of them are not require for sending messages. + assertFalse("Pme-free switch doesn't expect messaging exchanging between nodes", + GridTestUtils.waitForCondition(() -> { + boolean hasMsgs = false; + + for (Ignite g : G.allGrids()) + hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages(); + + return hasMsgs; + }, 5_000)); + + assertThrowsWithCause((Callable<Object>)fut::get, IgniteException.class); + + List<GridDhtPartitionsExchangeFuture> exchFuts = + grid(1).context().cache().context().exchange().exchangeFutures(); + + assertFalse("Exchanges cannot be empty due to snapshot and node left happened", + exchFuts.isEmpty()); + + for (GridDhtPartitionsExchangeFuture exch : exchFuts) { + assertTrue("Snapshot and node left events must keep `rebalanced` state" + exch, + exch.rebalanced()); + } + } + + /** @throws Exception If fails. */ + @Test + public void testClusterSnapshotOnMovingPartitionsCoordinatorLeft() throws Exception { + startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + + for (Ignite grid : G.allGrids()) { + TestRecordingCommunicationSpi.spi(grid) + .blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage); + } + + Ignite ignite = startGrid(2); + + ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion()); + + TestRecordingCommunicationSpi.spi(grid(0)) + .waitForBlocked(); + + CountDownLatch latch = new CountDownLatch(G.allGrids().size()); + IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(() -> { + try { + U.await(latch); + + stopGrid(0); + } + catch (IgniteInterruptedCheckedException e) { + fail("Must not fail here: " + e.getMessage()); + } + }); + + Queue<T2<GridDhtPartitionExchangeId, Boolean>> exchFuts = new ConcurrentLinkedQueue<>(); + + for (Ignite ig : G.allGrids()) { + ((IgniteEx)ig).context().cache().context().exchange() + .registerExchangeAwareComponent(new PartitionsExchangeAware() { + /** {@inheritDoc} */ + @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) { + try { + exchFuts.add(new T2<>(fut.exchangeId(), fut.rebalanced())); + latch.countDown(); + + stopFut.get(); + } + catch (IgniteCheckedException e) { + U.log(log, "Interrupted on coordinator: " + e.getMessage()); + } + } + }); + } + + IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME); + + stopFut.get(); + + assertThrowsAnyCause(log, + fut::get, + IgniteException.class, + "Snapshot creation has been finished with an error"); + + assertEquals("Snapshot futures expected: " + exchFuts, 3, exchFuts.size()); + + for (T2<GridDhtPartitionExchangeId, Boolean> exch : exchFuts) + assertFalse("Snapshot `rebalanced` must be false with moving partitions: " + exch.get1(), exch.get2()); + } + + /** @throws Exception If fails. */ + @Test + public void testSnapshotPartitionExchangeAwareOrder() throws Exception { + IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE); + + Map<UUID, PartitionsExchangeAware> comps = new HashMap<>(); + + for (Ignite ig : G.allGrids()) { + PartitionsExchangeAware comp; + + ((IgniteEx)ig).context().cache().context().exchange() + .registerExchangeAwareComponent(comp = new PartitionsExchangeAware() { + private final AtomicInteger order = new AtomicInteger(); + + @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) { + assertEquals("Exchange order violated: " + fut.firstEvent(), 0, order.getAndIncrement()); + } + + @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) { + assertEquals("Exchange order violated: " + fut.firstEvent(), 1, order.getAndIncrement()); + } + + @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) { + assertEquals("Exchange order violated: " + fut.firstEvent(), 2, order.getAndIncrement()); + } + + @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) { + assertEquals("Exchange order violated: " + fut.firstEvent(), 3, order.getAndSet(0)); + } + }); + + comps.put(((IgniteEx)ig).localNode().id(), comp); + } + + ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(); + + for (Ignite ig : G.allGrids()) { + ((IgniteEx)ig).context().cache().context().exchange() + .unregisterExchangeAwareComponent(comps.get(((IgniteEx)ig).localNode().id())); + } + + awaitPartitionMapExchange(); + + assertEquals("Some of ignite instances failed during snapshot", 3, G.allGrids().size()); + + stopAllGrids(); + + IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME); + + assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName())); + } + /** * @param ignite Ignite instance. * @param started Latch will be released when delta partition processing starts.