IGNITE-7464 - Add property to configure time between node connection attempts - 
Fixes #3493

Signed-off-by: Valentin Kulichenko <valentin.kuliche...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4abcb310
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4abcb310
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4abcb310

Branch: refs/heads/ignite-7485-2
Commit: 4abcb31071c6518d9692f1a3ecb443068ce548a4
Parents: b2f8cf8
Author: Stanislav Lukyanov <stanlukya...@gmail.com>
Authored: Thu Feb 8 14:25:11 2018 -0800
Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Committed: Thu Feb 8 14:25:11 2018 -0800

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  21 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   7 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  36 +-
 .../tcp/TcpDiscoverySpiReconnectDelayTest.java  | 446 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   3 +
 6 files changed, 499 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index c9a4a5a..a8f13fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -505,9 +505,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                             "Please check IP finder configuration" +
                             (spi.ipFinder instanceof 
TcpDiscoveryMulticastIpFinder ?
                                 " and make sure multicast works on your 
network. " : ". ") +
-                            "Will retry every 2 secs.", true);
+                            "Will retry every " + spi.getReconnectDelay() + " 
ms. " +
+                            "Change 'reconnectDelay' to configure the 
frequency of retries.", true);
 
-                    Thread.sleep(2000);
+                    Thread.sleep(spi.getReconnectDelay());
                 }
             }
 
