IGNITE-7464 - Add property to configure time between node connection attempts - Fixes #3493
Signed-off-by: Valentin Kulichenko <valentin.kuliche...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4abcb310 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4abcb310 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4abcb310 Branch: refs/heads/ignite-7485-2 Commit: 4abcb31071c6518d9692f1a3ecb443068ce548a4 Parents: b2f8cf8 Author: Stanislav Lukyanov <stanlukya...@gmail.com> Authored: Thu Feb 8 14:25:11 2018 -0800 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Thu Feb 8 14:25:11 2018 -0800 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 21 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 4 +- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 7 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 36 +- .../tcp/TcpDiscoverySpiReconnectDelayTest.java | 446 +++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 3 + 6 files changed, 499 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index c9a4a5a..a8f13fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -505,9 +505,10 @@ class ClientImpl extends TcpDiscoveryImpl { "Please check IP finder configuration" + (spi.ipFinder instanceof TcpDiscoveryMulticastIpFinder ? " and make sure multicast works on your network. " : ". 
") + - "Will retry every 2 secs.", true); + "Will retry every " + spi.getReconnectDelay() + " ms. " + + "Change 'reconnectDelay' to configure the frequency of retries.", true); - Thread.sleep(2000); + Thread.sleep(spi.getReconnectDelay()); } } @@ -566,23 +567,21 @@ class ClientImpl extends TcpDiscoveryImpl { } } - if (wait) { - if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) - return null; + if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) + return null; + if (wait) { if (log.isDebugEnabled()) log.debug("Will wait before retry join."); - Thread.sleep(2000); + Thread.sleep(spi.getReconnectDelay()); } else if (addrs.isEmpty()) { - if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) - return null; - LT.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + - "every 2 secs): " + toOrderedList(addrs0), true); + "every " + spi.getReconnectDelay() + " ms; change 'reconnectDelay' to configure the frequency " + + "of retries): " + toOrderedList(addrs0), true); - Thread.sleep(2000); + Thread.sleep(spi.getReconnectDelay()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 6f79720..743964a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1091,7 +1091,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Concurrent discovery SPI start has been detected (local node should wait)."); try { - U.sleep(2000); + U.sleep(spi.getReconnectDelay()); } catch (IgniteInterruptedCheckedException e) { throw new 
IgniteSpiException("Thread has been interrupted.", e); @@ -1125,7 +1125,7 @@ class ServerImpl extends TcpDiscoveryImpl { } try { - U.sleep(2000); + U.sleep(spi.getReconnectDelay()); } catch (IgniteInterruptedCheckedException ex) { throw new IgniteSpiException("Thread has been interrupted.", ex); http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index f3cf48d..00d83dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -326,8 +326,9 @@ abstract class TcpDiscoveryImpl { } catch (IgniteSpiException e) { LT.error(log, e, "Failed to register local node address in IP finder on start " + - "(retrying every 2000 ms)."); - } + "(retrying every " + spi.getReconnectDelay() + " ms; " + + "change 'reconnectDelay' to configure the frequency of retries)."); + } if (start > 0 && (U.currentTimeMillis() - start) > spi.getJoinTimeout()) throw new IgniteSpiException( @@ -337,7 +338,7 @@ abstract class TcpDiscoveryImpl { "[joinTimeout=" + spi.getJoinTimeout() + ']'); try { - U.sleep(2000); + U.sleep(spi.getReconnectDelay()); } catch (IgniteInterruptedCheckedException e) { throw new IgniteSpiException("Thread has been interrupted.", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 51c5adf..2d9a314 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -260,6 +260,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Default reconnect attempts count (value is <tt>10</tt>). */ public static final int DFLT_RECONNECT_CNT = 10; + /** Default delay between attempts to connect to the cluster in milliseconds (value is <tt>2000</tt>). */ + public static final long DFLT_RECONNECT_DELAY = 2000; + /** Default IP finder clean frequency in milliseconds (value is <tt>60,000ms</tt>). */ public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; @@ -349,6 +352,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private int reconCnt = DFLT_RECONNECT_CNT; + /** Delay between attempts to connect to the cluster. */ + private long reconDelay = DFLT_RECONNECT_DELAY; + /** Statistics print frequency. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"}) protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ; @@ -642,6 +648,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** + * Gets the amount of time in milliseconds that node waits before retrying to (re)connect to the cluster. + * + * @return Delay between attempts to connect to the cluster in milliseconds. + */ + public long getReconnectDelay() { + return reconDelay; + } + + /** + * Sets the amount of time in milliseconds that node waits before retrying to (re)connect to the cluster. + * <p> + * If not specified, default is {@link #DFLT_RECONNECT_DELAY}. + * + * @param reconDelay Delay between attempts to connect to the cluster in milliseconds. 
+ * + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setReconnectDelay(long reconDelay) { + this.reconDelay = reconDelay; + + return this; + } + + /** * Gets maximum message acknowledgement timeout. * * @return Maximum message acknowledgement timeout. @@ -1730,11 +1761,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } catch (IgniteSpiException e) { LT.error(log, e, "Failed to get registered addresses from IP finder on start " + - "(retrying every 2000 ms)."); + "(retrying every " + getReconnectDelay() + " ms; change 'reconnectDelay' to configure " + + "the frequency of retries)."); } try { - U.sleep(2000); + U.sleep(getReconnectDelay()); } catch (IgniteInterruptedCheckedException e) { throw new IgniteSpiException("Thread has been interrupted.", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java new file mode 100644 index 0000000..89df32c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiReconnectDelayTest.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoveryImpl.RES_WAIT; + +/** + * Test for {@link TcpDiscoverySpi#setReconnectDelay(long)}. 
+ */ +public class TcpDiscoverySpiReconnectDelayTest extends GridCommonAbstractTest { + /** Time to wait for events. */ + private static final int EVT_TIMEOUT = 120000; + + /** Timeout for socket operations. */ + private static final int SOCK_AND_ACK_TIMEOUT = 500; + + static { + System.setProperty(IgniteSystemProperties.IGNITE_QUIET, "false"); + } + + //region Client joins after failNode() + + /** */ + public void testClientJoinAfterFailureShortTimeout() throws Exception { + checkClientJoinAfterNodeFailure(5, 500); + } + + /** */ + public void testClientJoinAfterFailureLongTimeout() throws Exception { + checkClientJoinAfterNodeFailure(3, 5000); + } + + /** + * Check that client restores connection after the given time, with the expected number of messages sent + * and expected time elapsed. + * + * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage to be failed before succeed to connect. + * @param reconnectDelay argument for {@link TcpDiscoverySpi#setReconnectDelay(long)} + */ + private void checkClientJoinAfterNodeFailure(int numOfFailedRequests, int reconnectDelay) throws Exception { + try ( + Ignite ignite1 = G.start(getConfiguration("server", false, reconnectDelay)); + Ignite ignite2 = G.start(getConfiguration("client", true, reconnectDelay)) + ) { + // Check topology. 
+ assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().topologyVersion()); + + final CountDownLatch failLatch = new CountDownLatch(1); + final CountDownLatch joinLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final CountDownLatch disconnectLatch = new CountDownLatch(1); + + ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent evt) { + info("Node1 event: " + evt); + + if (evt.type() == EVT_NODE_FAILED) + failLatch.countDown(); + else if (evt.type() == EVT_NODE_JOINED) + joinLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED, EVT_NODE_JOINED); + + ignite2.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Node2 event: " + evt); + + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) + disconnectLatch.countDown(); + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) + reconnectLatch.countDown(); + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + long startTime = System.currentTimeMillis(); + + AtomicInteger failJoinReq = ((FailingTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).failJoinReq; + failJoinReq.set(numOfFailedRequests); + ignite1.configuration().getDiscoverySpi().failNode(ignite2.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(EVT_TIMEOUT, MILLISECONDS)); + assertTrue(failLatch.await(EVT_TIMEOUT, MILLISECONDS)); + assertTrue(reconnectLatch.await(EVT_TIMEOUT, MILLISECONDS)); + assertTrue(joinLatch.await(EVT_TIMEOUT, MILLISECONDS)); + + long endTime = System.currentTimeMillis(); + + // Check topology. + assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(4L, ignite2.cluster().localNode().order()); + assertEquals(4L, ignite2.cluster().topologyVersion()); + + // Check connection time. 
+ // Total time should be at least the sum of all delays. + long totalTime = endTime - startTime; + long expTotalTime = numOfFailedRequests * reconnectDelay; + assertTrue(totalTime >= expTotalTime); + + // Check number of messages. + // If exactly numOfFailedRequests fail, counter will be at -1. + // If unexpected additional requests are sent, counter will be <= -2. + int cntr = failJoinReq.get(); + int numOfMessages = numOfFailedRequests - cntr; + int expNumOfMessages = numOfFailedRequests + 1; + assertEquals("Unexpected number of messages", expNumOfMessages, numOfMessages); + } + } + + //endregion + + //region Client joins after brakeConnection() + + /** */ + public void testClientJoinAfterSocketClosedShortTimeout() throws Exception { + checkClientJoinAfterSocketClosed(5, 500); + } + + /** */ + public void testClientJoinAfterSocketClosedLongTimeout() throws Exception { + checkClientJoinAfterSocketClosed(3, 5000); + } + + /** + * Check that client restores connection after the given time, with the expected number of messages sent + * and expected time elapsed. + * + * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage to be failed before succeed to connect. + * @param reconnectDelay argument for {@link TcpDiscoverySpi#setReconnectDelay(long)} + */ + private void checkClientJoinAfterSocketClosed(int numOfFailedRequests, int reconnectDelay) throws Exception { + try ( + Ignite ignite1 = G.start(getConfiguration("server", false, reconnectDelay)); + Ignite ignite2 = G.start(getConfiguration("client", true, reconnectDelay)) + ) { + // Check topology. 
+ assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().topologyVersion()); + + long startTime = System.currentTimeMillis(); + + AtomicInteger failCntr = ((FailingTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).failReconReq; + failCntr.set(numOfFailedRequests); + ((TcpDiscoverySpi)ignite2.configuration().getDiscoverySpi()).brakeConnection(); + + // Need to send a discovery message to a remote node to provoke reconnection. + // remoteListen() is used because it is synchronous (unlike send()). + ignite2.message().remoteListen(null, new DummyListener()); + + long endTime = System.currentTimeMillis(); + + // Check topology. + assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().topologyVersion()); + + // Check connection time. + // Total time should be at least the sum of all delays. + long totalTime = endTime - startTime; + long expTotalTime = numOfFailedRequests * reconnectDelay; + assertTrue(totalTime >= expTotalTime); + + // Check number of messages. + // If exactly numOfFailedRequests fail, counter will be at -1. + // If unexpected additional requests are sent, counter will be <= -2. 
+ int cntr = failCntr.get(); + int numOfMessages = numOfFailedRequests - cntr; + int expNumOfMessages = numOfFailedRequests + 1; + assertEquals("Unexpected number of messages", expNumOfMessages, numOfMessages); + } + } + + //endregion + + //region Client joins at start + + /** */ + public void testClientJoinAtStartShortTimeout() throws Exception { + checkClientJoinAtStart(5, 500); + } + + /** */ + public void testClientJoinAtStartLongTimeout() throws Exception { + checkClientJoinAtStart(3, 5000); + } + + /** */ + public void testServerJoinAtStartShortTimeout() throws Exception { + checkServerJoinAtStart(5, 500); + } + + /** */ + public void testServerJoinAtStartLongTimeout() throws Exception { + checkServerJoinAtStart(3, 5000); + } + + /** + * Check that client restores connection after the given time, with the expected number of messages sent + * and expected time elapsed. + * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage to be failed before succeed to connect. + * @param reconnectDelay argument for {@link TcpDiscoverySpi#setReconnectDelay(long)} + */ + private void checkClientJoinAtStart(int numOfFailedRequests, int reconnectDelay) throws Exception { + try ( + Ignite ignite1 = G.start(getConfiguration("server", false, reconnectDelay)) + ) { + final CountDownLatch joinLatch = new CountDownLatch(1); + + ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent evt) { + info("Node1 event: " + evt); + + if (evt.type() == EVT_NODE_JOINED) + joinLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + + IgniteConfiguration ignite2Cfg = getConfiguration("client", true, reconnectDelay); + final AtomicInteger failJoinReq = ((FailingTcpDiscoverySpi)ignite2Cfg.getDiscoverySpi()).failJoinReq; + failJoinReq.set(numOfFailedRequests); + + final long startTime = System.currentTimeMillis(); + + try (Ignite ignite2 = G.start(ignite2Cfg)) { + assertTrue(joinLatch.await(EVT_TIMEOUT, 
MILLISECONDS)); + + long endTime = System.currentTimeMillis(); + + // Check topology. + assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().topologyVersion()); + + // Check connection time. + // Total time should be at least the sum of all delays. + long totalTime = endTime - startTime; + long expTotalTime = numOfFailedRequests * reconnectDelay; + assertTrue(totalTime >= expTotalTime); + + // Check number of messages. + // If exactly numOfFailedRequests fail, counter will be at -1. + // If unexpected additional requests are sent, counter will be <= -2. + int cntr = failJoinReq.get(); + int numOfMessages = numOfFailedRequests - cntr; + int expNumOfMessages = numOfFailedRequests + 1; + assertEquals("Unexpected number of messages", expNumOfMessages, numOfMessages); + } + } + } + + /** + * Check that a server node joins the cluster after the given time, with the expected number of messages sent + * and expected time elapsed. + * @param numOfFailedRequests number of TcpDiscoveryJoinRequestMessage to be failed before succeed to connect. 
+ * @param reconnectDelay argument for {@link TcpDiscoverySpi#setReconnectDelay(long)} + */ + private void checkServerJoinAtStart(int numOfFailedRequests, int reconnectDelay) throws Exception { + try ( + Ignite ignite1 = G.start(getConfiguration("server", false, reconnectDelay)) + ) { + final CountDownLatch joinLatch = new CountDownLatch(1); + final AtomicInteger failJoinReqRes = ((FailingTcpDiscoverySpi)ignite1.configuration().getDiscoverySpi()) + .failJoinReqRes; + failJoinReqRes.set(numOfFailedRequests); + + ignite1.events().localListen(new IgnitePredicate<DiscoveryEvent>() { + @Override public boolean apply(DiscoveryEvent evt) { + info("Node1 event: " + evt); + + if (evt.type() == EVT_NODE_JOINED) + joinLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + + final long startTime = System.currentTimeMillis(); + + try (Ignite ignite2 = G.start(getConfiguration("server-2", false, reconnectDelay))) { + assertTrue(joinLatch.await(EVT_TIMEOUT, MILLISECONDS)); + + long endTime = System.currentTimeMillis(); + + // Check topology. + assertEquals(1L, ignite1.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().localNode().order()); + assertEquals(2L, ignite2.cluster().topologyVersion()); + + // Check connection time. + // Total time should be at least the sum of all delays. + long totalTime = endTime - startTime; + long expTotalTime = numOfFailedRequests * reconnectDelay; + assertTrue(totalTime >= expTotalTime); + + // Check number of messages. + // If exactly numOfFailedRequests fail, counter will be at -1. + // If unexpected additional requests are sent, counter will be <= -2. 
+ int cntr = failJoinReqRes.get(); + int numOfMessages = numOfFailedRequests - cntr; + int expNumOfMessages = numOfFailedRequests + 1; + assertEquals("Unexpected number of messages", expNumOfMessages, numOfMessages); + } + } + } + + //endregion + + //region Helpers + + /** */ + private IgniteConfiguration getConfiguration(String name, boolean isClient, int reconnectDelay) { + IgniteConfiguration cfg = new IgniteConfiguration() + .setIgniteInstanceName(name) + .setDiscoverySpi(new FailingTcpDiscoverySpi() + .setIpFinder(LOCAL_IP_FINDER) + .setReconnectDelay(reconnectDelay) + // Allow reconnection to take long. + .setNetworkTimeout(EVT_TIMEOUT) + // Make sure reconnection attempts are short enough. + // Each reconnection attempt is + // 500ms for write (socketTimeout) + 500ms for read (ackTimeout) + // tried only once. + .setSocketTimeout(SOCK_AND_ACK_TIMEOUT) + .setAckTimeout(SOCK_AND_ACK_TIMEOUT) + .setReconnectCount(1)) + // Make sure that server doesn't kick reconnecting client out. + .setClientFailureDetectionTimeout(EVT_TIMEOUT); + + if (isClient) + cfg.setClientMode(true); + return cfg; + } + + /** Custom Discovery SPI allowing to fail sending of certain messages. 
+ */ + private static class FailingTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private final AtomicInteger failJoinReq = new AtomicInteger(); + + /** */ + private final AtomicInteger failJoinReqRes = new AtomicInteger(); + + /** */ + private final AtomicInteger failReconReq = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + + if (!onMessage(sock, msg)) + return; + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + + if (msg instanceof TcpDiscoveryJoinRequestMessage && failJoinReqRes.getAndDecrement() > 0) + res = RES_WAIT; + + super.writeToSocket(msg, sock, res, timeout); + } + + /** + * @param sock Socket. + * @param msg Message. + * @return {@code False} if should not further process message. + * @throws IOException If failed. 
+ */ + private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException { + boolean fail = false; + + if (msg instanceof TcpDiscoveryJoinRequestMessage) + fail = failJoinReq.getAndDecrement() > 0; + if (msg instanceof TcpDiscoveryClientReconnectMessage) + fail = failReconReq.getAndDecrement() > 0; + + if (fail) { + log.info("Close socket on message write [msg=" + msg + "]"); + + sock.close(); + } + + return true; + } + } + + /** */ + private static class DummyListener implements IgniteBiPredicate<UUID, Object> { + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + return true; + } + } + + //endregion +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4abcb310/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 6e51c36..0681d10 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -38,6 +38,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySnapshotHistoryTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiConfigSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiFailureTimeoutSelfTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiReconnectDelayTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiStartStopSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest; @@ -99,6 +100,8 @@ public class 
IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoveryWithWrongServerTest.class)); + suite.addTest(new TestSuite(TcpDiscoverySpiReconnectDelayTest.class)); + // Client connect. suite.addTest(new TestSuite(IgniteClientConnectTest.class)); suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));