ignite-5682 Added stale version check for GridDhtPartitionsFullMessage messages that are not related to an exchange.
(cherry picked from commit eb9d06d) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316312d2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316312d2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316312d2 Branch: refs/heads/ignite-6181-1 Commit: 316312d2ae9015228e67f959e492b2c5c4a9366d Parents: 5bda409 Author: Dmitry Pavlov <dpavlov....@gmail.com> Authored: Thu Jul 27 14:51:25 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Aug 25 16:53:41 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 5 +- .../processors/cache/GridCacheIoManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 42 +++++++++----- .../dht/GridClientPartitionTopology.java | 12 +++- .../dht/GridDhtPartitionTopology.java | 5 +- .../dht/GridDhtPartitionTopologyImpl.java | 12 +++- .../preloader/GridDhtPartitionExchangeId.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 36 ++++++++---- .../preloader/GridDhtPartitionsFullMessage.java | 4 +- ...cingDelayedPartitionMapExchangeSelfTest.java | 58 ++++++++++++++++---- .../junits/common/GridCommonAbstractTest.java | 6 +- 11 files changed, 136 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 0e03a29..ffb55e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -460,7 +460,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.topology().update(grpHolder.affinity().lastVersion(), clientTop.partitionMap(true), clientTop.fullUpdateCounters(), - Collections.<Integer>emptySet()); + Collections.<Integer>emptySet(), + null); } assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion()); @@ -518,7 +519,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.topology().updateTopologyVersion(topFut, discoCache, -1, false); - grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet()); + grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null); topFut.validate(grp, discoCache.allNodes()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 45edc53..6529795 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -551,7 +551,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * @param nodeId Node ID. + * @param nodeId Sender Node ID. * @param cacheMsg Cache message. * @param c Handler closure. * @param plc Message policy. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index aad5b35..bd34a5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -140,7 +140,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.4"); - /** Atomic reference for pending timeout object. */ + /** Atomic reference for pending partition resend timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); /** Partition resend timeout after eviction. */ @@ -161,7 +161,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final ConcurrentMap<Integer, GridClientPartitionTopology> clientTops = new ConcurrentHashMap8<>(); /** */ - private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; + @Nullable private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; /** */ private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new AtomicReference<>(); @@ -921,6 +921,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * Partition refresh callback. 
+ * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, + * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send */ private void refreshPartitions() { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); @@ -958,7 +960,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); - sendAllPartitions(rmts); + sendAllPartitions(rmts, rmtTopVer); } else { if (log.isDebugEnabled()) @@ -971,10 +973,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param nodes Nodes. + * @param msgTopVer Topology version. Will be added to full message. */ - private void sendAllPartitions(Collection<ClusterNode> nodes) { + private void sendAllPartitions(Collection<ClusterNode> nodes, + AffinityTopologyVersion msgTopVer) { GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null); + m.topologyVersion(msgTopVer); + if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); @@ -1126,8 +1132,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. - * @param id ID. + * @param node Destination cluster node. + * @param id Exchange ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, @@ -1153,7 +1159,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param exchangeId ID. + * @param exchangeId Exchange ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}. 
@@ -1372,7 +1378,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. + * @param node Sender cluster node. * @param msg Message. */ private void processFullPartitionUpdate(ClusterNode node, GridDhtPartitionsFullMessage msg) { @@ -1398,8 +1404,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else if (!grp.isLocal()) top = grp.topology(); - if (top != null) - updated |= top.update(null, entry.getValue(), null, msg.partsToReload(cctx.localNodeId(), grpId)); + if (top != null) { + updated |= top.update(null, + entry.getValue(), + null, + msg.partsToReload(cctx.localNodeId(), grpId), + msg.topologyVersion()); + } } if (!cctx.kernalContext().clientNode() && updated) @@ -1427,7 +1438,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node ID. + * @param node Sender cluster node. * @param msg Message. */ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { @@ -1477,7 +1488,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node ID. + * @param node Sender cluster node. * @param msg Message. */ private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) { @@ -2595,7 +2606,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** */ private static final long serialVersionUID = 0L; - /** {@inheritDoc} */ + /** + * @param nodeId Sender node ID. + * @param msg Message. + */ @Override public void apply(UUID nodeId, M msg) { ClusterNode node = cctx.node(nodeId); @@ -2613,7 +2627,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param node Node. + * @param node Sender cluster node. * @param msg Message. 
*/ protected abstract void onMessage(ClusterNode node, M msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 10d08b9..c8856fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -656,8 +656,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set<Integer> partsToReload - ) { + Set<Integer> partsToReload, + @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -672,6 +672,14 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { return false; } + if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale topology version for full partition map update message (will ignore) " + + "[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer + ']'); + + return false; + } + boolean fullMapUpdated = (node2part == null); if (node2part != null) { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f48bd352..4ae68ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -263,13 +263,16 @@ public interface GridDhtPartitionTopology { * @param partMap Update partition map. * @param cntrMap Partition update counters. * @param partsToReload Set of partitions that need to be reloaded. + * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not + * related to exchange. Value should be not less than previous 'Topology version from exchange'. * @return {@code True} if local state was changed. */ public boolean update( @Nullable AffinityTopologyVersion exchangeResVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap cntrMap, - Set<Integer> partsToReload); + Set<Integer> partsToReload, + @Nullable AffinityTopologyVersion msgTopVer); /** * @param exchId Exchange ID. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6ab6bd8..f25ae21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1176,7 +1176,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Nullable AffinityTopologyVersion exchangeVer, GridDhtPartitionFullMap partMap, @Nullable CachePartitionFullCountersMap incomeCntrMap, - Set<Integer> partsToReload) { + Set<Integer> partsToReload, + @Nullable AffinityTopologyVersion msgTopVer) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']'); @@ -1217,6 +1218,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { + U.warn(log, "Stale version for full partition map update message (will ignore) [" + + "lastTopChange=" + lastTopChangeVer + + ", readTopVer=" + readyTopVer + + ", msgVer=" + msgTopVer + ']'); + + return false; + } + boolean fullMapUpdated = (node2part == null); if (node2part != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index 07daeda..33728d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -51,7 +51,7 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa @GridToStringExclude private UUID nodeId; - /** Event. */ + /** Event type. */ @GridToStringExclude private int evt; http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5755bc7..6decb44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -137,7 +137,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private final Set<UUID> remaining = new HashSet<>(); - /** */ + /** Guarded by this */ @GridToStringExclude private int pendingSingleUpdates; @@ -164,7 +164,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte @GridToStringExclude private final CountDownLatch evtLatch = new CountDownLatch(1); - /** */ + /** Exchange future init method completes this future. */ private GridFutureAdapter<Boolean> initFut; /** */ @@ -213,7 +213,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Init timestamp. Used to track the amount of time spent to complete the future. */ private long initTs; - /** */ + /** + * Centralized affinity assignment required. Activated for node left of failed. For this mode crd will send full + * partitions maps to nodes using discovery (ring) instead of communication. + */ private boolean centralizedAff; /** Change global state exception. */ @@ -765,7 +768,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.update(null, clientTop.partitionMap(true), clientTop.fullUpdateCounters(), - Collections.<Integer>emptySet()); + Collections.<Integer>emptySet(), + null); } } @@ -1204,7 +1208,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param node Node. + * @param node Target Node. * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { @@ -1351,7 +1355,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param oldestNode Oldest node. + * @param oldestNode Oldest node. Target node to send message to. */ private void sendPartitions(ClusterNode oldestNode) { try { @@ -1720,6 +1724,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Processing of received single message. Actual processing in future may be delayed if init method was not + * completed, see {@link #initDone()} + * * @param node Sender node. * @param msg Single partition info. 
*/ @@ -1823,8 +1830,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Note this method performs heavy updatePartitionSingleMap operation, this operation is moved out from the + * synchronized block. Only count of such updates {@link #pendingSingleUpdates} is managed under critical section. + * * @param nodeId Sender node. - * @param msg Message. + * @param msg Partition single message. */ private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { if (msg.client()) { @@ -1833,7 +1843,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - boolean allReceived = false; + boolean allReceived = false; // Received all expected messages. boolean updateSingleMap = false; FinishState finishState0 = null; @@ -2509,7 +2519,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param node Sender node. - * @param msg Message. + * @param msg Message with full partition info. */ public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) { assert !cctx.kernalContext().clientNode() || msg.restoreState(); @@ -2803,7 +2813,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grp.topology().update(resTopVer, entry.getValue(), cntrMap, - msg.partsToReload(cctx.localNodeId(), grpId)); + msg.partsToReload(cctx.localNodeId(), grpId), + null); } else { ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); @@ -2817,7 +2828,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte top.update(resTopVer, entry.getValue(), cntrMap, - Collections.<Integer>emptySet()); + Collections.<Integer>emptySet(), + null); } } } @@ -2912,7 +2924,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * + * Moves exchange future to state 'init done' using {@link #initFut}. 
*/ private void initDone() { while (!isDone()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 19ec0a8..edbfc23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -129,7 +129,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @param id Exchange ID. * @param lastVer Last version. - * @param topVer Topology version. + * @param topVer Topology version. For messages not related to exchange may be {@link AffinityTopologyVersion#NONE}. 
+ * @param partHistSuppliers + * @param partsToReload */ public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java index dc141db..f307b6a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java @@ -18,14 +18,17 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -45,12 +48,19 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri /** */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** */ + /** Map of destination node ID to runnable with logic for real message sending. + * To apply real message sending use run method */ private final ConcurrentHashMap8<UUID, Runnable> rs = new ConcurrentHashMap8<>(); - /** */ + /** + * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real communication channel to {@link #rs} map. + * Applied only to messages not related to particular exchange + */ private volatile boolean record = false; + /** */ + private AtomicBoolean replay = new AtomicBoolean(); + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName); @@ -74,13 +84,26 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { final Object msg0 = ((GridIoMessage)msg).message(); + if (log.isDebugEnabled()) + log.debug("Message [thread=" + Thread.currentThread().getName() + ", msg=" + msg0 + ']'); + if (msg0 instanceof GridDhtPartitionsFullMessage && record && - ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) { - rs.putIfAbsent(node.id(), new Runnable() { + ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) { + if (log.isDebugEnabled()) + log.debug("Record message [toNode=" + node.id() + ", msg=" + msg + "]"); + + assert !replay.get() : "Record of message is not allowed after replay"; + + Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() { @Override public void run() { + if 
(log.isDebugEnabled()) + log.debug("Replay: " + msg); + DelayableCommunicationSpi.super.sendMessage(node, msg, ackC); } }); + + assert prevValue == null : "Duplicate message registered to [" + node.id() + "]"; } else try { @@ -94,10 +117,10 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri } /** - * @throws Exception e. + * @throws Exception e if failed. */ public void test() throws Exception { - IgniteKernal ignite = (IgniteKernal)startGrid(0); + IgniteEx ignite = startGrid(0); CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); @@ -144,10 +167,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri awaitPartitionMapExchange(); - for (Runnable r : rs.values()) - r.run(); - - U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. + replayMessages(); stopGrid(3); // Forces exchange at all nodes and cause assertion failure in case obsolete partition map accepted. @@ -167,6 +187,22 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri assert grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion() > topVer2; } + /** + * Replays all saved messages from map, actual sent is performed. + * + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private void replayMessages() throws IgniteInterruptedCheckedException { + record = false; + + for (Runnable r : rs.values()) + r.run(); // Causes real messages sending. + + assertTrue(replay.compareAndSet(false, true)); + + U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages. 
+ } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 307456c..f5e8eef 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -524,7 +524,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param waitEvicts If {@code true} will wait for evictions finished. * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. - * @param nodes Optional nodes. + * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will + * be filtered * @throws InterruptedException If interrupted. */ @SuppressWarnings("BusyWait") @@ -546,7 +547,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param waitEvicts If {@code true} will wait for evictions finished. * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished. - * @param nodes Optional nodes. + * @param nodes Optional nodes. If {@code null} method will wait for all nodes, for non null collection nodes will + * be filtered * @param printPartState If {@code true} will print partition state if evictions not happened. * @throws InterruptedException If interrupted. */