Hi, it looks like master has moved ahead. Try this one.
On Tue, Apr 18, 2017 at 4:51 PM, rick_tem <[email protected]> wrote:

> Hi,
>
> I appreciate the last response to this. I've checked out the 1.9 branch
> (i.e. git clone -b ignite-1.9 --single-branch
> https://github.com/apache/ignite.git ignite-1.9)...I'm new to GitHub so bear
> with me. In seeing if this patch will apply, I get the following error.
> I'm not exactly sure why. Is there a procedure somewhere of how to apply
> patches? As well, any documentation links for beginner contributors would
> be appreciated.
>
> Thanks!
>
> C:\java\GitHub\ignite1.9\ignite-1.9\modules [ignite-1.9 ≡ +1 ~0 -0 !]> git apply --check IGNITE_4473___Client_should_re_try_connection_attempt_in_case_of_concurrent_network_failur.patch
> error: patch failed: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java:1836
> error: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java: patch does not apply
> error: patch failed: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java:17
> error: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java: patch does not apply
> *error: patch failed: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java:54*
> error: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java: patch does not apply
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/IgniteCacheProxy-connection-failure-in-REPLICATAED-mode-tp11769p12030.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.

--
Best regards,
Andrey V. Mashenkov
Index: modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java (revision ) @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.jetbrains.annotations.Nullable; + +/** + * Indicates that node should try reconnect to cluster. + */ +public class IgniteNeedReconnectException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param locNode Local node. + * @param cause Cause. 
+ */ + public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) { + super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause); + + assert locNode.isClient(); + } +} Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision ) @@ -1933,6 +1933,15 @@ return ignite().configuration().getSslContextFactory() != null; } + /** + * Force reconnect to cluster. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() throws IgniteSpiException { + impl.reconnect(); + } + /** * <strong>FOR TEST ONLY!!!</strong> */ Index: modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java (revision ) @@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectServicesTest; import org.apache.ignite.internal.IgniteClientReconnectStopTest; import org.apache.ignite.internal.IgniteClientReconnectStreamerTest; +import org.apache.ignite.internal.IgniteClientRejoinTest; /** * @@ -52,6 +53,7 @@ suite.addTestSuite(IgniteClientReconnectServicesTest.class); suite.addTestSuite(IgniteClientReconnectStreamerTest.class); 
suite.addTestSuite(IgniteClientReconnectFailoverTest.class); + suite.addTestSuite(IgniteClientRejoinTest.class); return suite; } Index: modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java (revision ) @@ -253,6 +253,9 @@ /** Periodic starvation check interval. */ private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30; + /** Force complete reconnect future. */ + private static final Object STOP_RECONNECT = new Object(); + /** */ @GridToStringExclude private GridKernalContextImpl ctx; @@ -330,6 +333,9 @@ @GridToStringExclude private final AtomicBoolean stopGuard = new AtomicBoolean(); + /** */ + private final ReconnectState reconnectState = new ReconnectState(); + /** * No-arg constructor is required by externalization. */ @@ -945,6 +951,8 @@ // Notify IO manager the second so further components can send and receive messages. ctx.io().onKernalStart(); + boolean recon = false; + // Callbacks. for (GridComponent comp : ctx) { // Skip discovery manager. @@ -955,9 +963,23 @@ if (comp instanceof GridIoManager) continue; - if (!skipDaemon(comp)) - comp.onKernalStart(); + if (!skipDaemon(comp)) { + try { + comp.onKernalStart(); + } + catch (IgniteNeedReconnectException e) { + assert ctx.discovery().reconnectSupported(); + + if (log.isDebugEnabled()) + log.debug("Failed to start node components on node start, will wait for reconnect: " + e); + + recon = true; + } + } } + + if (recon) + reconnectState.waitFirstReconnect(); // Register MBeans. 
registerKernalMBean(); @@ -3315,6 +3337,8 @@ public void onDisconnected() { Throwable err = null; + reconnectState.waitPreviousReconnect(); + GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected(); if (reconnectFut == null) { @@ -3323,9 +3347,18 @@ return; } - IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut); + IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture(); + + IgniteFuture<?> userFut; + + // In case of previous reconnect did not finish keep reconnect future. + if (curFut != null && curFut.internalFuture() == reconnectFut) + userFut = curFut; + else { + userFut = new IgniteFutureImpl<>(reconnectFut); - ctx.cluster().get().clientReconnectFuture(userFut); + ctx.cluster().get().clientReconnectFuture(userFut); + } ctx.disconnected(true); @@ -3378,31 +3411,54 @@ try { ctx.disconnected(false); - GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>(); + GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>(); + + reconnectState.reconnectDone = new GridFutureAdapter<>(); for (GridComponent comp : ctx.components()) { IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted); if (fut != null) - reconnectFut.add((IgniteInternalFuture)fut); + curReconnectFut.add(fut); } - reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture()); + curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture()); - reconnectFut.markInitialized(); + curReconnectFut.markInitialized(); - reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() { + final GridFutureAdapter reconnectDone = reconnectState.reconnectDone; + + curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { - fut.get(); + Object res = fut.get(); + + if (res == STOP_RECONNECT) + return; ctx.gateway().onReconnected(); + + reconnectState.firstReconnectFut.onDone(); } catch 
(IgniteCheckedException e) { - U.error(log, "Failed to reconnect, will stop node", e); + if (!X.hasCause(e, IgniteNeedReconnectException.class, + IgniteClientDisconnectedCheckedException.class)) { + U.error(log, "Failed to reconnect, will stop node.", e); + + reconnectState.firstReconnectFut.onDone(e); - close(); + close(); + } + else { + assert ctx.discovery().reconnectSupported(); + + U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() + + ", err=" + e.getMessage() + ']'); + } } + finally { + reconnectDone.onDone(); + } } }); } @@ -3580,6 +3636,46 @@ return ctx.io().sendIoTest(nodes, payload, procFromNioThread); } + /** + * + */ + private class ReconnectState { + /** */ + private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter(); + + /** */ + private GridCompoundFuture<?, Object> curReconnectFut; + + /** */ + private GridFutureAdapter<?> reconnectDone; + + /** + * @throws IgniteCheckedException If failed. + */ + void waitFirstReconnect() throws IgniteCheckedException { + firstReconnectFut.get(); + } + + /** + * + */ + void waitPreviousReconnect() { + if (curReconnectFut != null && !curReconnectFut.isDone()) { + assert reconnectDone != null; + + curReconnectFut.onDone(STOP_RECONNECT); + + try { + reconnectDone.get(); + } + catch (IgniteCheckedException ignote) { + // No-op. 
+ } + } + + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision ) @@ -1589,6 +1589,11 @@ throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public void reconnect() throws IgniteSpiException { + throw new UnsupportedOperationException("Reconnect is not supported for server."); + } + /** {@inheritDoc} */ @Override protected IgniteSpiThread workerThread() { return msgWorker; Index: modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java (revision ) @@ -700,9 +700,12 @@ try { Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT)))); - fail(); + // Commented due to IGNITE-4473, because + // IgniteClientDisconnectedException won't + // be thrown, but client will reconnect. 
+// fail(); - return false; + return true; } catch (IgniteClientDisconnectedException e) { log.info("Expected start error: " + e); Index: modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java (revision ) +++ modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java (revision ) @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests client to be able restore connection to cluster if coordination is not available. + */ +public class IgniteClientRejoinTest extends GridCommonAbstractTest { + /** Block. */ + private volatile boolean block; + + /** Block all. */ + private volatile boolean blockAll; + + /** Coordinator. */ + private volatile ClusterNode crd; + + /** Client reconnect disabled. 
*/ + private boolean clientReconnectDisabled; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + clientReconnectDisabled = false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.contains("client")) { + cfg.setCommunicationSpi(new TcpCommunicationSpi()); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + DiscoverySpi dspi = new DiscoverySpi(); + + dspi.setIpFinder(spi.getIpFinder()); + + cfg.setDiscoverySpi(dspi); + + dspi.setJoinTimeout(60_000); + dspi.setClientReconnectDisabled(clientReconnectDisabled); + + cfg.setClientMode(true); + } + + // TODO: IGNITE-4833 + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientsReconnectAfterStart() throws Exception { + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + final CountDownLatch latch = new CountDownLatch(1); + + List<Ignite> clientNodes = new ArrayList<>(); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) + clientNodes.add(startGrid("client" + i)); + + blockAll = true; + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.sleep(5_000); + + block = true; + blockAll = false; + + System.out.println(">>> Allow with blocked coordinator."); + + latch.countDown(); + + return null; + } + }); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + latch.await(); + + U.sleep((new Random().nextInt(15) + 30) * 1000); + + block = false; + + System.out.println(">>> Allow coordinator."); + + return null; + } + }); + + fut.get(); + + for (Ignite client : clientNodes) { + while (true) { + try { + IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some"); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + for (int i = 0; i < 100; i++) + assertEquals((Integer)i, cache.get(i)); + + cache.clear(); + + break; + } + catch (IgniteClientDisconnectedException e) { + e.reconnectFuture().get(); + } + } + } + + assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); + assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientsReconnect() throws Exception { + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + block = true; + + List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) { + final int idx = i; + + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override public Ignite call() throws Exception { + latch.await(); + + return startGrid("client" + idx); + } + }); + + futs.add(fut); + } + + GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + latch.countDown(); + + Random rnd = new Random(); + + U.sleep((rnd.nextInt(15) + 15) * 1000); + + block = false; + + System.out.println(">>> ALLOW connection to coordinator."); + + return true; + } + }); + + for (IgniteInternalFuture<Ignite> clientFut : futs) { + Ignite client = clientFut.get(); + + IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name()); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + for (int i = 0; i < 100; i++) + assert i == cache.get(i); + } + + assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); + assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClientsReconnectDisabled() throws Exception { + clientReconnectDisabled = true; + + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + block = true; + + List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) { + final int idx = i; + + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override public Ignite call() throws Exception { + latch.await(); + + return startGrid("client" + idx); + } + }); + + futs.add(fut); + } + + latch.countDown(); + + for (final IgniteInternalFuture<Ignite> clientFut : futs) { + //noinspection ThrowableNotThrown + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientFut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + } + + assertEquals(0, srv1.cluster().forClients().nodes().size()); + assertEquals(0, srv2.cluster().forClients().nodes().size()); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60_000; + } + + /** + * + */ + private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (blockAll || block && node.id().equals(crd.id())) + throw new IgniteSpiException(new SocketException("Test communication exception")); + + super.sendMessage(node, msg); + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + if (blockAll || block && node.id().equals(crd.id())) + throw new IgniteSpiException(new SocketException("Test communication exception")); + + super.sendMessage(node, msg, ackC); + 
} + } + + /** + * + */ + private class DiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + return super.openSocket(sock, remAddr, timeoutHelper); + } + } +} Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 
=================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision ) @@ -127,6 +127,9 @@ /** */ private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED"; + /** */ + private static final Object SPI_RECONNECT = "SPI_RECONNECT"; + /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); @@ -788,6 +791,11 @@ log); } + /** {@inheritDoc} */ + @Override public void reconnect() throws IgniteSpiException { + msgWorker.addMessage(SPI_RECONNECT); + } + /** {@inheritDoc} */ @Override public void brakeConnection() { SocketStream sockStream = msgWorker.currSock; @@ -859,9 +867,12 @@ /** */ private UUID rmtNodeId; + /** */ + private CountDownLatch stopReadLatch; + /** */ - protected SocketReader() { + SocketReader() { super(spi.ignite().name(), "tcp-client-disco-sock-reader", log); } @@ -869,7 +880,7 @@ * @param sockStream Socket. * @param rmtNodeId Rmt node id. */ - public void setSocket(SocketStream sockStream, UUID rmtNodeId) { + void setSocket(SocketStream sockStream, UUID rmtNodeId) { synchronized (mux) { this.sockStream = sockStream; @@ -879,6 +890,31 @@ } } + /** + * @throws InterruptedException If interrupted. 
+ */ + private void forceStopRead() throws InterruptedException { + CountDownLatch stopReadLatch; + + synchronized (mux) { + SocketStream stream = sockStream; + + if (stream == null) + return; + + this.stopReadLatch = stopReadLatch = new CountDownLatch(1); + + U.closeQuiet(stream.socket()); + + this.sockStream = null; + this.rmtNodeId = null; + + mux.notifyAll(); + } + + stopReadLatch.await(); + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { while (!isInterrupted()) { @@ -886,6 +922,12 @@ UUID rmtNodeId; synchronized (mux) { + if (stopReadLatch != null) { + stopReadLatch.countDown(); + + stopReadLatch = null; + } + if (this.sockStream == null) { mux.wait(); @@ -987,18 +1029,21 @@ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); /** */ - private final long socketTimeout; + private final long sockTimeout; /** */ private TcpDiscoveryAbstractMessage unackedMsg; + /** */ + private CountDownLatch forceLeaveLatch; + /** * */ - protected SocketWriter() { + SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); - socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); } @@ -1013,6 +1058,29 @@ } } + /** + * Sends {@link TcpDiscoveryNodeLeftMessage} and closes socket. + * + * @throws InterruptedException If interrupted. + */ + private void forceLeave() throws InterruptedException { + CountDownLatch forceLeaveLatch; + + synchronized (mux) { + // If writer was stopped. + if (sock == null) + return; + + this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1); + + unackedMsg = null; + + mux.notifyAll(); + } + + forceLeaveLatch.await(); + } + /** * @param sock Socket. * @param clientAck {@code True} is server supports client message acknowlede. 
@@ -1069,12 +1137,40 @@ continue; } - msg = queue.poll(); + if (forceLeaveLatch != null) { + msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); + + msg.client(true); + + try { + spi.writeToSocket( + sock, + msg, + sockTimeout); + } + catch (IOException | IgniteCheckedException e) { + if (log.isDebugEnabled()) { + log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg + + ", err=" + e.getMessage() + ']'); + } + } + + U.closeQuiet(sock); + + this.sock = null; + + clear(); + + continue; + } + else { + msg = queue.poll(); - if (msg == null) { - mux.wait(); + if (msg == null) { + mux.wait(); - continue; + continue; + } } } @@ -1095,7 +1191,7 @@ spi.writeToSocket( sock, msg, - socketTimeout); + sockTimeout); msg = null; @@ -1145,10 +1241,30 @@ synchronized (mux) { if (sock == this.sock) this.sock = null; // Connection has dead. + + clear(); } } } } + + /** + * + */ + private void clear() { + assert Thread.holdsLock(mux); + + queue.clear(); + unackedMsg = null; + + CountDownLatch forceLeaveLatch = this.forceLeaveLatch; + + if (forceLeaveLatch != null) { + this.forceLeaveLatch = null; + + forceLeaveLatch.countDown(); + } + } } /** @@ -1393,6 +1509,38 @@ else leaveLatch.countDown(); } + else if (msg == SPI_RECONNECT) { + if (state == CONNECTED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); + + reconnector = null; + } + + sockWriter.forceLeave(); + sockReader.forceStopRead(); + + currSock = null; + + queue.clear(); + + onDisconnected(); + + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + + UUID newId = UUID.randomUUID(); + + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + + "to network problems [newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode+ ']'); + + locNode.onClientDisconnected(newId); + + tryJoin(); + } + } else if (msg instanceof TcpDiscoveryNodeFailedMessage && 
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; @@ -1475,20 +1623,7 @@ ", failMsg=" + forceFailMsg + ']'); } - state = DISCONNECTED; - - nodeAdded = false; - - IgniteClientDisconnectedCheckedException err = - new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + - "client node disconnected."); - - for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { - GridFutureAdapter<Boolean> fut = e.getValue(); - - if (pingFuts.remove(e.getKey(), fut)) - fut.onDone(err); - } + onDisconnected(); notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); } @@ -1583,6 +1718,26 @@ } } + /** + * + */ + private void onDisconnected() { + state = DISCONNECTED; + + nodeAdded = false; + + IgniteClientDisconnectedCheckedException err = + new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + + "client node disconnected."); + + for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { + GridFutureAdapter<Boolean> fut = e.getValue(); + + if (pingFuts.remove(e.getKey(), fut)) + fut.onDone(err); + } + } + /** * @throws InterruptedException If interrupted. 
*/ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java (revision ) @@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; @@ -448,6 +449,15 @@ else U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']'); } + catch (IgniteNeedReconnectException e) { + throw e; + } + catch (Exception e) { + if (fut.reconnectOnError(e)) + throw new IgniteNeedReconnectException(cctx.localNode(), e); + + throw e; + } } for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -1701,6 +1711,12 @@ dumpedObjects++; } } + catch (Exception e) { + if (exchFut.reconnectOnError(e)) + throw new IgniteNeedReconnectException(cctx.localNode(), e); + + throw e; + } } @@ -1841,7 +1857,14 @@ catch (IgniteInterruptedCheckedException e) { throw e; } - catch (IgniteClientDisconnectedCheckedException ignored) { + catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) { + assert cctx.discovery().reconnectSupported(); + + U.warn(log,"Local node failed to complete partition map exchange due to " + + "network issues, will try to reconnect to cluster", e); + + cctx.discovery().reconnect(); + return; } catch (IgniteCheckedException 
e) { Index: modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java (revision ) @@ -44,7 +44,7 @@ /** */ @GridToStringExclude - private IgniteFutureImpl<?> reconnectFut; + private volatile IgniteFutureImpl<?> reconnectFut; /** */ private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED); @@ -149,6 +149,12 @@ /** {@inheritDoc} */ @Override public GridFutureAdapter<?> onDisconnected() { + if (state.get() == GridKernalState.DISCONNECTED) { + assert reconnectFut != null; + + return (GridFutureAdapter<?>)reconnectFut.internalFuture(); + } + GridFutureAdapter<?> fut = new GridFutureAdapter<>(); reconnectFut = new IgniteFutureImpl<>(fut); Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision ) @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -258,6 +259,13 @@ return 
t.isAlive() ? "alive" : "dead"; } + /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public abstract void reconnect() throws IgniteSpiException; + /** * <strong>FOR TEST ONLY!!!</strong> * <p> Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java (revision ) @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; @@ -25,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -201,8 +204,14 @@ "continue to another node): " + node); } catch (IgniteCheckedException e) { - U.error(log0, 
"Failed to request affinity assignment from remote node (will " + - "continue to another node): " + node, e); + if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) { + onDone(new IgniteNeedReconnectException(ctx.localNode(), e)); + + return; + } + + U.warn(log0, "Failed to request affinity assignment from remote node (will " + + "continue to another node): " + node); } } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java (revision ) @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; @@ -39,6 +41,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import 
org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -64,6 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -516,10 +520,17 @@ throw e; } + catch (IgniteNeedReconnectException e) { + onDone(e); + } catch (Throwable e) { - U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else { + U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); - onDone(e); + onDone(e); + } if (e instanceof Error) throw (Error)e; @@ -1306,7 +1317,10 @@ } } catch (IgniteCheckedException e) { - onDone(e); + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); } } @@ -1323,12 +1337,19 @@ } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { - log.debug("Failed to send full partition map to node, node left grid " + - "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); + if (log.isDebugEnabled()) + log.debug("Failed to send full partition map to node, node left grid " + + "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); return; } + if (reconnectOnError(e)) { + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + + return; + } + if (retryCnt > 0) { long timeout = cctx.gridConfig().getNetworkSendRetryDelay(); @@ -1652,6 +1673,12 @@ } } } + catch (Exception e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + throw e; + } finally { leaveBusy(); } @@ 
-1663,6 +1690,15 @@ } } + /** + * @param e Exception. + * @return {@code True} if local node should try reconnect in case of error. + */ + public boolean reconnectOnError(Throwable e) { + return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) && + cctx.discovery().reconnectSupported(); + } + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java (revision ) @@ -1508,60 +1508,60 @@ } } - /** - * Deployment callback. - * - * @param dep Service deployment. - * @param topVer Topology version. - */ - private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { - // Retry forever. - try { - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + /** + * Deployment callback. + * + * @param dep Service deployment. + * @param topVer Topology version. + */ + private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { + // Retry forever. + try { + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - // If topology version changed, reassignment will happen from topology event. 
- if (newTopVer.equals(topVer)) - reassign(dep, topVer); - } - catch (IgniteCheckedException e) { - if (!(e instanceof ClusterTopologyCheckedException)) - log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); + // If topology version changed, reassignment will happen from topology event. + if (newTopVer.equals(topVer)) + reassign(dep, topVer); + } + catch (IgniteCheckedException e) { + if (!(e instanceof ClusterTopologyCheckedException)) + log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - if (!newTopVer.equals(topVer)) { - assert newTopVer.compareTo(topVer) > 0; + if (!newTopVer.equals(topVer)) { + assert newTopVer.compareTo(topVer) > 0; - // Reassignment will happen from topology event. - return; - } + // Reassignment will happen from topology event. + return; + } - ctx.timeout().addTimeoutObject(new GridTimeoutObject() { - private IgniteUuid id = IgniteUuid.randomUuid(); + ctx.timeout().addTimeoutObject(new GridTimeoutObject() { + private IgniteUuid id = IgniteUuid.randomUuid(); - private long start = System.currentTimeMillis(); + private long start = System.currentTimeMillis(); - @Override public IgniteUuid timeoutId() { - return id; - } + @Override public IgniteUuid timeoutId() { + return id; + } - @Override public long endTime() { - return start + RETRY_TIMEOUT; - } + @Override public long endTime() { + return start + RETRY_TIMEOUT; + } - @Override public void onTimeout() { - if (!busyLock.enterBusy()) - return; + @Override public void onTimeout() { + if (!busyLock.enterBusy()) + return; - try { - // Try again. - onDeployment(dep, topVer); - } - finally { - busyLock.leaveBusy(); - } - } - }); + try { + // Try again. 
+ onDeployment(dep, topVer); + } + finally { + busyLock.leaveBusy(); + } + } + }); } } Index: modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision ) @@ -112,6 +112,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -1890,6 +1891,29 @@ } } + /** + * @return {@code True} if local node client and discovery SPI supports reconnect. + */ + public boolean reconnectSupported() { + DiscoverySpi spi = getSpi(); + + return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) && + !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + } + + /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() { + assert reconnectSupported(); + + DiscoverySpi discoverySpi = getSpi(); + + ((TcpDiscoverySpi)discoverySpi).reconnect(); + } + /** * Updates topology version if current version is smaller than updated. 
* Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java (revision 0be92732ec4dcc683b1dbeea6c027c4076579847) +++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java (revision ) @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteState; @@ -43,6 +44,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -1788,8 +1790,7 @@ clientNodeIds.add(client.cluster().localNode().id()); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return srv.cluster().nodes().size() == 2; } }, awaitTime()); @@ -1799,6 +1800,49 @@ assertFalse(err.get()); } + /** + * @throws Exception If failed. 
+ */ + public void testForceClientReconnect() throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + IgniteKernal client = (IgniteKernal)G.ignite("client-0"); + + UUID clientId = F.first(clientNodeIds); + + final CountDownLatch latch = new CountDownLatch(1); + + srv.events().enableLocal(EVT_NODE_JOINED); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return false; + } + }, EVT_NODE_JOINED); + + client.context().discovery().reconnect(); + + assert latch.await(10, TimeUnit.SECONDS); + + while (true) { + try { + UUID newId = client.localNode().id(); + + assert !clientId.equals(newId) : clientId; + + break; + } + catch (IgniteClientDisconnectedException e) { + e.reconnectFuture().get(10_000); + } + } + } + /** * @param ignite Ignite. * @throws Exception If failed.
