ignite-5682 Added stale version check for GridDhtPartFullMessage not related to 
exchange.

(cherry picked from commit eb9d06d)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316312d2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316312d2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316312d2

Branch: refs/heads/ignite-6181-1
Commit: 316312d2ae9015228e67f959e492b2c5c4a9366d
Parents: 5bda409
Author: Dmitry Pavlov <dpavlov....@gmail.com>
Authored: Thu Jul 27 14:51:25 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Aug 25 16:53:41 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  5 +-
 .../processors/cache/GridCacheIoManager.java    |  2 +-
 .../GridCachePartitionExchangeManager.java      | 42 +++++++++-----
 .../dht/GridClientPartitionTopology.java        | 12 +++-
 .../dht/GridDhtPartitionTopology.java           |  5 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 12 +++-
 .../preloader/GridDhtPartitionExchangeId.java   |  2 +-
 .../GridDhtPartitionsExchangeFuture.java        | 36 ++++++++----
 .../preloader/GridDhtPartitionsFullMessage.java |  4 +-
 ...cingDelayedPartitionMapExchangeSelfTest.java | 58 ++++++++++++++++----
 .../junits/common/GridCommonAbstractTest.java   |  6 +-
 11 files changed, 136 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0e03a29..ffb55e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -460,7 +460,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                                 
grp.topology().update(grpHolder.affinity().lastVersion(),
                                     clientTop.partitionMap(true),
                                     clientTop.fullUpdateCounters(),
-                                    Collections.<Integer>emptySet());
+                                    Collections.<Integer>emptySet(),
+                                    null);
                             }
 
                             assert 
grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion());
@@ -518,7 +519,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
                 grp.topology().updateTopologyVersion(topFut, discoCache, -1, 
false);
 
-                grp.topology().update(topVer, partMap, null, 
Collections.<Integer>emptySet());
+                grp.topology().update(topVer, partMap, null, 
Collections.<Integer>emptySet(), null);
 
                 topFut.validate(grp, discoCache.allNodes());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 45edc53..6529795 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -551,7 +551,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param nodeId Node ID.
+     * @param nodeId Sender Node ID.
      * @param cacheMsg Cache message.
      * @param c Handler closure.
      * @param plc Message policy.

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index aad5b35..bd34a5f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -140,7 +140,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** */
     private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = 
IgniteProductVersion.fromString("2.1.4");
 
-    /** Atomic reference for pending timeout object. */
+    /** Atomic reference for pending partition resend timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new 
AtomicReference<>();
 
     /** Partition resend timeout after eviction. */
@@ -161,7 +161,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     private final ConcurrentMap<Integer, GridClientPartitionTopology> 
clientTops = new ConcurrentHashMap8<>();
 
     /** */
-    private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
+    @Nullable private volatile GridDhtPartitionsExchangeFuture 
lastInitializedFut;
 
     /** */
     private final AtomicReference<GridDhtTopologyFuture> lastFinishedFut = new 
AtomicReference<>();
@@ -921,6 +921,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
     /**
      * Partition refresh callback.
+     * On the coordinator it causes {@link GridDhtPartitionsFullMessage FullMessages} to be sent;
+     * on a non-coordinator node, {@link GridDhtPartitionsSingleMessage SingleMessages} are sent.
      */
     private void refreshPartitions() {
         ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -958,7 +960,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + 
cctx.localNodeId());
 
