ignite-1.5 Fixed client discovery impl to skip node failed message processing 
while disconnected.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4687d9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4687d9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4687d9f

Branch: refs/heads/master
Commit: d4687d9f636b38736d327351ca4b22c3262a2ae8
Parents: 58b55b5
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Dec 21 10:19:51 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Dec 21 10:19:51 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  4 +-
 .../dht/preloader/GridDhtPreloader.java         | 29 ------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 48 +++++++++++---------
 .../IgniteClientReconnectCacheTest.java         | 26 +++++++++--
 .../cache/IgniteCachePutAllRestartTest.java     |  2 +-
 5 files changed, 53 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 92d66d7..72a2bef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1641,7 +1641,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
         if (cache == null) {
             throw new IgniteException("Failed to resolve nodes topology 
[cacheName=" + cacheName +
-                ", topVer=" + topVer + ", history=" + discoCacheHist.keySet() +
+                ", topVer=" + topVer +
+                ", history=" + discoCacheHist.keySet() +
+                ", snap=" + snap +
                 ", locNode=" + ctx.discovery().localNode() + ']');
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index c46a66c..f0054e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -48,7 +48,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -92,9 +91,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** */
     private GridDhtPartitionTopology top;
 
-    /** Topology version. */
-    private final GridAtomicLong topVer = new GridAtomicLong();
-
     /** Force key futures. */
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> 
forceKeyFuts = newMap();
 
@@ -149,11 +145,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() 
: "Node joined with smaller-than-local " +
                     "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
 
-                boolean set = topVer.setIfGreater(e.topologyVersion());
-
-                assert set : "Have you configured TcpDiscoverySpi for your 
in-memory data grid? [newVer=" +
-                    e.topologyVersion() + ", curVer=" + topVer.get() + ", 
evt=" + e + ']';
-
                 if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
                     for (GridDhtAssignmentFetchFuture fut : 
pendingAssignmentFetchFuts.values())
                         fut.onNodeLeft(e.eventNode().id());
@@ -238,20 +229,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("DHT rebalancer onKernalStart callback.");
-
-        ClusterNode loc = cctx.localNode();
-
-        assert loc.metrics().getStartTime() > 0;
-
-        final long startTopVer = loc.order();
-
-        topVer.setIfGreater(startTopVer);
-    }
-
-    /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> 
preloadPred) {
         super.preloadPredicate(preloadPred);
 
@@ -382,12 +359,6 @@ public class GridDhtPreloader extends 
GridCachePreloaderAdapter {
     /** {@inheritDoc} */
     @Override public void onReconnected() {
         startFut = new GridFutureAdapter<>();
-
-        long topVer0 = cctx.kernalContext().discovery().topologyVersion();
-
-        assert topVer0 > 0 : topVer0;
-
-        topVer.set(topVer0);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 8f6c8a9..850cc24 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1828,36 +1828,42 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            if (!getLocalNodeId().equals(msg.creatorNodeId())) {
-                TcpDiscoveryNode node = rmtNodes.remove(msg.failedNodeId());
+            if (nodeAdded()) {
+                if (!getLocalNodeId().equals(msg.creatorNodeId())) {
+                    TcpDiscoveryNode node = 
rmtNodes.remove(msg.failedNodeId());
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node failed message since node 
is not found [msg=" + msg + ']');
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node failed message since 
node is not found [msg=" + msg + ']');
 
-                    return;
-                }
+                        return;
+                    }
 
-                Collection<ClusterNode> top = 
updateTopologyHistory(msg.topologyVersion(), msg);
+                    Collection<ClusterNode> top = 
updateTopologyHistory(msg.topologyVersion(), msg);
 
-                if (state != CONNECTED) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node failed message (join 
process is not finished): " + msg);
+                    if (state != CONNECTED) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node failed message (join 
process is not finished): " + msg);
 
-                    return;
-                }
+                        return;
+                    }
 
-                if (msg.warning() != null) {
-                    ClusterNode creatorNode = 
rmtNodes.get(msg.creatorNodeId());
+                    if (msg.warning() != null) {
+                        ClusterNode creatorNode = 
rmtNodes.get(msg.creatorNodeId());
 
-                    U.warn(log, "Received EVT_NODE_FAILED event with warning 
[" +
-                        "nodeInitiatedEvt=" + (creatorNode != null ? 
creatorNode : msg.creatorNodeId()) +
-                        ", msg=" + msg.warning() + ']');
-                }
+                        U.warn(log, "Received EVT_NODE_FAILED event with 
warning [" +
+                            "nodeInitiatedEvt=" + (creatorNode != null ? 
creatorNode : msg.creatorNodeId()) +
+                            ", msg=" + msg.warning() + ']');
+                    }
 
-                notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), node, 
top);
+                    notifyDiscovery(EVT_NODE_FAILED, msg.topologyVersion(), 
node, top);
 
-                spi.stats.onNodeFailed();
+                    spi.stats.onNodeFailed();
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Ignore topology message, local node not added 
to topology: " + msg);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 5234d6e..ad6c46f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -1088,7 +1088,7 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
         clientMode = true;
 
-        final int CLIENTS = 2;
+        final int CLIENTS = 5;
 
         List<Ignite> clients = new ArrayList<>();
 
@@ -1103,12 +1103,14 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
         int nodes = SRV_CNT + CLIENTS;
         int srvNodes = SRV_CNT;
 
-        for (int iter = 0; iter < 3; iter++) {
+        for (int iter = 0; iter < 5; iter++) {
             log.info("Iteration: " + iter);
 
             reconnectClientNodes(log, clients, grid(0), null);
 
-            for (Ignite client : clients) {
+            final int expNodes = CLIENTS + srvNodes;
+
+            for (final Ignite client : clients) {
                 IgniteCache<Object, Object> cache = client.cache(null);
 
                 assertNotNull(cache);
@@ -1117,6 +1119,14 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
 
                 assertEquals(1, cache.get(client.name()));
 
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        ClusterGroup grp = 
client.cluster().forCacheNodes(null);
+
+                        return grp.nodes().size() == expNodes;
+                    }
+                }, 5000);
+
                 ClusterGroup grp = client.cluster().forCacheNodes(null);
 
                 assertEquals(CLIENTS + srvNodes, grp.nodes().size());
@@ -1127,7 +1137,15 @@ public class IgniteClientReconnectCacheTest extends 
IgniteClientReconnectAbstrac
             }
 
             for (int i = 0; i < nodes; i++) {
-                Ignite ignite = grid(i);
+                final Ignite ignite = grid(i);
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        ClusterGroup grp = 
ignite.cluster().forCacheNodes(null);
+
+                        return grp.nodes().size() == expNodes;
+                    }
+                }, 5000);
 
                 ClusterGroup grp = ignite.cluster().forCacheNodes(null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4687d9f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
index 3e124f3..96a396c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
@@ -121,7 +121,7 @@ public class IgniteCachePutAllRestartTest extends 
GridCommonAbstractTest {
 
                     iter++;
 
-                    if (iter % 10 == 0)
+                    if (iter % 1000 == 0)
                         log.info("Iteration: " + iter);
                 }
 

Reply via email to