This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 657b5d9 IGNITE-11255 Fixed failing tests caused by IGNITE-7648 - Fixes #6062. 657b5d9 is described below commit 657b5d9a68f049dbb64301b1aea730de0ae9d324 Author: Pavel Voronkin <pvoron...@gridgain.com> AuthorDate: Fri Feb 22 16:14:43 2019 +0300 IGNITE-11255 Fixed failing tests caused by IGNITE-7648 - Fixes #6062. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../spi/communication/tcp/TcpCommunicationSpi.java | 22 ++- .../Query/CacheQueriesWithRestartServerTest.cs | 2 +- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 3 + .../zk/ZookeeperDiscoverySpiTestSuite1.java | 2 + .../ZookeeperDiscoveryClientDisconnectTest.java | 158 +---------------- .../ZookeeperDiscoveryClientReconnectTest.java | 192 +++++++++++++++++++++ 6 files changed, 212 insertions(+), 167 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 0f0d0d4..571f0fd 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3348,6 +3348,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati break; } + long timeout = 0; + try { if (getSpiContext().node(node.id()) == null) throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node); @@ -3393,9 +3395,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati GridSslMeta sslMeta = null; try { - long currTimeout = connTimeoutStgy.nextTimeout(); + timeout = connTimeoutStgy.nextTimeout(); - ch.socket().connect(addr, (int) currTimeout); + ch.socket().connect(addr, (int) timeout); if (getSpiContext().node(node.id()) == null) throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node); @@ -3416,9 +3418,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati throw new IgniteCheckedException("Local node has not been started or " + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); + timeout = connTimeoutStgy.nextTimeout(timeout); + rcvCnt = safeTcpHandshake(ch, node.id(), - connTimeoutStgy.nextTimeout(currTimeout), + timeout, sslMeta, new HandshakeMessage2(locNode.id(), recoveryDesc.incrementConnectCount(), @@ -3439,8 +3443,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati "[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy + ", addr=" + addr + - ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() + - ", totalTimeout" + totalTimeout + ']'); + ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() + + ", timeout=" + timeout + ']'); throw new ClusterTopologyCheckedException("Failed to connect to node " + "(current or target node is out of topology on target node within timeout). " + @@ -3497,8 +3501,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati U.warn(log, "Handshake timed out (will stop attempts to perform the handshake) " + "[node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy + ", err=" + e.getMessage() + ", addr=" + addr + - ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() + - ", totalTimeout" + totalTimeout + ']'); + ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() + + ", timeout=" + timeout + ']'); String msg = "Failed to connect to node (is node still alive?). " + "Make sure that each ComputeTask and cache Transaction has a timeout set " + @@ -3533,8 +3537,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (connTimeoutStgy.checkTimeout()) { U.warn(log, "Connection timed out (will stop attempts to perform the connect) " + "[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy + - ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() + - ", totalTimeout" + totalTimeout + + ", failureDetectionTimeoutEnabled=" + failureDetectionTimeoutEnabled() + + ", timeout=" + timeout + ", err=" + e.getMessage() + ", addr=" + addr + ']'); String msg = "Failed to connect to node (is node still alive?). " + diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesWithRestartServerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesWithRestartServerTest.cs index d54204a..b3a910d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesWithRestartServerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesWithRestartServerTest.cs @@ -67,7 +67,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query Ignition.Stop(_server.Name, false); _server = StartGrid(0); - WaitForReconnect(_client, 5000); + WaitForReconnect(_client, 10000); cache = _client.GetOrCreateCache<int, Item>("Test"); cache.Put(1, new Item { Id = 30, Title = "test" }); diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index b0b8888..6326770 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -339,6 +339,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements IgniteDis /** {@inheritDoc} */ @Override public boolean allNodesSupport(IgniteFeatures feature) { + if (impl == null) + return false; + return impl.allNodesSupport(feature); } diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java index d1218a6..dd64e4e 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestSuite1.java @@ -19,6 +19,7 @@ package org.apache.ignite.spi.discovery.zk; import org.apache.curator.test.ByteCodeRewrite; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClientTest; +import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryClientReconnectTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslFailedAuthTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoverySpiSaslSuccessfulAuthTest; import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryMiscTest; @@ -46,6 +47,7 @@ import org.junit.runners.Suite; ZookeeperDiscoveryTopologyChangeAndReconnectTest.class, ZookeeperDiscoveryCommunicationFailureTest.class, ZookeeperDiscoveryClientDisconnectTest.class, + ZookeeperDiscoveryClientReconnectTest.class, ZookeeperDiscoverySplitBrainTest.class, ZookeeperDiscoveryCustomEventsTest.class, ZookeeperDiscoveryMiscTest.class, diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java index 7578fa4..07305fc 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import javax.management.JMX; import javax.management.MBeanServer; @@ -38,7 +37,6 @@ import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -50,7 +48,6 @@ import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiMBean; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil; import org.apache.ignite.testframework.GridTestUtils; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; -import org.junit.Ignore; import org.junit.Test; import static java.util.concurrent.TimeUnit.SECONDS; @@ -67,6 +64,7 @@ public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySp // we reduce fealure detection tp speedup failure detection on catch(Exception) clause in createTcpClient(). cfg.setFailureDetectionTimeout(2000); + cfg.setClientFailureDetectionTimeout(2000); return cfg; } @@ -402,106 +400,6 @@ public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySp /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-8178") - @Test - public void testReconnectServersRestart_1() throws Exception { - reconnectServersRestart(1); - } - - /** - * @throws Exception If failed. - */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-8178") - @Test - public void testReconnectServersRestart_2() throws Exception { - reconnectServersRestart(3); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testReconnectServersRestart_3() throws Exception { - startGrid(0); - - helper.clientMode(true); - - startGridsMultiThreaded(10, 10); - - stopGrid(getTestIgniteInstanceName(0), true, false); - - final int srvIdx = ThreadLocalRandom.current().nextInt(10); - - final AtomicInteger idx = new AtomicInteger(); - - info("Restart nodes."); - - // Test concurrent start when there are disconnected nodes from previous cluster. - GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() throws Exception { - int threadIdx = idx.getAndIncrement(); - - helper.clientModeThreadLocal(threadIdx == srvIdx || ThreadLocalRandom.current().nextBoolean()); - - startGrid(threadIdx); - - return null; - } - }, 10, "start-node"); - - waitForTopology(20); - - evts.clear(); - } - - /** - * Checks that a client will reconnect after previous cluster data was cleaned. - * - * @throws Exception If failed. - */ - @Test - public void testReconnectServersRestart_4() throws Exception { - startGrid(0); - - helper.clientMode(true); - - IgniteEx client = startGrid(1); - - helper.clientMode(false); - - CountDownLatch latch = new CountDownLatch(1); - - client.events().localListen(event -> { - latch.countDown(); - - return true; - }, EVT_CLIENT_NODE_DISCONNECTED); - - waitForTopology(2); - - stopGrid(0); - - evts.clear(); - - // Waiting for client starts to reconnect and create join request. - assertTrue("Failed to wait for client node disconnected.", latch.await(10, SECONDS)); - - // Restart cluster twice for incrementing internal order. (alive zk-nodes having lower order and containing - // client join request will be removed). See {@link ZookeeperDiscoveryImpl#cleanupPreviousClusterData}. - startGrid(0); - - stopGrid(0); - - evts.clear(); - - startGrid(0); - - waitForTopology(2); - } - - /** - * @throws Exception If failed. - */ @Test public void testStartNoZk() throws Exception { stopZkCluster(); @@ -535,58 +433,4 @@ public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySp zkCluster.start(); } } - - /** - * @param srvs Number of server nodes in test. - * @throws Exception If failed. - */ - private void reconnectServersRestart(int srvs) throws Exception { - startGridsMultiThreaded(srvs); - - helper.clientMode(true); - - final int CLIENTS = 10; - - startGridsMultiThreaded(srvs, CLIENTS); - - helper.clientMode(false); - - long stopTime = System.currentTimeMillis() + 30_000; - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - final int NODES = srvs + CLIENTS; - - int iter = 0; - - while (System.currentTimeMillis() < stopTime) { - int restarts = rnd.nextInt(10) + 1; - - info("Test iteration [iter=" + iter++ + ", restarts=" + restarts + ']'); - - for (int i = 0; i < restarts; i++) { - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer threadIdx) { - stopGrid(getTestIgniteInstanceName(threadIdx), true, false); - } - }, srvs, "stop-server"); - - startGridsMultiThreaded(0, srvs); - } - - final Ignite srv = ignite(0); - - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return srv.cluster().nodes().size() == NODES; - } - }, 30_000)); - - waitForTopology(NODES); - - awaitPartitionMapExchange(); - } - - evts.clear(); - } } diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientReconnectTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientReconnectTest.java new file mode 100644 index 0000000..019511b --- /dev/null +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientReconnectTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; + +/** + * Tests for Zookeeper SPI discovery client reconnect. + */ +public class ZookeeperDiscoveryClientReconnectTest extends ZookeeperDiscoverySpiTestBase { + /** + * @throws Exception If failed. + */ + @Ignore("https://issues.apache.org/jira/browse/IGNITE-8178") + @Test + public void testReconnectServersRestart_1() throws Exception { + reconnectServersRestart(1); + } + + /** + * @throws Exception If failed. + */ + @Ignore("https://issues.apache.org/jira/browse/IGNITE-8178") + @Test + public void testReconnectServersRestart_2() throws Exception { + reconnectServersRestart(3); + } + /** + * @throws Exception If failed. + */ + @Test + public void testReconnectServersRestart_3() throws Exception { + startGrid(0); + + helper.clientMode(true); + + startGridsMultiThreaded(5, 5); + + stopGrid(getTestIgniteInstanceName(0), true, false); + + final int srvIdx = ThreadLocalRandom.current().nextInt(5); + + final AtomicInteger idx = new AtomicInteger(); + + info("Restart nodes."); + + // Test concurrent start when there are disconnected nodes from previous cluster. + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int threadIdx = idx.getAndIncrement(); + + helper.clientModeThreadLocal(threadIdx == srvIdx || ThreadLocalRandom.current().nextBoolean()); + + startGrid(threadIdx); + + return null; + } + }, 5, "start-node"); + + waitForTopology(10); + + evts.clear(); + } + + /** + * Checks that a client will reconnect after previous cluster data was cleaned. + * + * @throws Exception If failed. + */ + @Test + public void testReconnectServersRestart_4() throws Exception { + startGrid(0); + + helper.clientMode(true); + + IgniteEx client = startGrid(1); + + helper.clientMode(false); + + CountDownLatch latch = new CountDownLatch(1); + + client.events().localListen(event -> { + latch.countDown(); + + return true; + }, EVT_CLIENT_NODE_DISCONNECTED); + + waitForTopology(2); + + stopGrid(0); + + evts.clear(); + + // Waiting for client starts to reconnect and create join request. + assertTrue("Failed to wait for client node disconnected.", latch.await(15, SECONDS)); + + // Restart cluster twice for incrementing internal order. (alive zk-nodes having lower order and containing + // client join request will be removed). See {@link ZookeeperDiscoveryImpl#cleanupPreviousClusterData}. + startGrid(0); + + stopGrid(0); + + evts.clear(); + + startGrid(0); + + waitForTopology(2); + } + + /** + * @param srvs Number of server nodes in test. + * @throws Exception If failed. + */ + private void reconnectServersRestart(int srvs) throws Exception { + startGridsMultiThreaded(srvs); + + helper.clientMode(true); + + final int CLIENTS = 10; + + startGridsMultiThreaded(srvs, CLIENTS); + + helper.clientMode(false); + + long stopTime = System.currentTimeMillis() + 30_000; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + final int NODES = srvs + CLIENTS; + + int iter = 0; + + while (System.currentTimeMillis() < stopTime) { + int restarts = rnd.nextInt(10) + 1; + + info("Test iteration [iter=" + iter++ + ", restarts=" + restarts + ']'); + + for (int i = 0; i < restarts; i++) { + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer threadIdx) { + stopGrid(getTestIgniteInstanceName(threadIdx), true, false); + } + }, srvs, "stop-server"); + + startGridsMultiThreaded(0, srvs); + } + + final Ignite srv = ignite(0); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv.cluster().nodes().size() == NODES; + } + }, 30_000)); + + waitForTopology(NODES); + + awaitPartitionMapExchange(); + } + + evts.clear(); + } +}