-            sendAllPartitions(rmts);
+            sendAllPartitions(rmts, rmtTopVer);
         }
         else {
             if (log.isDebugEnabled())
@@ -971,10 +973,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
     /**
      * @param nodes Nodes.
+     * @param msgTopVer Topology version. Will be added to full message.
      */
-    private void sendAllPartitions(Collection<ClusterNode> nodes) {
+    private void sendAllPartitions(Collection<ClusterNode> nodes,
+        AffinityTopologyVersion msgTopVer) {
         GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, 
false, null, null, null, null);
 
+        m.topologyVersion(msgTopVer);
+
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + 
", msg=" + m + ']');
 
@@ -1126,8 +1132,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
-     * @param id ID.
+     * @param node Destination cluster node.
+     * @param id Exchange ID.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable 
GridDhtPartitionExchangeId id) {
         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
@@ -1153,7 +1159,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param exchangeId ID.
+     * @param exchangeId Exchange ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if need send partition update counters.
      * @param newCntrMap {@code True} if possible to use {@link 
CachePartitionPartialCountersMap}.
@@ -1372,7 +1378,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processFullPartitionUpdate(ClusterNode node, 
GridDhtPartitionsFullMessage msg) {
@@ -1398,8 +1404,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                     else if (!grp.isLocal())
                         top = grp.topology();
 
-                    if (top != null)
-                        updated |= top.update(null, entry.getValue(), null, 
msg.partsToReload(cctx.localNodeId(), grpId));
+                    if (top != null) {
+                        updated |= top.update(null,
+                            entry.getValue(),
+                            null,
+                            msg.partsToReload(cctx.localNodeId(), grpId),
+                            msg.topologyVersion());
+                    }
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1427,7 +1438,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node ID.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processSinglePartitionUpdate(final ClusterNode node, final 
GridDhtPartitionsSingleMessage msg) {
@@ -1477,7 +1488,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node ID.
+     * @param node Sender cluster node.
      * @param msg Message.
      */
     private void processSinglePartitionRequest(ClusterNode node, 
GridDhtPartitionsSingleRequest msg) {
@@ -2595,7 +2606,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** {@inheritDoc} */
+        /**
+         * @param nodeId Sender node ID.
+         * @param msg Message.
+         */
         @Override public void apply(UUID nodeId, M msg) {
             ClusterNode node = cctx.node(nodeId);
 
@@ -2613,7 +2627,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
 
         /**
-         * @param node Node.
+         * @param node Sender cluster node.
          * @param msg Message.
          */
         protected abstract void onMessage(ClusterNode node, M msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 10d08b9..c8856fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -656,8 +656,8 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
-        Set<Integer> partsToReload
-    ) {
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchVer=" + exchangeVer + 
", parts=" + fullMapString() + ']');
 
@@ -672,6 +672,14 @@ public class GridClientPartitionTopology implements 
GridDhtPartitionTopology {
                 return false;
             }
 
+            if (msgTopVer != null && lastExchangeVer != null && 
lastExchangeVer.compareTo(msgTopVer) > 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale topology version for full partition map 
update message (will ignore) " +
+                        "[lastExchId=" + lastExchangeVer + ", topVersion=" + 
msgTopVer + ']');
+
+                return false;
+            }
+
             boolean fullMapUpdated = (node2part == null);
 
             if (node2part != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f48bd352..4ae68ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -263,13 +263,16 @@ public interface GridDhtPartitionTopology {
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
      * @param partsToReload Set of partitions that need to be reloaded.
+     * @param msgTopVer Topology version from the incoming message. This value is non-null only when the message is not
+     *      related to an exchange. The value should be not less than the previous 'Topology version from exchange'.
      * @return {@code True} if local state was changed.
      */
     public boolean update(
         @Nullable AffinityTopologyVersion exchangeResVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap cntrMap,
-        Set<Integer> partsToReload);
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer);
 
     /**
      * @param exchId Exchange ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6ab6bd8..f25ae21 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1176,7 +1176,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable CachePartitionFullCountersMap incomeCntrMap,
-        Set<Integer> partsToReload) {
+        Set<Integer> partsToReload,
+        @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchVer=" + exchangeVer + 
", parts=" + fullMapString() + ']');
 
@@ -1217,6 +1218,15 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
             }
 
+            if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 
0) {
+                U.warn(log, "Stale version for full partition map update 
message (will ignore) [" +
+                    "lastTopChange=" + lastTopChangeVer +
+                    ", readTopVer=" + readyTopVer +
+                    ", msgVer=" + msgTopVer + ']');
+
+                return false;
+            }
+
             boolean fullMapUpdated = (node2part == null);
 
             if (node2part != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
index 07daeda..33728d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java
@@ -51,7 +51,7 @@ public class GridDhtPartitionExchangeId implements Message, 
Comparable<GridDhtPa
     @GridToStringExclude
     private UUID nodeId;
 
-    /** Event. */
+    /** Event type. */
     @GridToStringExclude
     private int evt;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5755bc7..6decb44 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -137,7 +137,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final Set<UUID> remaining = new HashSet<>();
 
-    /** */
+    /** Guarded by this */
     @GridToStringExclude
     private int pendingSingleUpdates;
 
@@ -164,7 +164,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final CountDownLatch evtLatch = new CountDownLatch(1);
 
-    /** */
+    /** Exchange future init method completes this future. */
     private GridFutureAdapter<Boolean> initFut;
 
     /** */
@@ -213,7 +213,10 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** Init timestamp. Used to track the amount of time spent to complete the 
future. */
     private long initTs;
 
-    /** */
+    /**
+     * Centralized affinity assignment required. Activated when a node has left or failed. In this mode the coordinator
+     * sends full partition maps to nodes using discovery (ring) instead of communication.
+     */
     private boolean centralizedAff;
 
     /** Change global state exception. */
@@ -765,7 +768,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     top.update(null,
                         clientTop.partitionMap(true),
                         clientTop.fullUpdateCounters(),
-                        Collections.<Integer>emptySet());
+                        Collections.<Integer>emptySet(),
+                        null);
                 }
             }
 
@@ -1204,7 +1208,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param node Node.
+     * @param node Target Node.
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node) throws 
IgniteCheckedException {
@@ -1351,7 +1355,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param oldestNode Oldest node.
+     * @param oldestNode Oldest node. Target node to send message to.
      */
     private void sendPartitions(ClusterNode oldestNode) {
         try {
@@ -1720,6 +1724,9 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Processing of received single message. Actual processing in future may 
be delayed if init method was not
+     * completed, see {@link #initDone()}
+     *
      * @param node Sender node.
      * @param msg Single partition info.
      */
@@ -1823,8 +1830,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Note this method performs heavy updatePartitionSingleMap operation, 
this operation is moved out from the
+     * synchronized block. Only count of such updates {@link 
#pendingSingleUpdates} is managed under critical section.
+     *
      * @param nodeId Sender node.
-     * @param msg Message.
+     * @param msg Partition single message.
      */
     private void processSingleMessage(UUID nodeId, 
GridDhtPartitionsSingleMessage msg) {
         if (msg.client()) {
@@ -1833,7 +1843,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             return;
         }
 
-        boolean allReceived = false;
+        boolean allReceived = false; // Received all expected messages.
         boolean updateSingleMap = false;
 
         FinishState finishState0 = null;
@@ -2509,7 +2519,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /**
      * @param node Sender node.
-     * @param msg Message.
+     * @param msg Message with full partition info.
      */
     public void onReceivePartitionRequest(final ClusterNode node, final 
GridDhtPartitionsSingleRequest msg) {
         assert !cctx.kernalContext().clientNode() || msg.restoreState();
@@ -2803,7 +2813,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 grp.topology().update(resTopVer,
                     entry.getValue(),
                     cntrMap,
-                    msg.partsToReload(cctx.localNodeId(), grpId));
+                    msg.partsToReload(cctx.localNodeId(), grpId),
+                    null);
             }
             else {
                 ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
@@ -2817,7 +2828,8 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     top.update(resTopVer,
                         entry.getValue(),
                         cntrMap,
-                        Collections.<Integer>emptySet());
+                        Collections.<Integer>emptySet(),
+                        null);
                 }
             }
         }
@@ -2912,7 +2924,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * Moves exchange future to state 'init done' using {@link #initFut}.
      */
     private void initDone() {
         while (!isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 19ec0a8..edbfc23 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -129,7 +129,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     /**
      * @param id Exchange ID.
      * @param lastVer Last version.
-     * @param topVer Topology version.
+     * @param topVer Topology version. For messages not related to exchange 
may be {@link AffinityTopologyVersion#NONE}.
+     * @param partHistSuppliers
+     * @param partsToReload
      */
     public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId 
id,
         @Nullable GridCacheVersion lastVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index dc141db..f307b6a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -18,14 +18,17 @@
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -45,12 +48,19 @@ public class 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 
-    /** */
+    /** Map of destination node ID to runnable with logic for real message sending.
+     * To actually send the message, call the runnable's run method. */
     private final ConcurrentHashMap8<UUID, Runnable> rs = new 
ConcurrentHashMap8<>();
 
-    /** */
+    /**
+     * Flag to redirect {@link GridDhtPartitionsFullMessage}s from real 
communication channel to {@link #rs} map.
+     * Applied only to messages not related to particular exchange
+     */
     private volatile boolean record = false;
 
+    /** */
+    private AtomicBoolean replay = new AtomicBoolean();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration iCfg = super.getConfiguration(igniteInstanceName);
@@ -74,13 +84,26 @@ public class 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
             final IgniteInClosure<IgniteException> ackC) throws 
IgniteSpiException {
             final Object msg0 = ((GridIoMessage)msg).message();
 
+            if (log.isDebugEnabled())
+                log.debug("Message [thread=" + 
Thread.currentThread().getName() + ", msg=" + msg0 + ']');
+
             if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
-                ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
-                rs.putIfAbsent(node.id(), new Runnable() {
+                ((GridDhtPartitionsAbstractMessage)msg0).exchangeId() == null) 
{
+                if (log.isDebugEnabled())
+                    log.debug("Record message [toNode=" + node.id() + ", msg=" 
+ msg + "]");
+
+                assert !replay.get() : "Record of message is not allowed after 
replay";
+
+                Runnable prevValue = rs.putIfAbsent(node.id(), new Runnable() {
                     @Override public void run() {
+                        if (log.isDebugEnabled())
+                            log.debug("Replay: " + msg);
+
                         DelayableCommunicationSpi.super.sendMessage(node, msg, 
ackC);
                     }
                 });
+
+                assert prevValue == null : "Duplicate message registered to [" 
+ node.id() + "]";
             }
             else
                 try {
@@ -94,10 +117,10 @@ public class 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
     }
 
     /**
-     * @throws Exception e.
+     * @throws Exception e if failed.
      */
     public void test() throws Exception {
-        IgniteKernal ignite = (IgniteKernal)startGrid(0);
+        IgniteEx ignite = startGrid(0);
 
         CacheConfiguration<Integer, Integer> cfg = new 
CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
@@ -144,10 +167,7 @@ public class 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
 
         awaitPartitionMapExchange();
 
-        for (Runnable r : rs.values())
-            r.run();
-
-        U.sleep(10000); // Enough time to process delayed 
GridDhtPartitionsFullMessages.
+        replayMessages();
 
         stopGrid(3); // Forces exchange at all nodes and cause assertion 
failure in case obsolete partition map accepted.
 
@@ -167,6 +187,22 @@ public class 
GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
         assert 
grid(2).context().cache().context().exchange().readyAffinityVersion().topologyVersion()
 > topVer2;
     }
 
+    /**
+     * Replays all saved messages from the map; the actual sending is performed.
+     *
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private void replayMessages() throws IgniteInterruptedCheckedException {
+        record = false;
+
+        for (Runnable r : rs.values())
+            r.run(); // Causes real messages sending.
+
+        assertTrue(replay.compareAndSet(false, true));
+
+        U.sleep(10000); // Enough time to process delayed 
GridDhtPartitionsFullMessages.
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/316312d2/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 307456c..f5e8eef 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -524,7 +524,8 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
      * @param waitNode2PartUpdate If {@code true} will wait for nodes 
node2part info update finished.
-     * @param nodes Optional nodes.
+     * @param nodes Optional nodes. If {@code null}, the method waits for all nodes; for a non-null collection, nodes
+     *      will be filtered.
      * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
@@ -546,7 +547,8 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
      * @param waitNode2PartUpdate If {@code true} will wait for nodes 
node2part info update finished.
-     * @param nodes Optional nodes.
+     * @param nodes Optional nodes. If {@code null}, the method waits for all nodes; for a non-null collection, nodes
+     *      will be filtered.
      * @param printPartState If {@code true} will print partition state if 
evictions not happened.
      * @throws InterruptedException If interrupted.
      */

Reply via email to