Hi Rick,
It looks like this is fixed in master [1], and the fix will be available in version 2.0.
You can try applying the attached patch to version 1.9.
[1] https://issues.apache.org/jira/browse/IGNITE-4473
On Thu, Apr 6, 2017 at 12:05 PM, rick_tem <[email protected]> wrote:
> Hi,
>
> I have a cache configuration that looks like this:
>
> <bean class="org.apache.ignite.
> configuration.CacheConfiguration">
> <property name="name"
> value="MCache" />
> <property name="cacheMode"
> value="REPLICATED" />
> <property name="memoryMode"
> value="ONHEAP_TIERED" />
>
> <property name="offHeapMaxMemory"
> value="#{10 * 1024L * 1024L * 1024L}"
> />
> <property name="evictionPolicy">
>
> <bean
> class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
>
> <property
> name="maxSize" value="1000000" />
> </bean>
> </property>
> <property name="atomicityMode"
> value="TRANSACTIONAL" />
> <property name="backups" value="1"
> />
> <property
> name="writeSynchronizationMode"
> value="FULL_SYNC" />
> <property name="statisticsEnabled"
> value="true" />
> </bean>
>
> I've started two Ignite servers and within JBOSS application server set
> Ignition.setClientMode(true); such that I have 2 servers and 1 client
> (JBOSS) shown in my topology. When I then terminate one of the Ignite
> servers, I receive the following exception below when my JBOSS app is
> manipulating the cache. If I am in REPLICATED mode, why doesn't the Ignite
> proxy try the other server automatically on connection failure? Is there a
> mechanism I should use to protect against connection failure, or how should
> I ensure (in this case a remove operation on the cache) my actions complete
> successfully?
>
> Thanks!
> Rick
>
> Caused by: org.apache.ignite.IgniteClientDisconnectedException: Client
> node
> disconnected: TGrid
> ... 88 more
> [ERROR] 2017-04-05 14:11:35,675 [tSA 4_1037028145] LOCKING - IgniteCache
> Error
> javax.cache.CacheException: class
> org.apache.ignite.IgniteClientDisconnectedException: Client node
> disconnected: TGrid
> at
> org.apache.ignite.internal.processors.cache.GridCacheGateway.checkState(
> GridCacheGateway.java:92)
> ~[ignite-core-1.9.0.jar!/:1.9.0]
> at
> org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(
> GridCacheGateway.java:173)
> ~[ignite-core-1.9.0.jar!/:1.9.0]
> at
> org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(
> IgniteCacheProxy.java:2264)
> ~[ignite-core-1.9.0.jar!/:1.9.0]
> at
> org.apache.ignite.internal.processors.cache.IgniteCacheProxy.remove(
> IgniteCacheProxy.java:1463)
> ~[ignite-core-1.9.0.jar!/:1.9.0]
>
>
>
>
> --
> View this message in context: http://apache-ignite-users.
> 70518.x6.nabble.com/IgniteCacheProxy-connection-
> failure-in-REPLICATAED-mode-tp11769.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>
--
Best regards,
Andrey V. Mashenkov
Index: modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -250,6 +250,9 @@
/** Periodic starvation check interval. */
private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30;
+ /** Result used to force-complete the current reconnect future; its listener then skips reconnect handling. */
+ private static final Object STOP_RECONNECT = new Object();
+
/** */
@GridToStringExclude
private GridKernalContextImpl ctx;
@@ -327,6 +330,9 @@
@GridToStringExclude
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Tracks the first and the current client reconnect attempts. */
+ private final ReconnectState reconnectState = new ReconnectState();
+
/**
* No-arg constructor is required by externalization.
*/
@@ -936,6 +942,8 @@
// Notify IO manager the second so further components can send and receive messages.
ctx.io().onKernalStart();
+ boolean recon = false;
+
// Callbacks.
for (GridComponent comp : ctx) {
// Skip discovery manager.
@@ -946,9 +954,23 @@
if (comp instanceof GridIoManager)
continue;
- if (!skipDaemon(comp))
- comp.onKernalStart();
+ if (!skipDaemon(comp)) {
+ try {
+ comp.onKernalStart();
+ }
+ catch (IgniteNeedReconnectException e) {
+ assert ctx.discovery().reconnectSupported();
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to start node components on node start, will wait for reconnect: " + e);
+
+ recon = true;
+ }
+ }
}
+
+ if (recon)
+ reconnectState.waitFirstReconnect();
// Register MBeans.
registerKernalMBean();
@@ -3309,6 +3331,8 @@
public void onDisconnected() {
Throwable err = null;
+ reconnectState.waitPreviousReconnect();
+
GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected();
if (reconnectFut == null) {
@@ -3317,9 +3341,18 @@
return;
}
- IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut);
+ IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture();
+
+ IgniteFuture<?> userFut;
+
+ // In case of previous reconnect did not finish keep reconnect future.
+ if (curFut != null && curFut.internalFuture() == reconnectFut)
+ userFut = curFut;
+ else {
+ userFut = new IgniteFutureImpl<>(reconnectFut);
- ctx.cluster().get().clientReconnectFuture(userFut);
+ ctx.cluster().get().clientReconnectFuture(userFut);
+ }
ctx.disconnected(true);
@@ -3372,31 +3405,54 @@
try {
ctx.disconnected(false);
- GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>();
+ GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>();
+
+ reconnectState.reconnectDone = new GridFutureAdapter<>();
for (GridComponent comp : ctx.components()) {
IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted);
if (fut != null)
- reconnectFut.add((IgniteInternalFuture)fut);
+ curReconnectFut.add(fut);
}
- reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture());
+ curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture());
- reconnectFut.markInitialized();
+ curReconnectFut.markInitialized();
- reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ final GridFutureAdapter reconnectDone = reconnectState.reconnectDone;
+
+ curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
- fut.get();
+ Object res = fut.get();
+
+ if (res == STOP_RECONNECT)
+ return;
ctx.gateway().onReconnected();
+
+ reconnectState.firstReconnectFut.onDone();
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to reconnect, will stop node", e);
+ if (!X.hasCause(e, IgniteNeedReconnectException.class,
+ IgniteClientDisconnectedCheckedException.class)) {
+ U.error(log, "Failed to reconnect, will stop node.", e);
+
+ reconnectState.firstReconnectFut.onDone(e);
- close();
+ close();
+ }
+ else {
+ assert ctx.discovery().reconnectSupported();
+
+ U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() +
+ ", err=" + e.getMessage() + ']');
+ }
}
+ finally {
+ reconnectDone.onDone();
+ }
}
});
}
@@ -3574,6 +3630,46 @@
return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
}
+ /**
+ * Tracks client reconnect attempts: the first one after node start and the one currently in progress.
+ */
+ private static class ReconnectState {
+ /** Completed when the first reconnect succeeds, or with an error if reconnect fails and the node is stopped. */
+ private final GridFutureAdapter<Void> firstReconnectFut = new GridFutureAdapter<>();
+
+ /** Compound future of the reconnect attempt in progress ({@code null} until the first attempt). */
+ private GridCompoundFuture<?, Object> curReconnectFut;
+
+ /** Completed once the listener of {@code curReconnectFut} has finished processing it. */
+ private GridFutureAdapter<?> reconnectDone;
+
+ /**
+ * Blocks until the first reconnect attempt completes.
+ * @throws IgniteCheckedException If the first reconnect failed.
+ */
+ void waitFirstReconnect() throws IgniteCheckedException {
+ firstReconnectFut.get();
+ }
+
+ /**
+ * Completes an unfinished reconnect attempt with {@code STOP_RECONNECT} and waits for its listener to finish.
+ */
+ void waitPreviousReconnect() {
+ if (curReconnectFut != null && !curReconnectFut.isDone()) {
+ assert reconnectDone != null;
+
+ curReconnectFut.onDone(STOP_RECONNECT);
+
+ try {
+ reconnectDone.get();
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op: only completion matters here, not the outcome of the stopped attempt.
+ }
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -129,6 +129,9 @@
/** */
private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED";
+ /** Message that forces the client to leave the cluster and join it again. */
+ private static final Object SPI_RECONNECT = "SPI_RECONNECT";
+
/** Remote nodes. */
private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
@@ -808,6 +811,11 @@
log);
}
+ /** {@inheritDoc} Only enqueues {@code SPI_RECONNECT} for the message worker (ignored unless the client is connected). */
+ @Override public void reconnect() throws IgniteSpiException {
+ msgWorker.addMessage(SPI_RECONNECT);
+ }
+
/** {@inheritDoc} */
@Override public void brakeConnection() {
SocketStream sockStream = msgWorker.currSock;
@@ -879,9 +887,12 @@
/** */
private UUID rmtNodeId;
+ /** Set by {@code forceStopRead()}; counted down by the read loop once it observes the stop. */
+ private CountDownLatch stopReadLatch;
+
/**
*/
- protected SocketReader() {
+ SocketReader() {
super(spi.ignite().name(), "tcp-client-disco-sock-reader", log);
}
@@ -889,7 +900,7 @@
* @param sockStream Socket.
* @param rmtNodeId Rmt node id.
*/
- public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
+ void setSocket(SocketStream sockStream, UUID rmtNodeId) {
synchronized (mux) {
this.sockStream = sockStream;
@@ -899,6 +910,31 @@
}
}
+ /** Closes the current socket (if any) and waits until the read loop in {@code body()} acknowledges the stop.
+ * @throws InterruptedException If interrupted while waiting.
+ */
+ private void forceStopRead() throws InterruptedException {
+ CountDownLatch stopReadLatch;
+ // Await outside of the mux lock: the read loop needs mux to count the latch down.
+ synchronized (mux) {
+ SocketStream stream = sockStream;
+
+ if (stream == null)
+ return; // Nothing is being read.
+
+ this.stopReadLatch = stopReadLatch = new CountDownLatch(1);
+
+ U.closeQuiet(stream.socket());
+
+ this.sockStream = null;
+ this.rmtNodeId = null;
+
+ mux.notifyAll();
+ }
+
+ stopReadLatch.await(); // Counted down at the top of the read loop.
+ }
+
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
while (!isInterrupted()) {
@@ -906,6 +942,12 @@
UUID rmtNodeId;
synchronized (mux) {
+ if (stopReadLatch != null) {
+ stopReadLatch.countDown();
+
+ stopReadLatch = null;
+ }
+
if (this.sockStream == null) {
mux.wait();
@@ -1007,18 +1049,21 @@
private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>();
/** */
- private final long socketTimeout;
+ private final long sockTimeout;
/** */
private TcpDiscoveryAbstractMessage unackedMsg;
+ /** Set by {@code forceLeave()}; counted down by {@code clear()} once the writer has processed the forced leave. */
+ private CountDownLatch forceLeaveLatch;
+
/**
*
*/
- protected SocketWriter() {
+ SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
- socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
}
@@ -1033,6 +1078,29 @@
}
}
+ /**
+ * Makes the writer thread send {@link TcpDiscoveryNodeLeftMessage} and close the socket, and waits until it is done.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ */
+ private void forceLeave() throws InterruptedException {
+ CountDownLatch forceLeaveLatch;
+
+ synchronized (mux) {
+ // No socket (writer stopped or connection already closed): nothing to leave.
+ if (sock == null)
+ return;
+
+ this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1);
+
+ unackedMsg = null;
+
+ mux.notifyAll();
+ }
+ // Released by clear() after the left message is sent, or when the connection is found dead.
+ forceLeaveLatch.await();
+ }
+
/**
* @param sock Socket.
* @param clientAck {@code True} is server supports client message acknowlede.
@@ -1089,12 +1157,40 @@
continue;
}
- msg = queue.poll();
+ if (forceLeaveLatch != null) {
+ msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId());
+
+ msg.client(true);
+
+ try {
+ spi.writeToSocket(
+ sock,
+ msg,
+ sockTimeout);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg +
+ ", err=" + e.getMessage() + ']');
+ }
+ }
+
+ U.closeQuiet(sock);
+
+ this.sock = null;
+
+ clear();
+
+ continue;
+ }
+ else {
+ msg = queue.poll();
- if (msg == null) {
- mux.wait();
+ if (msg == null) {
+ mux.wait();
- continue;
+ continue;
+ }
}
}
@@ -1115,7 +1211,7 @@
spi.writeToSocket(
sock,
msg,
- socketTimeout);
+ sockTimeout);
msg = null;
@@ -1165,10 +1261,30 @@
synchronized (mux) {
if (sock == this.sock)
this.sock = null; // Connection has dead.
+
+ clear();
}
}
}
}
+
+ /**
+ * Drops queued and unacknowledged messages and releases a thread waiting in {@code forceLeave()}; caller must hold {@code mux}.
+ */
+ private void clear() {
+ assert Thread.holdsLock(mux);
+
+ queue.clear();
+ unackedMsg = null;
+
+ CountDownLatch forceLeaveLatch = this.forceLeaveLatch;
+
+ if (forceLeaveLatch != null) {
+ this.forceLeaveLatch = null;
+
+ forceLeaveLatch.countDown();
+ }
+ }
}
/**
@@ -1413,6 +1529,38 @@
else
leaveLatch.countDown();
}
+ else if (msg == SPI_RECONNECT) {
+ if (state == CONNECTED) {
+ if (reconnector != null) {
+ reconnector.cancel();
+ reconnector.join();
+
+ reconnector = null;
+ }
+
+ sockWriter.forceLeave();
+ sockReader.forceStopRead();
+
+ currSock = null;
+
+ queue.clear();
+
+ onDisconnected();
+
+ notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
+
+ UUID newId = UUID.randomUUID();
+
+ U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " +
+ "to network problems [newId=" + newId +
+ ", prevId=" + locNode.id() +
+ ", locNode=" + locNode+ ']');
+
+ locNode.onClientDisconnected(newId);
+
+ tryJoin();
+ }
+ }
else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
@@ -1495,20 +1643,7 @@
", failMsg=" + forceFailMsg + ']');
}
- state = DISCONNECTED;
-
- nodeAdded = false;
-
- IgniteClientDisconnectedCheckedException err =
- new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
- "client node disconnected.");
-
- for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
- GridFutureAdapter<Boolean> fut = e.getValue();
-
- if (pingFuts.remove(e.getKey(), fut))
- fut.onDone(err);
- }
+ onDisconnected();
notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes());
}
@@ -1603,6 +1738,26 @@
}
}
+ /**
+ * Switches to {@code DISCONNECTED} state, resets {@code nodeAdded} and fails all pending ping futures.
+ */
+ private void onDisconnected() {
+ state = DISCONNECTED;
+
+ nodeAdded = false;
+
+ IgniteClientDisconnectedCheckedException err =
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+ "client node disconnected.");
+
+ for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
+ GridFutureAdapter<Boolean> fut = e.getValue();
+
+ if (pingFuts.remove(e.getKey(), fut))
+ fut.onDone(err);
+ }
+ }
+
/**
* @throws InterruptedException If interrupted.
*/
Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -1609,6 +1609,11 @@
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} Always throws {@link UnsupportedOperationException}: server nodes cannot be reconnected. */
+ @Override public void reconnect() throws IgniteSpiException {
+ throw new UnsupportedOperationException("Reconnect is not supported for server.");
+ }
+
/** {@inheritDoc} */
@Override protected IgniteSpiThread workerThread() {
return msgWorker;
Index: modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -44,7 +44,7 @@
/** */
@GridToStringExclude
- private IgniteFutureImpl<?> reconnectFut;
+ private volatile IgniteFutureImpl<?> reconnectFut;
/** */
private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED);
@@ -149,6 +149,12 @@
/** {@inheritDoc} */
@Override public GridFutureAdapter<?> onDisconnected() {
+ if (state.get() == GridKernalState.DISCONNECTED) {
+ assert reconnectFut != null;
+
+ return (GridFutureAdapter<?>)reconnectFut.internalFuture();
+ }
+
GridFutureAdapter<?> fut = new GridFutureAdapter<>();
reconnectFut = new IgniteFutureImpl<>(fut);
Index: modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
+++ modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
+import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests client to be able restore connection to cluster if coordination is not available.
+ */
+public class IgniteClientRejoinTest extends GridCommonAbstractTest {
+ /** Block. */
+ private volatile boolean block;
+
+ /** Block all. */
+ private volatile boolean blockAll;
+
+ /** Coordinator. */
+ private volatile ClusterNode crd;
+
+ /** Client reconnect disabled. */
+ private boolean clientReconnectDisabled;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ clientReconnectDisabled = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.contains("client")) { // Client nodes use the test SPIs that can block traffic.
+ cfg.setCommunicationSpi(new TcpCommunicationSpi());
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+ DiscoverySpi dspi = new DiscoverySpi();
+
+ dspi.setIpFinder(spi.getIpFinder());
+
+ cfg.setDiscoverySpi(dspi);
+
+ dspi.setJoinTimeout(60_000);
+ dspi.setClientReconnectDisabled(clientReconnectDisabled);
+
+ cfg.setClientMode(true);
+ }
+
+ // TODO: IGNITE-4833 - peer class loading is disabled; presumably re-enable once that issue is resolved.
+ cfg.setPeerClassLoadingEnabled(false);
+
+ return cfg;
+ }
+
+ /** Checks that started clients keep working after traffic is blocked (to all nodes, then to the coordinator only).
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnectAfterStart() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ List<Ignite> clientNodes = new ArrayList<>();
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++)
+ clientNodes.add(startGrid("client" + i));
+
+ blockAll = true;
+
+ GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.sleep(5_000);
+
+ block = true;
+ blockAll = false;
+
+ log.info(">>> Allow with blocked coordinator.");
+
+ latch.countDown();
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ latch.await();
+
+ U.sleep((new Random().nextInt(15) + 30) * 1000);
+
+ block = false;
+
+ log.info(">>> Allow coordinator.");
+
+ return null;
+ }
+ });
+
+ fut.get();
+
+ for (Ignite client : clientNodes) {
+ while (true) {
+ try {
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some");
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assertEquals((Integer)i, cache.get(i));
+
+ cache.clear();
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get();
+ }
+ }
+ }
+
+ assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+ assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+ }
+
+ /** Checks that clients started while the coordinator is unreachable join once it becomes reachable again.
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnect() throws Exception {
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++) {
+ final int idx = i;
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ latch.await();
+
+ return startGrid("client" + idx);
+ }
+ });
+
+ futs.add(fut);
+ }
+
+ GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ latch.countDown();
+
+ Random rnd = new Random();
+
+ U.sleep((rnd.nextInt(15) + 15) * 1000);
+
+ block = false;
+
+ log.info(">>> ALLOW connection to coordinator.");
+
+ return true;
+ }
+ });
+
+ for (IgniteInternalFuture<Ignite> clientFut : futs) {
+ Ignite client = clientFut.get();
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name());
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ for (int i = 0; i < 100; i++)
+ assertEquals((Integer)i, cache.get(i));
+ }
+
+ assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size());
+ assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size());
+ }
+
+ /** Checks that clients fail to start when reconnect is disabled and the coordinator is unreachable.
+ * @throws Exception If failed.
+ */
+ public void testClientsReconnectDisabled() throws Exception {
+ clientReconnectDisabled = true;
+
+ Ignite srv1 = startGrid("server1");
+
+ crd = ((IgniteKernal)srv1).localNode();
+
+ Ignite srv2 = startGrid("server2");
+
+ block = true;
+
+ List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int CLIENTS_NUM = 5;
+
+ for (int i = 0; i < CLIENTS_NUM; i++) {
+ final int idx = i;
+
+ IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
+ @Override public Ignite call() throws Exception {
+ latch.await();
+
+ return startGrid("client" + idx);
+ }
+ });
+
+ futs.add(fut);
+ }
+ // Release all client starts at once.
+ latch.countDown();
+
+ for (final IgniteInternalFuture<Ignite> clientFut : futs) {
+ //noinspection ThrowableNotThrown
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ clientFut.get();
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+ }
+
+ assertEquals(0, srv1.cluster().forClients().nodes().size());
+ assertEquals(0, srv2.cluster().forClients().nodes().size());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 3 * 60_000;
+ }
+
+ /**
+ * Communication SPI that fails sends while {@code blockAll} is set, or sends to the coordinator while {@code block} is set.
+ */
+ private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ if (blockAll || block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ if (blockAll || block && node.id().equals(crd.id()))
+ throw new IgniteSpiException(new SocketException("Test communication exception"));
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+
+ /**
+ * Discovery SPI that fails socket writes/opens while {@code blockAll} is set, or towards port 47500 while {@code block} is set.
+ */
+ private class DiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data,
+ long timeout) throws IOException {
+ if (blockAll || (block && sock.getPort() == 47500))
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, data, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (blockAll || (block && sock.getPort() == 47500))
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
+ long timeout) throws IOException, IgniteCheckedException {
+ if (blockAll || (block && sock.getPort() == 47500))
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(sock, out, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+ long timeout) throws IOException {
+ if (blockAll || (block && sock.getPort() == 47500))
+ throw new SocketException("Test discovery exception");
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
+ /** {@inheritDoc} Checks the target address: {@code sock} is not connected yet, so its port is 0 here. */
+ @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
+ if (blockAll || (block && remAddr.getPort() == 47500))
+ throw new SocketException("Test discovery exception");
+
+ return super.openSocket(sock, remAddr, timeoutHelper);
+ }
+ }
+}
Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -1926,6 +1926,15 @@
return ignite().configuration().getSslContextFactory() != null;
}
+ /**
+ * Forces the local node to leave the cluster and join it again (client mode only; the server implementation throws {@link UnsupportedOperationException}).
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() throws IgniteSpiException {
+ impl.reconnect();
+ }
+
/**
* <strong>FOR TEST ONLY!!!</strong>
*/
Index: modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -29,6 +29,7 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -258,6 +259,13 @@
return t.isAlive() ? "alive" : "dead";
}
+ /**
+ * Forces the local node to leave the cluster and try to join it again.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public abstract void reconnect() throws IgniteSpiException;
+
/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
Index: modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
+++ modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Indicates that the local client node should try to reconnect to the cluster.
+ */
+public class IgniteNeedReconnectException extends IgniteCheckedException {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * @param locNode Local node; must be a client node.
+ * @param cause Cause, may be {@code null}.
+ */
+ public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) {
+ super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause);
+
+ assert locNode.isClient();
+ }
+}
Index: modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -700,9 +700,12 @@
try {
Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT))));
- fail();
+ // Commented due to IGNITE-4473, because
+ // IgniteClientDisconnectedException won't
+ // be thrown, but client will reconnect.
+// fail();
- return false;
+ return true;
}
catch (IgniteClientDisconnectedException e) {
log.info("Expected start error: " + e);
Index: modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -29,6 +29,7 @@
import org.apache.ignite.internal.IgniteClientReconnectServicesTest;
import org.apache.ignite.internal.IgniteClientReconnectStopTest;
import org.apache.ignite.internal.IgniteClientReconnectStreamerTest;
+import org.apache.ignite.internal.IgniteClientRejoinTest;
/**
*
@@ -52,6 +53,7 @@
suite.addTestSuite(IgniteClientReconnectServicesTest.class);
suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
+ suite.addTestSuite(IgniteClientRejoinTest.class);
return suite;
}
Index: modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.IgniteState;
@@ -43,6 +44,7 @@
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -1788,8 +1790,7 @@
clientNodeIds.add(client.cluster().localNode().id());
GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return srv.cluster().nodes().size() == 2;
}
}, awaitTime());
@@ -1799,6 +1800,49 @@
assertFalse(err.get());
}
+ /** Forces a client reconnect through discovery and checks that the client rejoins with a new node ID.
+ * @throws Exception If failed.
+ */
+ public void testForceClientReconnect() throws Exception {
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+ IgniteKernal client = (IgniteKernal)G.ignite("client-0"); // Kernal cast exposes context().discovery() used below.
+
+ UUID clientId = F.first(clientNodeIds); // Client node ID before the forced reconnect.
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ srv.events().enableLocal(EVT_NODE_JOINED); // Let the server record join events for the listener below.
+
+ srv.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ latch.countDown();
+
+ return false; // Per IgniteEvents#localListen, returning false should stop listening after the first join.
+ }
+ }, EVT_NODE_JOINED);
+
+ client.context().discovery().reconnect(); // Make the client leave the cluster and join again.
+
+ assert latch.await(10, TimeUnit.SECONDS); // Server must observe a node join (presumably the client rejoining) within 10 seconds.
+
+ while (true) { // Retry while the client still reports that it is disconnected.
+ try {
+ UUID newId = client.localNode().id();
+
+ assert !clientId.equals(newId) : clientId; // A rejoined client must come back with a new node ID.
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get(10_000); // Wait for the reconnect to complete (10_000 ms timeout), then retry.
+ }
+ }
+ }
+
/**
* @param ignite Ignite.
* @throws Exception If failed.
Index: modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -112,6 +112,7 @@
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -1890,6 +1891,29 @@
}
}
+ /** Checks whether {@link #reconnect()} may be used on this node.
+ * @return {@code True} if the local node is a client and the discovery SPI is a {@link TcpDiscoverySpi} with client reconnect not disabled.
+ */
+ public boolean reconnectSupported() {
+ DiscoverySpi spi = getSpi();
+ // reconnect() casts the SPI to TcpDiscoverySpi, so only that SPI can support a forced reconnect.
+ return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) &&
+ !(((TcpDiscoverySpi) spi).isClientReconnectDisabled());
+ }
+
+ /**
+ * Leaves the cluster and tries to join again; callers must first check {@link #reconnectSupported()}.
+ *
+ * @throws IgniteSpiException If failed.
+ */
+ public void reconnect() {
+ assert reconnectSupported(); // Not enforced without -ea: a non-TcpDiscoverySpi would then fail the cast below.
+
+ DiscoverySpi discoverySpi = getSpi();
+
+ ((TcpDiscoverySpi)discoverySpi).reconnect();
+ }
+
/**
* Updates topology version if current version is smaller than updated.
*
Index: modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -1498,60 +1498,60 @@
}
}
- /**
- * Deployment callback.
- *
- * @param dep Service deployment.
- * @param topVer Topology version.
- */
- private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
- // Retry forever.
- try {
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ /**
+ * Deployment callback.
+ *
+ * @param dep Service deployment.
+ * @param topVer Topology version.
+ */
+ private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) {
+ // Retry forever.
+ try {
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- // If topology version changed, reassignment will happen from topology event.
- if (newTopVer.equals(topVer))
- reassign(dep, topVer);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
+ // If topology version changed, reassignment will happen from topology event.
+ if (newTopVer.equals(topVer))
+ reassign(dep, topVer);
+ }
+ catch (IgniteCheckedException e) {
+ if (!(e instanceof ClusterTopologyCheckedException))
+ log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e);
- AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
+ AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx();
- if (!newTopVer.equals(topVer)) {
- assert newTopVer.compareTo(topVer) > 0;
+ if (!newTopVer.equals(topVer)) {
+ assert newTopVer.compareTo(topVer) > 0;
- // Reassignment will happen from topology event.
- return;
- }
+ // Reassignment will happen from topology event.
+ return;
+ }
- ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
- private IgniteUuid id = IgniteUuid.randomUuid();
+ ctx.timeout().addTimeoutObject(new GridTimeoutObject() {
+ private IgniteUuid id = IgniteUuid.randomUuid();
- private long start = System.currentTimeMillis();
+ private long start = System.currentTimeMillis();
- @Override public IgniteUuid timeoutId() {
- return id;
- }
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
- @Override public long endTime() {
- return start + RETRY_TIMEOUT;
- }
+ @Override public long endTime() {
+ return start + RETRY_TIMEOUT;
+ }
- @Override public void onTimeout() {
- if (!busyLock.enterBusy())
- return;
+ @Override public void onTimeout() {
+ if (!busyLock.enterBusy())
+ return;
- try {
- // Try again.
- onDeployment(dep, topVer);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- });
+ try {
+ // Try again.
+ onDeployment(dep, topVer);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
}
}
Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -43,6 +43,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
@@ -448,6 +449,15 @@
else
U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']');
}
+ catch (IgniteNeedReconnectException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (fut.reconnectOnError(e))
+ throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+ throw e;
+ }
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
@@ -1697,6 +1707,12 @@
dumpedObjects++;
}
}
+ catch (Exception e) {
+ if (exchFut.reconnectOnError(e))
+ throw new IgniteNeedReconnectException(cctx.localNode(), e);
+
+ throw e;
+ }
}
@@ -1836,7 +1852,14 @@
catch (IgniteInterruptedCheckedException e) {
throw e;
}
- catch (IgniteClientDisconnectedCheckedException e) {
+ catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) {
+ assert cctx.discovery().reconnectSupported();
+
+ U.warn(log,"Local node failed to complete partition map exchange due to " +
+ "network issues, will try to reconnect to cluster", e);
+
+ cctx.discovery().reconnect();
+
return;
}
catch (IgniteCheckedException e) {
Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -17,15 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridNodeOrderComparator;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -34,6 +35,7 @@
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -202,8 +204,14 @@
"continue to another node): " + node);
}
catch (IgniteCheckedException e) {
- U.error(log0, "Failed to request affinity assignment from remote node (will " +
- "continue to another node): " + node, e);
+ if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) {
+ onDone(new IgniteNeedReconnectException(ctx.localNode(), e));
+
+ return;
+ }
+
+ U.warn(log0, "Failed to request affinity assignment from remote node (will " +
+ "continue to another node): " + node);
}
}
Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java (revision 59ea1c2fa87ee6a9e09f9bf7afc2b1ec8ff205ac)
+++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java (revision 946419314b567c604a15ae4f9658d89bc350127b)
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
@@ -39,6 +41,7 @@
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -54,7 +57,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -65,7 +67,7 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -506,10 +508,17 @@
throw e;
}
+ catch (IgniteNeedReconnectException e) {
+ onDone(e);
+ }
catch (Throwable e) {
- U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else {
+ U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e);
- onDone(e);
+ onDone(e);
+ }
if (e instanceof Error)
throw (Error)e;
@@ -1297,7 +1306,10 @@
}
}
catch (IgniteCheckedException e) {
- onDone(e);
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else
+ onDone(e);
}
}
@@ -1314,12 +1326,19 @@
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) {
- log.debug("Failed to send full partition map to node, node left grid " +
- "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
+ if (log.isDebugEnabled())
+ log.debug("Failed to send full partition map to node, node left grid " +
+ "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']');
return;
}
+ if (reconnectOnError(e)) {
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+
+ return;
+ }
+
if (retryCnt > 0) {
long timeout = cctx.gridConfig().getNetworkSendRetryDelay();
@@ -1641,6 +1660,12 @@
}
}
}
+ catch (Exception e) {
+ if (reconnectOnError(e))
+ onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
+ else
+ throw e;
+ }
finally {
leaveBusy();
}
@@ -1652,6 +1677,15 @@
}
}
+ /** Decides whether error {@code e} should be handled by reconnecting the local node to the cluster.
+ * @param e Exception.
+ * @return {@code True} if {@code e} has an {@link IOException} or {@link IgniteClientDisconnectedCheckedException} cause (as checked by {@code X.hasCause}) and discovery supports reconnect for the local node.
+ */
+ public boolean reconnectOnError(Throwable e) {
+ return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) &&
+ cctx.discovery().reconnectSupported();
+ }
+
/** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);