conn
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30133896 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30133896 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30133896 Branch: refs/heads/ignite-comm-balance Commit: 3013389609de337cfebeaea8be5a34cdd93136b9 Parents: f27d747 Author: sboikov <sboi...@gridgain.com> Authored: Mon Sep 19 20:54:35 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Sep 19 20:54:35 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 + .../nio/GridAbstractCommunicationClient.java | 11 +- .../util/nio/GridCommunicationClient.java | 5 + .../util/nio/GridShmemCommunicationClient.java | 6 +- .../util/nio/GridTcpNioCommunicationClient.java | 8 +- .../communication/tcp/TcpCommunicationSpi.java | 702 ++++++++++++------- 6 files changed, 476 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1eebfd4..908543c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -195,6 +195,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE2: + msg = new TcpCommunicationSpi.HandshakeMessage2(); + + break; + case 0: msg = new GridJobCancelRequest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 9b014ec..37bc170 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -35,14 +35,23 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati /** Metrics listener. */ protected final GridNioMetricsListener metricsLsnr; + /** */ + private final int connIdx; + /** * @param metricsLsnr Metrics listener. */ - protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) { + protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { + this.connIdx = connIdx; this.metricsLsnr = metricsLsnr; } /** {@inheritDoc} */ + @Override public int connectionIndex() { + return connIdx; + } + + /** {@inheritDoc} */ @Override public boolean close() { return reserves.compareAndSet(0, -1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 0de54e9..312a20e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -107,4 +107,9 @@ public interface GridCommunicationClient { * @return {@code True} if send is asynchronous. */ public boolean async(); + + /** + * @return Connection index. + */ + public int connectionIndex(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index ebe86fb..74d58b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -55,14 +55,16 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien * @param formatter Message formatter. * @throws IgniteCheckedException If failed. */ - public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, + public GridShmemCommunicationClient( + int connIdx, + GridNioMetricsListener metricsLsnr, int port, long connTimeout, IgniteLogger log, MessageFormatter formatter) throws IgniteCheckedException { - super(metricsLsnr); + super(connIdx, metricsLsnr); assert metricsLsnr != null; assert port > 0 && port < 0xffff; http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 5fe521d..90f17b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -46,10 +46,14 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie /** * @param ses Session. + * @param connIdx Connection index. * @param log Logger. */ - public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) { - super(null); + public GridTcpNioCommunicationClient( + int connIdx, + GridNioSession ses, + IgniteLogger log) { + super(connIdx, null); assert ses != null; assert log != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 9031247..7d91120 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -44,6 +44,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLEngine; @@ -178,6 +179,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; * <li>Node local IP address (see {@link #setLocalAddress(String)})</li> * <li>Node local port number (see {@link #setLocalPort(int)})</li> * <li>Local port range (see {@link #setLocalPortRange(int)}</li> + * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li> * <li>Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})</li> * <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li> * <li>Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})</li> @@ -238,7 +240,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message>, TcpCommunicationSpiMBean { /** */ - private static final IgniteProductVersion TWO_CONN_SINCE_VER = IgniteProductVersion.fromString("1.7.2"); + private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = IgniteProductVersion.fromString("1.7.2"); /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + @@ -289,8 +291,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); - /** Node ID meta for session. */ - private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Connection ID meta for session. */ + private static final int CONN_ID_META = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -310,6 +312,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Default socket write timeout. */ public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; + /** */ + public static final int DFLT_CONN_PER_NODE = 2; + /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @Override public void run() { @@ -327,6 +332,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final byte HANDSHAKE_MSG_TYPE = -3; /** */ + public static final byte HANDSHAKE_MSG_TYPE2 = -4; + + /** */ private ConnectGateway connectGate; /** Server listener. */ @@ -354,41 +362,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); + ConnectionKey connId = ses.meta(CONN_ID_META); - if (id != null) { - GridCommunicationClient client = clients.get(id); + if (connId != null) { + UUID id = connId.nodeId(); - if (client instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient) client).session() == ses) { - client.close(); + GridCommunicationClient[] nodeClients = clients.get(id); - clients.remove(id, client); + if (nodeClients != null) { + for (GridCommunicationClient client : nodeClients) { + if (client instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient) client).session() == ses) { + client.close(); + + removeNodeClient(id, client); + } + } } if (!stopping) { - boolean reconnect = false; - GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); if (outDesc != null) { if (outDesc.nodeAlive(getSpiContext().node(id))) { if (!outDesc.messagesFutures().isEmpty()) { - reconnect = true; - if (log.isDebugEnabled()) log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + outDesc.node().id() + ']'); + + DisconnectedSessionInfo disconnectData = + new DisconnectedSessionInfo(outDesc, connId.connectionIndex()); + + commWorker.addProcessDisconnectRequest(disconnectData); } } else outDesc.onNodeLeft(); } - - DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(outDesc, - reconnect); - - commWorker.addProcessDisconnectRequest(disconnectData); } CommunicationListener<Message> lsnr0 = lsnr; @@ -405,21 +415,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void onFirstMessage(GridNioSession ses, Message msg) { UUID sndId; - if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + ConnectionKey connKey; + + if (msg instanceof NodeIdMessage) { + sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + connKey = new ConnectionKey(sndId, 0); + } else { assert msg instanceof HandshakeMessage : msg; sndId = ((HandshakeMessage)msg).nodeId(); + connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex()); } if (log.isDebugEnabled()) log.debug("Remote node ID received: " + sndId); - final UUID old = ses.addMeta(NODE_ID_META, sndId); - - assert old == null; - final ClusterNode rmtNode = getSpiContext().node(sndId); if (rmtNode == null) { @@ -431,6 +442,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + final ConnectionKey old = ses.addMeta(CONN_ID_META, connKey); + + assert old == null; + ClusterNode locNode = getSpiContext().localNode(); if (ses.remoteAddress() == null) @@ -440,8 +455,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter HandshakeMessage msg0 = (HandshakeMessage)msg; - if (useTwoConnections(rmtNode)) { - final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode); + if (useMultipleConnections(rmtNode)) { + final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode); @@ -455,7 +470,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } else { - GridCommunicationClient oldClient = clients.get(sndId); + GridCommunicationClient[] curClients = clients.get(sndId); + + GridCommunicationClient oldClient = curClients != null ? curClients[0] : null; boolean hasShmemClient = false; @@ -479,12 +496,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut); + GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut); - final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode); + final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); if (oldFut == null) { - oldClient = clients.get(sndId); + curClients = clients.get(sndId); + + oldClient = curClients != null ? curClients[0] : null; if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { @@ -521,7 +540,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(client); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connKey, fut); } } } @@ -549,9 +568,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_ID_META); - if (sndId == null) { + if (connKey == null) { assert ses.accepted() : ses; if (!connectGate.tryEnter()) { @@ -579,9 +598,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (recovery != null) { RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; - if (log.isDebugEnabled()) - log.debug("Received recovery acknowledgement [rmtNode=" + sndId + + if (log.isDebugEnabled()) { + log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']'); + } recovery.ackReceived(msg0.received()); @@ -595,9 +616,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = recovery.onReceived(); if (rcvCnt % ackSndThreshold == 0) { - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement [rmtNode=" + sndId + + if (log.isDebugEnabled()) { + log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() + + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + rcvCnt + ']'); + } nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); @@ -625,7 +648,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else c = NOOP; - notifyListener(sndId, msg, c); + notifyListener(connKey.nodeId(), msg, c); } } @@ -659,12 +682,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridTcpNioCommunicationClient client = null; if (createClient) { - client = new GridTcpNioCommunicationClient(ses, log); - - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + client = new GridTcpNioCommunicationClient(0, ses, log); - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + - ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + addNodeClient(node.id(), 0, client); } return client; @@ -894,6 +914,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Shared memory server. */ private IpcSharedMemoryServerEndpoint shmemSrv; + /** */ + private int connectionsPerNode = DFLT_CONN_PER_NODE; + /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -916,7 +939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ - private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap(); + private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = GridConcurrentFactory.newMap(); /** SPI listener. */ private volatile CommunicationListener<Message> lsnr; @@ -963,17 +986,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter }; /** Client connect futures. */ - private final ConcurrentMap<UUID, GridFutureAdapter<GridCommunicationClient>> clientFuts = + private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>> clientFuts = GridConcurrentFactory.newMap(); /** */ - private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); + private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs = GridConcurrentFactory.newMap(); /** */ - private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap(); + private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> outRecDescs = GridConcurrentFactory.newMap(); /** */ - private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap(); + private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> inRecDescs = GridConcurrentFactory.newMap(); /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @@ -1082,6 +1105,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return locPortRange; } + public void setConnectionsPerNode(int maxConnectionsPerNode) { + this.connectionsPerNode = maxConnectionsPerNode; + } + + public int getConnectionsPerNode() { + return connectionsPerNode; + } + /** * Sets local port to accept shared memory connections. * <p> @@ -1502,7 +1533,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log != null) { StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { GridNioRecoveryDescriptor desc = entry.getValue(); sb.append(" [key=").append(entry.getKey()) @@ -1515,7 +1546,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(']').append(U.nl()); } - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) { GridNioRecoveryDescriptor desc = entry.getValue(); sb.append(" [key=").append(entry.getKey()) @@ -1526,7 +1557,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(']').append(U.nl()); } - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) { GridNioRecoveryDescriptor desc = entry.getValue(); sb.append(" [key=").append(entry.getKey()) @@ -1542,10 +1573,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sb.append("Communication SPI clients: ").append(U.nl()); - for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) { - sb.append(" [node=").append(entry.getKey()) - .append(", client=").append(entry.getValue()) - .append(']').append(U.nl()); + for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) { + UUID nodeId = entry.getKey(); + GridCommunicationClient[] clients0 = entry.getValue(); + + for (GridCommunicationClient client : clients0) { + sb.append(" [node=").append(nodeId) + .append(", client=").append(client) + .append(']').append(U.nl()); + } } U.warn(log, sb.toString()); @@ -1773,9 +1809,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey key = ses.meta(CONN_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return key != null ? formatter.reader(key.nodeId(), msgFactory) : null; } }; @@ -1788,9 +1824,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey key = ses.meta(CONN_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return key != null ? formatter.writer(key.nodeId()) : null; } }; @@ -1966,8 +2002,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter shmemWorkers.clear(); // Force closing on stop (safety). - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) + client.forceClose(); + } // Clear resources. nioSrvr = null; @@ -1992,8 +2030,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.stopped(); // Force closing. - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) + client.forceClose(); + } getSpiContext().deregisterPorts(); @@ -2004,8 +2044,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { connectGate.disconnected(reconnectFut); - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) + client.forceClose(); + } IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to connect client node disconnected."); @@ -2029,16 +2071,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter void onNodeLeft(UUID nodeId) { assert nodeId != null; - GridCommunicationClient client = clients.get(nodeId); - - if (client != null) { - if (log.isDebugEnabled()) - log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + - ", client=" + client + ']'); + GridCommunicationClient[] clients0 = clients.remove(nodeId); - client.forceClose(); + if (clients0 != null) { + for (GridCommunicationClient client : clients0) { + if (log.isDebugEnabled()) + log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + + ", client=" + client + ']'); - clients.remove(nodeId, client); + client.forceClose(); + } } } @@ -2087,6 +2129,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * TODO + * @return + */ + private int connectionIndex() { + return ThreadLocalRandom.current().nextInt(connectionsPerNode); + } + + /** * @param node Destination node. * @param msg Message to send. * @param ackC Ack closure. @@ -2113,11 +2163,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { GridCommunicationClient client = null; + int connIdx = useMultipleConnections(node) ? connectionIndex() : 0; + try { boolean retry; do { - client = reserveClient(node); + client = reserveClient(node, connIdx); UUID nodeId = null; @@ -2131,7 +2183,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (!retry) sentMsgsCnt.increment(); else { - clients.remove(node.id(), client); + removeNodeClient(node.id(), client); ClusterNode node0 = getSpiContext().node(node.id()); @@ -2148,26 +2200,85 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw new IgniteSpiException("Failed to send message to remote node: " + node, e); } finally { - if (client != null && clients.remove(node.id(), client)) + if (client != null && removeNodeClient(node.id(), client)) client.forceClose(); } } } /** + * @param nodeId Node ID. + * @param rmvClient Client to remove. + * @return {@code True} if client was removed. + */ + private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) { + for (;;) { + GridCommunicationClient[] curClients = clients.get(nodeId); + + if (curClients == null) + return false; + + if (curClients[rmvClient.connectionIndex()] == rmvClient) { + GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length); + + newClients[rmvClient.connectionIndex()] = null; + + if (clients.replace(nodeId, curClients, newClients)) + return true; + } + else + return false; + } + } + + /** + * @param nodeId Node ID. + * @param connIdx Connection index. + * @param addClient Client to add. + */ + private void addNodeClient(UUID nodeId, int connIdx, GridCommunicationClient addClient) { + for (;;) { + GridCommunicationClient[] curClients = clients.get(nodeId); + + assert curClients == null || curClients[connIdx] == null : "Client already created " + + "[node=" + nodeId + ", client=" + addClient + ", oldClient=" + curClients[connIdx] + ']'; + + GridCommunicationClient[] newClients; + + if (curClients == null) { + newClients = new GridCommunicationClient[connectionsPerNode]; + newClients[connIdx] = addClient; + + if (clients.putIfAbsent(nodeId, newClients) == null) + break; + } + else { + newClients = Arrays.copyOf(curClients, curClients.length); + newClients[connIdx] = addClient; + + if (clients.replace(nodeId, curClients, newClients)) + break; + } + } + } + + /** * Returns existing or just created client to node. * * @param node Node to which client should be open. + * @param connIdx Connection index. * @return The existing or just created client. * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException { + private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException { assert node != null; UUID nodeId = node.id(); while (true) { - GridCommunicationClient client = clients.get(nodeId); + GridCommunicationClient[] curClients = clients.get(nodeId); + + GridCommunicationClient client = curClients != null ? curClients[connIdx] : null; if (client == null) { if (stopping) @@ -2176,25 +2287,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); - GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(nodeId, fut); + ConnectionKey connKey = new ConnectionKey(nodeId, connIdx); + + GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut); if (oldFut == null) { try { - GridCommunicationClient client0 = clients.get(nodeId); + GridCommunicationClient[] curClients0 = clients.get(nodeId); + + GridCommunicationClient client0 = curClients0 != null ? curClients0[connIdx] : null; if (client0 == null) { - client0 = createNioClient(node); + client0 = createNioClient(node, connIdx); if (client0 != null) { - GridCommunicationClient old = clients.put(nodeId, client0); - - assert old == null : "Client already created " + - "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + addNodeClient(nodeId, connIdx, client0); if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); - if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { + if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { if (log.isDebugEnabled()) log.debug("Session was closed after client creation, will retry " + "[node=" + node + ", client=" + client0 + ']'); @@ -2228,7 +2340,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter continue; if (getSpiContext().node(nodeId) == null) { - if (clients.remove(nodeId, client)) + if (removeNodeClient(nodeId, client)) client.forceClose(); throw new IgniteSpiException("Destination node is not in topology: " + node.id()); @@ -2239,16 +2351,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return client; else // Client has just been closed by idle worker. Help it and try again. - clients.remove(nodeId, client); + removeNodeClient(nodeId, client); } } /** * @param node Node to create client for. + * @param connIdx Connection index. * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { + @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2267,6 +2381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { GridCommunicationClient client = createShmemClient( node, + connIdx, shmemPort); if (log.isDebugEnabled()) @@ -2289,7 +2404,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node); + GridCommunicationClient client = createTcpClient(node, connIdx); if (log.isDebugEnabled()) log.debug("TCP client created: " + client); @@ -2304,10 +2419,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * @param node Node. * @param port Port. + * @param connIdx Connection index. * @return Client. * @throws IgniteCheckedException If failed. */ @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, + int connIdx, Integer port) throws IgniteCheckedException { int attempt = 1; @@ -2321,7 +2438,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client; try { - client = new GridShmemCommunicationClient(metricsLsnr, + client = new GridShmemCommunicationClient( + connIdx, + metricsLsnr, port, timeoutHelper.nextTimeoutChunk(connTimeout), log, @@ -2342,7 +2461,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null); + safeHandshake(client, + null, + node.id(), + timeoutHelper.nextTimeoutChunk(connTimeout0), + null, + null); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2401,10 +2525,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { - UUID id = ses.meta(NODE_ID_META); + ConnectionKey id = ses.meta(CONN_ID_META); if (id != null) { - ClusterNode node = getSpiContext().node(id); + ClusterNode node = getSpiContext().node(id.nodeId); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2418,7 +2542,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log, msg); - getSpiContext().failNode(id, msg); + getSpiContext().failNode(id.nodeId(), msg); } } } @@ -2431,7 +2555,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -2499,7 +2623,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "(node left topology): " + node); } - GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node); + ConnectionKey connKey = new ConnectionKey(node.id(), connIdx); + + GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); if (!recoveryDesc.reserve()) { U.closeQuiet(ch); @@ -2520,8 +2646,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sslEngine.setUseClientMode(true); } - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine); + Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null; + + rcvCnt = safeHandshake(ch, + recoveryDesc, + node.id(), + timeoutHelper.nextTimeoutChunk(connTimeout0), + sslEngine, + handshakeConnIdx); if (rcvCnt == -1) return null; @@ -2534,7 +2666,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { Map<Integer, Object> meta = new HashMap<>(); - meta.put(NODE_ID_META, node.id()); + meta.put(CONN_ID_META, connKey); if (isSslEnabled()) { assert sslEngine != null; @@ -2550,7 +2682,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioSession ses = nioSrvr.createSession(ch, meta).get(); - client = new GridTcpNioCommunicationClient(ses, log); + client = new GridTcpNioCommunicationClient(connIdx, ses, log); conn = true; } @@ -2703,7 +2835,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - @Nullable SSLEngine ssl + @Nullable SSLEngine ssl, + @Nullable Integer handshakeConnIdx ) throws IgniteCheckedException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2783,14 +2916,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(locNode.id(), - recovery.incrementConnectCount(), - recovery.received()); + HandshakeMessage msg; + + int msgSize = 33; + + if (handshakeConnIdx != null) { + msg = new HandshakeMessage2(locNode.id(), + recovery.incrementConnectCount(), + recovery.received(), + handshakeConnIdx); + + msgSize += 4; + } + else { + msg = new HandshakeMessage(locNode.id(), + recovery.incrementConnectCount(), + recovery.received()); + } if (log.isDebugEnabled()) log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); - buf = ByteBuffer.allocate(33); + buf = ByteBuffer.allocate(msgSize); buf.order(ByteOrder.nativeOrder()); @@ -2937,51 +3084,55 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.join(commWorker, log); - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) + client.forceClose(); + } } /** * @param node Node. + * @param key Connection key. * @return Recovery descriptor for outgoing connection. */ - private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) { - if (useTwoConnections(node)) - return recoveryDescriptor(outRecDescs, node); + private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) { + if (useMultipleConnections(node)) + return recoveryDescriptor(outRecDescs, node, key); else - return recoveryDescriptor(recoveryDescs, node); + return recoveryDescriptor(recoveryDescs, node, key); } /** * @param node Node. + * @param key Connection key. * @return Recovery descriptor for incoming connection. */ - private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) { - if (useTwoConnections(node)) - return recoveryDescriptor(inRecDescs, node); + private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) { + if (useMultipleConnections(node)) + return recoveryDescriptor(inRecDescs, node, key); else - return recoveryDescriptor(recoveryDescs, node); + return recoveryDescriptor(recoveryDescs, node, key); } /** * @param node Node. - * @return {@code True} if given node supports two connections per-node for communication. + * @return {@code True} if given node supports multiple connections per-node for communication. */ - private boolean useTwoConnections(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(TWO_CONN_SINCE_VER) >= 0; + private boolean useMultipleConnections(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0; } /** * @param recoveryDescs Descriptors map. * @param node Node. + * @param key Connection key. * @return Recovery receive data for given node. */ private GridNioRecoveryDescriptor recoveryDescriptor( - ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs, - ClusterNode node) { - ClientKey id = new ClientKey(node.id(), node.order()); - - GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); + ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs, + ClusterNode node, + ConnectionKey key) { + GridNioRecoveryDescriptor recovery = recoveryDescs.get(key); if (recovery == null) { int maxSize = Math.max(msgQueueLimit, ackSndThreshold); @@ -2989,7 +3140,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5); GridNioRecoveryDescriptor old = - recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); + recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); if (old != null) recovery = old; @@ -3031,54 +3182,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return S.toString(TcpCommunicationSpi.class, this); } - /** - * - */ - private static class ClientKey { - /** */ - private UUID nodeId; - - /** */ - private long order; - - /** - * @param nodeId Node ID. - * @param order Node order. - */ - private ClientKey(UUID nodeId, long order) { - this.nodeId = nodeId; - this.order = order; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (this == obj) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - ClientKey other = (ClientKey)obj; - - return order == other.order && nodeId.equals(other.nodeId); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int)(order ^ (order >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ClientKey.class, this); - } - } - /** Internal exception class for proper timeout handling. */ private static class HandshakeTimeoutException extends IgniteCheckedException { /** */ @@ -3178,9 +3281,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return connKey != null ? formatter.writer(connKey.nodeId()) : null; } }; @@ -3194,9 +3297,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null; } }; @@ -3277,72 +3380,72 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processIdle() { cleanupRecovery(); - for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) { + for (Map.Entry<UUID, GridCommunicationClient[]> e : clients.entrySet()) { UUID nodeId = e.getKey(); - GridCommunicationClient client = e.getValue(); - - ClusterNode node = getSpiContext().node(nodeId); + for (GridCommunicationClient client : e.getValue()) { + ClusterNode node = getSpiContext().node(nodeId); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Forcing close of non-existent node connection: " + nodeId); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Forcing close of non-existent node connection: " + nodeId); - client.forceClose(); + client.forceClose(); - clients.remove(nodeId, client); + removeNodeClient(nodeId, client); - continue; - } + continue; + } - GridNioRecoveryDescriptor recovery = null; + GridNioRecoveryDescriptor recovery = null; - if (!useTwoConnections(node) && client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) { + recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex())); - if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { - RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + - ", rcvCnt=" + msg.received() + ']'); + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); - continue; + continue; + } } - } - long idleTime = client.getIdleTime(); + long idleTime = client.getIdleTime(); - if (idleTime >= idleConnTimeout) { - if (recovery == null && useTwoConnections(node)) - recovery = outRecDescs.get(new ClientKey(node.id(), node.order())); + if (idleTime >= idleConnTimeout) { + if (recovery == null && useMultipleConnections(node)) + recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex())); - if (recovery != null && - recovery.nodeAlive(getSpiContext().node(nodeId)) && - !recovery.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Node connection is idle, but there are unacknowledged messages, " + - "will wait: " + nodeId); + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); - continue; - } + continue; + } - if (log.isDebugEnabled()) - log.debug("Closing idle node connection: " + nodeId); + if (log.isDebugEnabled()) + log.debug("Closing idle node connection: " + nodeId); - if (client.close() || client.closed()) - clients.remove(nodeId, client); + if (client.close() || client.closed()) + removeNodeClient(nodeId, client); + } } } for (GridNioSession ses : nioSrvr.sessions()) { GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor(); - if (recovery != null && useTwoConnections(recovery.node())) { + if (recovery != null && useMultipleConnections(recovery.node())) { assert ses.accepted() : ses; sendAckOnTimeout(recovery, ses); @@ -3382,10 +3485,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private void cleanupRecovery(ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs) { - Set<ClientKey> left = null; + private void cleanupRecovery(ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs) { + Set<ConnectionKey> left = null; - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { + for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) { if (left != null && left.contains(e.getKey())) continue; @@ -3402,7 +3505,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (left != null) { assert !left.isEmpty(); - for (ClientKey id : left) { + for (ConnectionKey id : left) { GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); if (recoverySnd != null && recoverySnd.onNodeLeft()) @@ -3415,45 +3518,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param sesInfo Disconnected session information. */ private void processDisconnect(DisconnectedSessionInfo sesInfo) { - if (sesInfo.reconnect) { - GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; - - ClusterNode node = recoveryDesc.node(); + GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; - if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) - return; + ClusterNode node = recoveryDesc.node(); - try { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; - GridCommunicationClient client = reserveClient(node); + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - client.release(); - } - catch (IgniteCheckedException | IgniteException e) { - try { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + GridCommunicationClient client = reserveClient(node, sesInfo.connIdx); - addProcessDisconnectRequest(sesInfo); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + client.release(); + } + catch (IgniteCheckedException | IgniteException e) { + try { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); - } + addProcessDisconnectRequest(sesInfo); } - catch (IgniteClientDisconnectedException e0) { + else { if (log.isDebugEnabled()) - log.debug("Failed to ping node, client disconnected."); + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); } } + catch (IgniteClientDisconnectedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node, client disconnected."); + } } } @@ -3657,6 +3758,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * @return Connection index. + */ + public int connectionIndex() { + return 0; + } + + /** * @return Connect count. */ public long connectCount() { @@ -3737,6 +3845,50 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** + * + */ + public static class HandshakeMessage2 extends HandshakeMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int connIdx; + + /** + * + */ + public HandshakeMessage2() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + * @param connIdx Connection index. + */ + public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { + super(nodeId, connectCnt, rcvCnt); + this.connIdx = connIdx; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return HANDSHAKE_MSG_TYPE2; + } + + /** {@inheritDoc} */ + @Override public int connectionIndex() { + return connIdx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage2.class, this); + } + } + + /** * Recovery acknowledgment message. */ @SuppressWarnings("PublicInnerClass") @@ -3981,16 +4133,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final GridNioRecoveryDescriptor recoveryDesc; /** */ - private final boolean reconnect; + private int connIdx; /** * @param recoveryDesc Recovery descriptor. - * @param reconnect Reconnect flag. + * @param connIdx Connection index. */ - DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, - boolean reconnect) { + DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, int connIdx) { this.recoveryDesc = recoveryDesc; - this.reconnect = reconnect; + this.connIdx = connIdx; } /** {@inheritDoc} */ @@ -3998,4 +4149,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return S.toString(DisconnectedSessionInfo.class, this); } } + + /** + * + */ + private static class ConnectionKey { + /** */ + private final UUID nodeId; + + /** */ + private final int idx; + + /** + * @param nodeId Node ID. + * @param idx Connection index. + */ + ConnectionKey(UUID nodeId, int idx) { + this.nodeId = nodeId; + this.idx = idx; + } + + /** + * @return Node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** + * @return Connection index. + */ + int connectionIndex() { + return idx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ConnectionKey.class, this); + } + } }