@@ -566,23 +567,21 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (wait) {
-                if (timeout > 0 && (U.currentTimeMillis() - startTime) > 
timeout)
-                    return null;
+            if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+                return null;
 
+            if (wait) {
                 if (log.isDebugEnabled())
                     log.debug("Will wait before retry join.");
 
-                Thread.sleep(2000);
+                Thread.sleep(spi.getReconnectDelay());
             }
             else if (addrs.isEmpty()) {
-                if (timeout > 0 && (U.currentTimeMillis() - startTime) > 
timeout)
-                    return null;
-
                 LT.warn(log, "Failed to connect to any address from IP finder 
(will retry to join topology " +
-                    "every 2 secs): " + toOrderedList(addrs0), true);
+                    "every " + spi.getReconnectDelay() + " ms; change 
'reconnectDelay' to configure the frequency " +
+                    "of retries): " + toOrderedList(addrs0), true);
 
-                Thread.sleep(2000);
+                Thread.sleep(spi.getReconnectDelay());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 6f79720..743964a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1091,7 +1091,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     log.debug("Concurrent discovery SPI start has been 
detected (local node should wait).");
 
                 try {
-                    U.sleep(2000);
+                    U.sleep(spi.getReconnectDelay());
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     throw new IgniteSpiException("Thread has been 
interrupted.", e);
@@ -1125,7 +1125,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 try {
-                    U.sleep(2000);
+                    U.sleep(spi.getReconnectDelay());
                 }
                 catch (IgniteInterruptedCheckedException ex) {
                     throw new IgniteSpiException("Thread has been 
interrupted.", ex);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index f3cf48d..00d83dd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -326,8 +326,9 @@ abstract class TcpDiscoveryImpl {
             }
             catch (IgniteSpiException e) {
                 LT.error(log, e, "Failed to register local node address in IP 
finder on start " +
-                    "(retrying every 2000 ms).");
-            }
+                    "(retrying every " + spi.getReconnectDelay() + " ms; " +
+                    "change 'reconnectDelay' to configure the frequency of 
retries).");
+            };
 
             if (start > 0 && (U.currentTimeMillis() - start) > 
spi.getJoinTimeout())
                 throw new IgniteSpiException(
@@ -337,7 +338,7 @@ abstract class TcpDiscoveryImpl {
                         "[joinTimeout=" + spi.getJoinTimeout() + ']');
 
             try {
-                U.sleep(2000);
+                U.sleep(spi.getReconnectDelay());
             }
             catch (IgniteInterruptedCheckedException e) {
                 throw new IgniteSpiException("Thread has been interrupted.", 
e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 51c5adf..2d9a314 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -260,6 +260,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi {
     /** Default reconnect attempts count (value is <tt>10</tt>). */
     public static final int DFLT_RECONNECT_CNT = 10;
 
+    /** Default delay between attempts to connect to the cluster in 
milliseconds (value is <tt>2000</tt>). */
+    public static final long DFLT_RECONNECT_DELAY = 2000;
+
     /** Default IP finder clean frequency in milliseconds (value is 
<tt>60,000ms</tt>). */
     public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000;
 
@@ -349,6 +352,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi {
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private int reconCnt = DFLT_RECONNECT_CNT;
 
+    /** Delay between attempts to connect to the cluster. */
+    private long reconDelay = DFLT_RECONNECT_DELAY;
+
     /** Statistics print frequency. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", 
"RedundantFieldInitialization"})
     protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
@@ -642,6 +648,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi {
     }
 
     /**
+     * Gets the amount of time in milliseconds that node waits before retrying 
to (re)connect to the cluster.
+     *
+     * @return Delay between attempts to connect to the cluster in 
milliseconds.
+     */
+    public long getReconnectDelay() {
+        return reconDelay;
+    }
+
+    /**
+     * Sets the amount of time in milliseconds that node waits before retrying 
to (re)connect to the cluster.
+     * <p>
+     * If not specified, default is {@link #DFLT_RECONNECT_DELAY}.
+     *
+     * @param reconDelay Delay between attempts to connect to the cluster in 
milliseconds.
+     *
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public TcpDiscoverySpi setReconnectDelay(int reconDelay) {
+        this.reconDelay = reconDelay;
+
+        return this;
+    }
+
+    /**
      * Gets maximum message acknowledgement timeout.
      *
      * @return Maximum message acknowledgement timeout.
@@ -1730,11 +1761,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi {
             }
             catch (IgniteSpiException e) {
                 LT.error(log, e, "Failed to get registered addresses from IP 
finder on start " +
-                    "(retrying every 2000 ms).");
+                    "(retrying every " + getReconnectDelay() + "ms; change 
'reconnectDelay' to configure " +
+                    "the frequency of retries).");
             }
 
             try {
-                U.sleep(2000);
+                U.sleep(getReconnectDelay());
             }
             catch (IgniteInterruptedCheckedException e) {
                 throw new IgniteSpiException("Thread has been interrupted.", 
e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
new file mode 100644
index 0000000..89df32c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoveryImpl.RES_WAIT;
+
+/**
+ * Test for {@link TcpDiscoverySpi#setReconnectDelay(int)}.
+ */
+public class TcpDiscoverySpiReconnectDelayTest extends GridCommonAbstractTest {
+    /** Time to wait for events. */
+    private static final int EVT_TIMEOUT = 120000;
+
+    /** Timeout for socket operations. */
+    private static final int SOCK_AND_ACK_TIMEOUT = 500;
+
+    static {
+        System.setProperty(IgniteSystemProperties.IGNITE_QUIET, "false");
+    }
+
+    //region Client joins after failNode()
+
+    /** */
+    public void testClientJoinAfterFailureShortTimeout() throws Exception {
+        checkClientJoinAfterNodeFailure(5, 500);
+    }
+
+    /** */
+    public void testClientJoinAfterFailureLongTimeout() throws Exception {
+        checkClientJoinAfterNodeFailure(3, 5000);
+    }
+
+    /**
+     * Check that client restores connection after the given time, with the 
expected number of messages sent
+     * and expected time elapsed.
+     *
+     * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage sends 
that must fail before the client manages to connect.
+     * @param reconnectDelay argument for {@link 
TcpDiscoverySpi#setReconnectDelay(int)}
+     */
+    private void checkClientJoinAfterNodeFailure(int numOfFailedRequests, int 
reconnectDelay) throws Exception {
+        try (
+            Ignite ignite1 = G.start(getConfiguration("server", false, 
reconnectDelay));
+            Ignite ignite2 = G.start(getConfiguration("client", true, 
reconnectDelay))
+        ) {
+            // Check topology.
+            assertEquals(1L, ignite1.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().topologyVersion());
+
+            final CountDownLatch failLatch = new CountDownLatch(1);
+            final CountDownLatch joinLatch = new CountDownLatch(1);
+            final CountDownLatch reconnectLatch = new CountDownLatch(1);
+            final CountDownLatch disconnectLatch = new CountDownLatch(1);
+
+            ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() 
{
+                @Override public boolean apply(DiscoveryEvent evt) {
+                    info("Node1 event: " + evt);
+
+                    if (evt.type() == EVT_NODE_FAILED)
+                        failLatch.countDown();
+                    else if (evt.type() == EVT_NODE_JOINED)
+                        joinLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_FAILED, EVT_NODE_JOINED);
+
+            ignite2.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    info("Node2 event: " + evt);
+
+                    if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                        disconnectLatch.countDown();
+                    else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED)
+                        reconnectLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+            long startTime = System.currentTimeMillis();
+
+            AtomicInteger failJoinReq = 
((FailingTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).failJoinReq;
+            failJoinReq.set(numOfFailedRequests);
+            
ignite1.configuration().getDiscoverySpi().failNode(ignite2.cluster().localNode().id(),
 null);
+
+            assertTrue(disconnectLatch.await(EVT_TIMEOUT, MILLISECONDS));
+            assertTrue(failLatch.await(EVT_TIMEOUT, MILLISECONDS));
+            assertTrue(reconnectLatch.await(EVT_TIMEOUT, MILLISECONDS));
+            assertTrue(joinLatch.await(EVT_TIMEOUT, MILLISECONDS));
+
+            long endTime = System.currentTimeMillis();
+
+            // Check topology.
+            assertEquals(1L, ignite1.cluster().localNode().order());
+            assertEquals(4L, ignite2.cluster().localNode().order());
+            assertEquals(4L, ignite2.cluster().topologyVersion());
+
+            // Check connection time.
+            // Total time should be at least the sum of all delays.
+            long totalTime = endTime - startTime;
+            long expTotalTime = numOfFailedRequests * reconnectDelay;
+            assertTrue(totalTime >= expTotalTime);
+
+            // Check number of messages.
+            // If exactly numOfFailedRequests fail, counter will be at -1.
+            // If unexpected additional requests are sent, counter will be <= 
-2.
+            int cntr = failJoinReq.get();
+            int numOfMessages = numOfFailedRequests - cntr;
+            int expNumOfMessages = numOfFailedRequests + 1;
+            assertEquals("Unexpected number of messages", expNumOfMessages, 
numOfMessages);
+        }
+    }
+
+    //endregion
+
+    //region Client joins after brakeConnection()
+
+    /** */
+    public void testClientJoinAfterSocketClosedShortTimeout() throws Exception 
{
+        checkClientJoinAfterSocketClosed(5, 500);
+    }
+
+    /** */
+    public void testClientJoinAfterSocketClosedLongTimeout() throws Exception {
+        checkClientJoinAfterSocketClosed(3, 5000);
+    }
+
+    /**
+     * Check that client restores connection after the given time, with the 
expected number of messages sent
+     * and expected time elapsed.
+     *
+     * @param numOfFailedRequests number of TcpDiscoveryClientReconnectMessage sends 
that must fail before the client manages to reconnect.
+     * @param reconnectDelay argument for {@link 
TcpDiscoverySpi#setReconnectDelay(int)}
+     */
+    private void checkClientJoinAfterSocketClosed(int numOfFailedRequests, int 
reconnectDelay) throws Exception {
+        try (
+            Ignite ignite1 = G.start(getConfiguration("server", false, 
reconnectDelay));
+            Ignite ignite2 = G.start(getConfiguration("client", true, 
reconnectDelay))
+        ) {
+            // Check topology.
+            assertEquals(1L, ignite1.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().topologyVersion());
+
+            long startTime = System.currentTimeMillis();
+
+            AtomicInteger failCntr = 
((FailingTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).failReconReq;
+            failCntr.set(numOfFailedRequests);
+            
((TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).brakeConnection();
+
+            // Need to send a discovery message to a remote node to provoke 
reconnection.
+            // remoteListen() is used because it is synchronous (e.g. send() 
is not).
+            ignite2.message().remoteListen(null, new DummyListener());
+
+            long endTime = System.currentTimeMillis();
+
+            // Check topology.
+            assertEquals(1L, ignite1.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().localNode().order());
+            assertEquals(2L, ignite2.cluster().topologyVersion());
+
+            // Check connection time.
+            // Total time should be at least the sum of all delays.
+            long totalTime = endTime - startTime;
+            long expTotalTime = numOfFailedRequests * reconnectDelay;
+            assertTrue(totalTime >= expTotalTime);
+
+            // Check number of messages.
+            // If exactly numOfFailedRequests fail, counter will be at -1.
+            // If unexpected additional requests are sent, counter will be <= 
-2.
+            int cntr = failCntr.get();
+            int numOfMessages = numOfFailedRequests - cntr;
+            int expNumOfMessages = numOfFailedRequests + 1;
+            assertEquals("Unexpected number of messages", expNumOfMessages, 
numOfMessages);
+        }
+    }
+
+    //endregion
+
+    //region Client joins at start
+
+    /** */
+    public void testClientJoinAtStartShortTimeout() throws Exception {
+        checkClientJoinAtStart(5, 500);
+    }
+
+    /** */
+    public void testClientJoinAtStartLongTimeout() throws Exception {
+        checkClientJoinAtStart(3, 5000);
+    }
+
+    /** */
+    public void testServerJoinAtStartShortTimeout() throws Exception {
+        checkServerJoinAtStart(5, 500);
+    }
+
+    /** */
+    public void testServerJoinAtStartLongTimeout() throws Exception {
+        checkServerJoinAtStart(3, 5000);
+    }
+
+    /**
+     * Check that a client joins the cluster at start after the given time, with the 
expected number of messages sent
+     * and expected time elapsed.
+     * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage sends 
that must fail before the client manages to connect.
+     * @param reconnectDelay argument for {@link 
TcpDiscoverySpi#setReconnectDelay(int)}
+     */
+    private void checkClientJoinAtStart(int numOfFailedRequests, int 
reconnectDelay) throws Exception {
+        try (
+            Ignite ignite1 = G.start(getConfiguration("server", false, 
reconnectDelay))
+        ) {
+            final CountDownLatch joinLatch = new CountDownLatch(1);
+
+            ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() 
{
+                @Override public boolean apply(DiscoveryEvent evt) {
+                    info("Node1 event: " + evt);
+
+                    if (evt.type() == EVT_NODE_JOINED)
+                        joinLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+
+            IgniteConfiguration ignite2Cfg = getConfiguration("client", true, 
reconnectDelay);
+            final AtomicInteger failJoinReq = 
((FailingTcpDiscoverySpi)ignite2Cfg.getDiscoverySpi()).failJoinReq;
+            failJoinReq.set(numOfFailedRequests);
+
+            final long startTime = System.currentTimeMillis();
+
+            try (Ignite ignite2 = G.start(ignite2Cfg)) {
+                assertTrue(joinLatch.await(EVT_TIMEOUT, MILLISECONDS));
+
+                long endTime = System.currentTimeMillis();
+
+                // Check topology.
+                assertEquals(1L, ignite1.cluster().localNode().order());
+                assertEquals(2L, ignite2.cluster().localNode().order());
+                assertEquals(2L, ignite2.cluster().topologyVersion());
+
+                // Check connection time.
+                // Total time should be at least the sum of all delays.
+                long totalTime = endTime - startTime;
+                long expTotalTime = numOfFailedRequests * reconnectDelay;
+                assertTrue(totalTime >= expTotalTime);
+
+                // Check number of messages.
+                // If exactly numOfFailedRequests fail, counter will be at -1.
+                // If unexpected additional requests are sent, counter will be 
<= -2.
+                int cntr = failJoinReq.get();
+                int numOfMessages = numOfFailedRequests - cntr;
+                int expNumOfMessages = numOfFailedRequests + 1;
+                assertEquals("Unexpected number of messages", 
expNumOfMessages, numOfMessages);
+            }
+        }
+    }
+
+    /**
+     * Check that a server joins the cluster at start after the given time, with the 
expected number of messages sent
+     * and expected time elapsed.
+     * @param numOfFailedRequests number of join requests that the running server 
answers with RES_WAIT before letting the new server join.
+     * @param reconnectDelay argument for {@link 
TcpDiscoverySpi#setReconnectDelay(int)}
+     */
+    private void checkServerJoinAtStart(int numOfFailedRequests, int 
reconnectDelay) throws Exception {
+        try (
+            Ignite ignite1 = G.start(getConfiguration("server", false, 
reconnectDelay))
+        ) {
+            final CountDownLatch joinLatch = new CountDownLatch(1);
+            final AtomicInteger failJoinReqRes = 
((FailingTcpDiscoverySpi)ignite1.configuration().getDiscoverySpi())
+                .failJoinReqRes;
+            failJoinReqRes.set(numOfFailedRequests);
+
+            ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() 
{
+                @Override public boolean apply(DiscoveryEvent evt) {
+                    info("Node1 event: " + evt);
+
+                    if (evt.type() == EVT_NODE_JOINED)
+                        joinLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+
+            final long startTime = System.currentTimeMillis();
+
+            try (Ignite ignite2 = G.start(getConfiguration("server-2", false, 
reconnectDelay))) {
+                assertTrue(joinLatch.await(EVT_TIMEOUT, MILLISECONDS));
+
+                long endTime = System.currentTimeMillis();
+
+                // Check topology.
+                assertEquals(1L, ignite1.cluster().localNode().order());
+                assertEquals(2L, ignite2.cluster().localNode().order());
+                assertEquals(2L, ignite2.cluster().topologyVersion());
+
+                // Check connection time.
+                // Total time should be at least the sum of all delays.
+                long totalTime = endTime - startTime;
+                long expTotalTime = numOfFailedRequests * reconnectDelay;
+                assertTrue(totalTime >= expTotalTime);
+
+                // Check number of messages.
+                // If exactly numOfFailedRequests fail, counter will be at -1.
+                // If unexpected additional requests are sent, counter will be 
<= -2.
+                int cntr = failJoinReqRes.get();
+                int numOfMessages = numOfFailedRequests - cntr;
+                int expNumOfMessages = numOfFailedRequests + 1;
+                assertEquals("Unexpected number of messages", 
expNumOfMessages, numOfMessages);
+            }
+        }
+    }
+
+    //endregion
+
+    //region Helpers
+
+    /** */
+    private IgniteConfiguration getConfiguration(String name, boolean 
isClient, int reconnectDelay) {
+        IgniteConfiguration cfg = new IgniteConfiguration()
+            .setIgniteInstanceName(name)
+            .setDiscoverySpi(new FailingTcpDiscoverySpi()
+                .setIpFinder(LOCAL_IP_FINDER)
+                .setReconnectDelay(reconnectDelay)
+                // Allow reconnection to take long.
+                .setNetworkTimeout(EVT_TIMEOUT)
+                // Make sure reconnection attempts are short enough.
+                // Each reconnection attempt is
+                // 500ms for write (socketTimeout) + 500ms for read 
(ackTimeout)
+                // tried only once.
+                .setSocketTimeout(SOCK_AND_ACK_TIMEOUT)
+                .setAckTimeout(SOCK_AND_ACK_TIMEOUT)
+                .setReconnectCount(1))
+            // Make sure that server doesn't kick reconnecting client out.
+            .setClientFailureDetectionTimeout(EVT_TIMEOUT);
+
+        if (isClient)
+            cfg.setClientMode(true);
+        return cfg;
+    }
+
+    /** Custom discovery SPI that can make the sending of certain messages fail. */
+    private static class FailingTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private final AtomicInteger failJoinReq = new AtomicInteger();
+
+        /** */
+        private final AtomicInteger failJoinReqRes = new AtomicInteger();
+
+        /** */
+        private final AtomicInteger failReconReq = new AtomicInteger();
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+
+            if (!onMessage(sock, msg))
+                return;
+
+            super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage 
msg, Socket sock, int res,
+            long timeout) throws IOException {
+
+            if (msg instanceof TcpDiscoveryJoinRequestMessage && 
failJoinReqRes.getAndDecrement() > 0)
+                res = RES_WAIT;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
+        /**
+         * @param sock Socket.
+         * @param msg Message.
+         * @return {@code False} if should not further process message.
+         * @throws IOException If failed.
+         */
+        private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage 
msg) throws IOException {
+            boolean fail = false;
+
+            if (msg instanceof TcpDiscoveryJoinRequestMessage)
+                fail = failJoinReq.getAndDecrement() > 0;
+            if (msg instanceof TcpDiscoveryClientReconnectMessage)
+                fail = failReconReq.getAndDecrement() > 0;
+
+            if (fail) {
+                log.info("Close socket on message write [msg=" + msg + "]");
+
+                sock.close();
+            }
+
+            return true;
+        }
+    }
+
+    /** */
+    private static class DummyListener implements IgniteBiPredicate<UUID, 
Object> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(UUID uuid, Object msg) {
+            return true;
+        }
+    }
+
+    //endregion
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 6e51c36..0681d10 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySnapshotHistoryTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiConfigSelfTest;
 import 
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiFailureTimeoutSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiReconnectDelayTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiStartStopSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest;
@@ -99,6 +100,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends 
TestSuite {
 
         suite.addTest(new TestSuite(TcpDiscoveryWithWrongServerTest.class));
 
+        suite.addTest(new TestSuite(TcpDiscoverySpiReconnectDelayTest.class));
+
         // Client connect.
         suite.addTest(new TestSuite(IgniteClientConnectTest.class));
         suite.addTest(new 
TestSuite(IgniteClientReconnectMassiveShutdownTest.class));

Reply via email to