This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 65b18b7 IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028. 65b18b7 is described below commit 65b18b7ba920b069251da2ec2002a530ceee1404 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> AuthorDate: Wed Feb 6 14:51:12 2019 +0300 IGNITE-11204 Fixed exchange merge hang when merged node fails before sending single message - Fixes #6028. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../cache/GridCachePartitionExchangeManager.java | 9 + .../preloader/GridDhtPartitionsExchangeFuture.java | 15 +- .../ExchangeMergeStaleServerNodesTest.java | 188 +++++++++++++++++++++ .../testsuites/IgniteCacheMvccTestSuite6.java | 2 + .../ignite/testsuites/IgniteCacheTestSuite6.java | 2 + 5 files changed, 213 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 71a704c..08762a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2192,6 +2192,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * For testing only. + * + * @return Current version to wait for. + */ + public AffinityTopologyVersion mergeExchangesTestWaitVersion() { + return exchMergeTestWaitVer; + } + + /** * @param curFut Current exchange future. * @param msg Message. * @return {@code True} if node is stopping. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index f4043ab..170d7e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -4390,6 +4390,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { boolean crdChanged = false; boolean allReceived = false; + boolean wasMerged = false; ClusterNode crd0; @@ -4405,8 +4406,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte newCrdFut0.onNodeLeft(node.id()); synchronized (mux) { - if (!srvNodes.remove(node)) - return; + srvNodes.remove(node); boolean rmvd = remaining.remove(node.id()); @@ -4415,6 +4415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (mergedJoinExchMsgs.get(node.id()) == null) { mergedJoinExchMsgs.remove(node.id()); + wasMerged = true; rmvd = true; } } @@ -4513,11 +4514,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (allReceived) { + boolean wasMerged0 = wasMerged; + cctx.kernalContext().getSystemExecutorService().submit(new Runnable() { @Override public void run() { awaitSingleMapUpdates(); - onAllReceived(null); + if (wasMerged0) + finishExchangeOnCoordinator(null); + else + onAllReceived(null); } }); } @@ -4866,14 +4872,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** {@inheritDoc} */ @Override public String toString() { Set<UUID> remaining; + Set<UUID> mergedJoinExch; synchronized (mux) { remaining = new HashSet<>(this.remaining); + mergedJoinExch = mergedJoinExchMsgs == null ? null : new HashSet<>(mergedJoinExchMsgs.keySet()); } return S.toString(GridDhtPartitionsExchangeFuture.class, this, "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(), "remaining", remaining, + "mergedJoinExchMsgs", mergedJoinExch, "super", super.toString()); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java new file mode 100644 index 0000000..0a59f2e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/ExchangeMergeStaleServerNodesTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * + */ +public class ExchangeMergeStaleServerNodesTest extends GridCommonAbstractTest { + /** */ + private Map<String, DelayableCommunicationSpi> commSpis; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CommunicationSpi commSpi = commSpis == null ? null : commSpis.get(igniteInstanceName); + + if (commSpi != null) + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** + * @throws Exception if failed. + */ + @Test + public void testServersFailAfterMerge() throws Exception { + DelayableCommunicationSpi delaySpi1 = new DelayableCommunicationSpi((msg) -> { + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg; + + return singleMsg.exchangeId() != null && singleMsg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(2, 0)); + } + + return false; + }); + + commSpis = F.asMap( + getTestIgniteInstanceName(0), new DelayableCommunicationSpi((msg) -> false), + getTestIgniteInstanceName(1), delaySpi1, + getTestIgniteInstanceName(2), new DelayableCommunicationSpi((msg) -> msg instanceof GridDhtPartitionsSingleMessage), + getTestIgniteInstanceName(3), new DelayableCommunicationSpi((msg) -> false) + ); + + try { + IgniteEx crd = startGrid(0); + + GridCachePartitionExchangeManager<Object, Object> exchMgr = crd.context().cache().context().exchange(); + + exchMgr.mergeExchangesTestWaitVersion(new AffinityTopologyVersion(3, 0), null); + + // Single message for this node is blocked until further notice. + IgniteInternalFuture<IgniteEx> fut = GridTestUtils.runAsync(() -> startGrid(1), "starter1"); + + GridTestUtils.waitForCondition(() -> exchMgr.lastTopologyFuture().exchangeId().topologyVersion() + .equals(new AffinityTopologyVersion(2, 0)), getTestTimeout()); + + IgniteInternalFuture<IgniteEx> futFail = GridTestUtils.runAsync(() -> startGrid(2), "starter2"); + + GridTestUtils.waitForCondition(exchMgr::hasPendingExchange, getTestTimeout()); + + // Unblock message to proceed merging. + delaySpi1.replay(crd.cluster().localNode().id()); + + // Wait for merged exchange. + GridTestUtils.waitForCondition( + () -> exchMgr.mergeExchangesTestWaitVersion() == null, getTestTimeout()); + + futFail.cancel(); + stopGrid(getTestIgniteInstanceName(2), true); + + fut.get(); + + try { + futFail.get(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + // Check that next nodes can successfully join topology. + startGrid(3); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private static class DelayableCommunicationSpi extends TcpCommunicationSpi { + /** */ + private ConcurrentMap<UUID, Collection<Runnable>> delayed = new ConcurrentHashMap<>(); + + /** */ + private IgnitePredicate<Message> delayPred; + + /** + * @param delayPred Delay predicate. + */ + private DelayableCommunicationSpi(IgnitePredicate<Message> delayPred) { + this.delayPred = delayPred; + } + + /** + * @param nodeId Node ID to replay. + */ + private void replay(UUID nodeId) { + Collection<Runnable> old = delayed.replace(nodeId, new ConcurrentLinkedDeque<>()); + + if (old != null) { + for (Runnable task : old) + task.run(); + } + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + final Message msg0 = ((GridIoMessage)msg).message(); + + if (delayPred.apply(msg0)) { + delayed.computeIfAbsent( + node.id(), + (nodeId) -> new ConcurrentLinkedDeque<>() + ).add(new Runnable() { + @Override public void run() { + DelayableCommunicationSpi.super.sendMessage(node, msg, ackC); + } + }); + + log.info("Delayed message: " + msg0); + } + else { + try { + super.sendMessage(node, msg, ackC); + } + catch (Exception e) { + U.log(null, e); + } + } + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java index 707244d..471b437 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest; +import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest; @@ -74,6 +75,7 @@ public class IgniteCacheMvccTestSuite6 { // Other non-tx tests. ignoredTests.add(CacheExchangeMergeTest.class); + ignoredTests.add(ExchangeMergeStaleServerNodesTest.class); ignoredTests.add(IgniteExchangeLatchManagerCoordinatorFailTest.class); ignoredTests.add(PartitionsExchangeCoordinatorFailoverTest.class); ignoredTests.add(CacheParallelStartTest.class); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index 2388cfb..444152d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchange import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest; import org.apache.ignite.internal.processors.cache.distributed.CacheTryLockMultithreadedTest; +import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStaleServerNodesTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest; @@ -79,6 +80,7 @@ public class IgniteCacheTestSuite6 { GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, TxRollbackOnTimeoutNoDeadlockDetectionTest.class, ignoredTests);