ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-balance 6a4d81965 -> 51e9bf887 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51e9bf88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51e9bf88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51e9bf88 Branch: refs/heads/ignite-comm-balance Commit: 51e9bf887d6094b3ecfa32aa922e3a7ef727a4f9 Parents: 6a4d819 Author: sboikovAuthored: Thu Sep 22 17:50:00 2016 +0300 Committer: sboikov Committed: Thu Sep 22 17:50:00 2016 +0300 -- .../IgniteCommunicationBalanceTest.java | 69 +--- 1 file changed, 46 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/51e9bf88/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java index 86d43e7..ddd979b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.managers.communication; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; @@ -105,7 +107,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { client.compute(client.cluster().forNode(node)).run(new DummyRunnable(null)); } -waitNioBalanceStop(client, 30_000); +waitNioBalanceStop(Collections.singletonList(client), 10_000); final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr"); @@ -135,7 +137,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { } }, 10_000); -waitNioBalanceStop(client, 30_000); +waitNioBalanceStop(Collections.singletonList(client), 30_000); long readMoveCnt2 = srv.readerMoveCount(); long writeMoveCnt2 = srv.writerMoveCount(); @@ -147,8 +149,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { writeMoveCnt1 = writeMoveCnt2; } -for (Ignite node : G.allGrids()) -waitNioBalanceStop(node, 10_000); +waitNioBalanceStop(G.allGrids(), 10_000); } finally { System.setProperty(GridNioServer.IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, ""); @@ -168,7 +169,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { startGridsMultiThreaded(5, 5); -for (int i = 0; i < 20; i++) { +for (int i = 0; i < 10; i++) { log.info("Iteration: " + i); final AtomicInteger idx = new AtomicInteger(); @@ -195,8 +196,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { } }, 30, "test-thread"); -for (Ignite node : G.allGrids()) -waitNioBalanceStop(node, 10_000); +waitNioBalanceStop(G.allGrids(), 10_000); } } finally { @@ -205,34 +205,57 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest { } /** - * @param node Node. + * @param nodes Node. * @param timeout Timeout. * @throws Exception If failed. */ -private void waitNioBalanceStop(Ignite node, long timeout) throws Exception { -TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); +private void waitNioBalanceStop(List nodes, long timeout) throws Exception { +final List srvs = new ArrayList<>(); -final GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr"); +for (Ignite node : nodes) { +TcpCommunicationSpi spi = (TcpCommunicationSpi) node.configuration().getCommunicationSpi(); + +GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr"); + +srvs.add(srv); +} assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() { @Override public boolean applyx() throws
[1/2] ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-balance 51e9bf887 -> 21805aef8 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dbfd6ead Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dbfd6ead Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dbfd6ead Branch: refs/heads/ignite-comm-balance Commit: dbfd6eadfebad98d72fcc801e98ad358d0327c12 Parents: 8d21741 Author: sboikovAuthored: Thu Sep 22 17:39:07 2016 +0300 Committer: sboikov Committed: Thu Sep 22 17:39:07 2016 +0300 -- .../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/dbfd6ead/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 784b081..92022df 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -44,7 +44,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -3110,8 +3109,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rcvCnt = buf.getLong(1); } - // if (log.isDebugEnabled()) -log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); +if (log.isDebugEnabled()) +log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); if (rcvCnt == -1) { if (log.isDebugEnabled())
[04/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0 Branch: refs/heads/ignite-comm-balance Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf Parents: c604e8c Author: sboikovAuthored: Wed Sep 21 12:54:53 2016 +0300 Committer: sboikov Committed: Wed Sep 21 12:54:53 2016 +0300 -- .../util/nio/GridSelectorNioSessionImpl.java| 2 +- .../communication/tcp/TcpCommunicationSpi.java | 47 +++- .../IgniteCacheMessageWriteTimeoutTest.java | 13 -- 3 files changed, 47 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index a680a33..88721ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (!accepted() && val instanceof GridNioRecoveryDescriptor) { outRecovery = (GridNioRecoveryDescriptor)val; -outRecovery.connected(); +outRecovery.onConnected(); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c9d9bf7..c131cf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (msg instanceof NodeIdMessage) { sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); -connKey = new ConnectionKey(sndId, 0); +connKey = new ConnectionKey(sndId, 0, -1); } else { assert msg instanceof HandshakeMessage : msg; +HandshakeMessage msg0 = (HandshakeMessage)msg; + sndId = ((HandshakeMessage)msg).nodeId(); -connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex()); +connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } if (log.isDebugEnabled()) @@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserve) connectedNew(recoveryDesc, ses, true); else { -if (c.failed) -ses.close(); +if (c.failed) { +ses.send(new RecoveryLastReceivedMessage(-1)); + +for (GridNioSession ses0 : nioSrvr.sessions()) { +ConnectionKey key0 = ses0.meta(CONN_IDX_META); + +if (ses0.accepted() && key0 != null && +key0.nodeId().equals(connKey.nodeId()) && +key0.connectionIndex() == connKey.connectionIndex() && +key0.connectCount() < connKey.connectCount()) +ses0.close(); +} +} } } else { @@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter fut = new ConnectFuture(); -ConnectionKey connKey = new ConnectionKey(nodeId, connIdx); +ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); GridFutureAdapter oldFut = clientFuts.putIfAbsent(connKey, fut); @@ -2705,7 +2718,7 @@ public class
[02/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb465cc9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb465cc9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb465cc9 Branch: refs/heads/ignite-comm-balance Commit: bb465cc9c4a8960fa218c6a1ce2b806ea24a11b3 Parents: c1d2436 Author: sboikovAuthored: Wed Sep 21 10:18:45 2016 +0300 Committer: sboikov Committed: Wed Sep 21 10:18:45 2016 +0300 -- .../internal/managers/communication/GridIoMessageFactory.java | 2 +- .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 908543c..711c03f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -755,7 +755,7 @@ public class GridIoMessageFactory implements MessageFactory { break; -// [-3..119] [124] - this +// [-3..119] [124-125] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ee30420..c9d9bf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -334,7 +334,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final byte HANDSHAKE_MSG_TYPE = -3; /** */ -public static final byte HANDSHAKE_MSG_TYPE2 = -4; +public static final byte HANDSHAKE_MSG_TYPE2 = 125; /** */ private ConnectGateway connectGate;
[12/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a4d8196 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a4d8196 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a4d8196 Branch: refs/heads/ignite-comm-balance Commit: 6a4d81965a78dd3f47bea3b33f823c62e994dd9a Parents: 9c87e2c Author: sboikovAuthored: Thu Sep 22 17:37:05 2016 +0300 Committer: sboikov Committed: Thu Sep 22 17:42:27 2016 +0300 -- .../ignite/internal/util/nio/GridNioServer.java | 34 ++- .../communication/tcp/TcpCommunicationSpi.java | 4 +- .../IgniteCommunicationBalanceTest.java | 215 +++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 4 files changed, 206 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a4d8196/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 2d5cc64..7352b5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -47,6 +47,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -92,6 +93,9 @@ public class GridNioServer { /** */ public static final String IGNITE_NIO_SES_BALANCER_CLASS_NAME = "IGNITE_NIO_SES_BALANCER_CLASS_NAME"; +/** */ +public static final String IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD = "IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD"; + /** Default session write timeout. */ public static final int DFLT_SES_WRITE_TIMEOUT = 5000; @@ -215,10 +219,10 @@ public class GridNioServer { private IgniteBiInClosure msgQueueLsnr; /** */ -private volatile long writerMoveCnt; +private final AtomicLong readerMoveCnt = new AtomicLong(); /** */ -private volatile long readerMoveCnt; +private final AtomicLong writerMoveCnt = new AtomicLong(); /** */ private final Balancer balancer; @@ -361,6 +365,14 @@ public class GridNioServer { this.balancer = balancer0; } +public long readerMoveCount() { +return readerMoveCnt.get(); +} + +public long writerMoveCount() { +return writerMoveCnt.get(); +} + /** * @return Configured port. */ @@ -1505,6 +1517,11 @@ public class GridNioServer { ses.finishMoveSession(this); +if (idx % 2 == 0) +readerMoveCnt.incrementAndGet(); +else +writerMoveCnt.incrementAndGet(); + SelectionKey key = f.movedSocketChannel().register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, ses); @@ -2948,10 +2965,13 @@ public class GridNioServer { /** */ private long lastBalance; +/** */ +private final long balancePeriod = IgniteSystemProperties.getLong(IGNITE_NIO_SES_BALANCER_BALANCE_PERIOD, 5000); + /** * @param srv Server. */ -public SizeBasedBalancer(GridNioServer srv) { +SizeBasedBalancer(GridNioServer srv) { this.srv = srv; log = srv.log; @@ -2961,13 +2981,13 @@ public class GridNioServer { @Override public void balance() { long now = U.currentTimeMillis(); -if (lastBalance + 5000 < now) { +if (lastBalance + balancePeriod < now) { lastBalance = now; long maxRcvd0 = -1, minRcvd0 = -1, maxSent0 = -1, minSent0 = -1; int maxRcvdIdx = -1, minRcvdIdx = -1, maxSentIdx = -1, minSentIdx = -1; -boolean print = Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); +boolean print = false;//Thread.currentThread().getName().contains("IgniteCommunicationBalanceTest4"); List clientWorkers = (List)srv.clientWorkers; @@ -3051,7 +3071,7 @@ public class GridNioServer { log.info("Will move
[06/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79ca4aab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79ca4aab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79ca4aab Branch: refs/heads/ignite-comm-balance Commit: 79ca4aabab4e9d5f69271fbcd6885299c37cb9bf Parents: 04a4d45 Author: sboikovAuthored: Thu Sep 22 11:51:38 2016 +0300 Committer: sboikov Committed: Thu Sep 22 11:51:38 2016 +0300 -- ...GridTcpCommunicationSpiRecoverySelfTest.java | 21 +--- 1 file changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/79ca4aab/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index fbbc5de..3d33fff 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -327,7 +327,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} for (int j = 0; j < 100; j++) { spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); @@ -437,7 +442,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} // Wait when session is closed, then try to open new connection from node1. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -554,7 +564,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} sndFut.get();
[09/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/831aa16d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/831aa16d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/831aa16d Branch: refs/heads/ignite-comm-balance Commit: 831aa16dcdad7501ef9c58f2b3e7a4e191e5452b Parents: 79ca4aab Author: sboikovAuthored: Thu Sep 22 14:35:53 2016 +0300 Committer: sboikov Committed: Thu Sep 22 14:35:53 2016 +0300 -- .../ignite/spi/GridTcpSpiForwardingSelfTest.java | 13 +++-- ...mmunicationSpiRecoveryFailureDetectionSelfTest.java | 1 + 2 files changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index deda313..5ca8f26 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { private static final int commExtPort2 = 20100; /** */ -private AddressResolver resolver; +private AddressResolver rslvr; /** */ private boolean ipFinderUseLocPorts; @@ -127,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { commSpi.setLocalPort(commLocPort); commSpi.setLocalPortRange(1); commSpi.setSharedMemoryPort(-1); +commSpi.setConnectionsPerNode(1); cfg.setCommunicationSpi(commSpi); -assert resolver != null; +assert rslvr != null; -cfg.setAddressResolver(resolver); +cfg.setAddressResolver(rslvr); return cfg; } @@ -148,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2))); map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2))); -resolver = new AddressResolver() { +rslvr = new AddressResolver() { @Override public Collection getExternalAddresses(InetSocketAddress addr) { return map.get(addr); } @@ -168,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2); map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2); -resolver = new BasicAddressResolver(map); +rslvr = new BasicAddressResolver(map); doTestForward(); } @@ -181,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { map.put("127.0.0.1", "127.0.0.1"); -resolver = new BasicAddressResolver(map); +rslvr = new BasicAddressResolver(map); ipFinderUseLocPorts = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/831aa16d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java index 95c9e40..b1aa119 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java @@ -33,6 +33,7 @@ public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends Gri spi.setAckSendThreshold(5); spi.setSocketSendBuffer(512); spi.setSocketReceiveBuffer(512); +spi.setConnectionsPerNode(1); return spi; }
[05/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04a4d458 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04a4d458 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04a4d458 Branch: refs/heads/ignite-comm-balance Commit: 04a4d4584ecdb9e44a1107921b640aa57bcc6cbb Parents: 3b0ffee Author: sboikovAuthored: Thu Sep 22 10:21:29 2016 +0300 Committer: sboikov Committed: Thu Sep 22 10:21:29 2016 +0300 -- .../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/04a4d458/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c131cf2..639e23d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -315,7 +315,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; /** Default connections per node. */ -public static final int DFLT_CONN_PER_NODE = 1; +public static final int DFLT_CONN_PER_NODE = 2; /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @@ -1132,7 +1132,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * TODO * - * @param maxConnectionsPerNode + * @param maxConnectionsPerNode Number of connections per node. */ public void setConnectionsPerNode(int maxConnectionsPerNode) { this.connectionsPerNode = maxConnectionsPerNode; @@ -1141,7 +1141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * TODO * - * @return + * @return Number of connections per node. */ public int getConnectionsPerNode() { return connectionsPerNode;
[03/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c604e8cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c604e8cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c604e8cb Branch: refs/heads/ignite-comm-balance Commit: c604e8cb291bae294bf97d8eb13fc16b8cf8a12e Parents: bb465cc Author: sboikovAuthored: Wed Sep 21 11:04:40 2016 +0300 Committer: sboikov Committed: Wed Sep 21 11:04:40 2016 +0300 -- .../GridTcpCommunicationSpiAbstractTest.java| 28 -- ...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++-- 2 files changed, 61 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 076724d..3c4fea0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.afterTest(); for (CommunicationSpi spi : spis.values()) { -ConcurrentMap clients = U.field(spi, "clients"); +ConcurrentMap clients = U.field(spi, "clients"); + +for (int i = 0; i < 20; i++) { +GridCommunicationClient client0 = null; + +for (GridCommunicationClient[] clients0 : clients.values()) { +for (GridCommunicationClient client : clients0) { +if (client != null) { +client0 = client; + +break; +} +} + +if (client0 != null) +break; +} + +if (client0 == null) +return; -for (int i = 0; i < 20 && !clients.isEmpty(); i++) { info("Check failed for SPI [grid=" + -GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']'); +GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + +", client=" + client0 + +", spi=" + spi + ']'); U.sleep(1000); } -assert clients.isEmpty() : "Clients: " + clients; +fail("Failed to wait when clients are closed."); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index bd66319..ed047fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -253,7 +254,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest() { +final Callable c = new Callable() { @Override public Void call() throws Exception { int idx0 = idx.getAndIncrement(); @@ -279,7 +280,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest threadsList = new ArrayList<>(); + +final AtomicBoolean fail = new AtomicBoolean(); + +final AtomicLong tId = new AtomicLong(); + +
[01/12] ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-balance 26eaaba8b -> 6a4d81965 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03e54ac2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03e54ac2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03e54ac2 Branch: refs/heads/ignite-comm-balance Commit: 03e54ac24ed666fac9cbfd84dfbf6dc3d89a3a28 Parents: 26eaaba Author: sboikovAuthored: Wed Sep 21 10:12:46 2016 +0300 Committer: sboikov Committed: Wed Sep 21 10:12:46 2016 +0300 -- .../org/apache/ignite/internal/util/nio/GridNioServer.java | 8 +--- 1 file changed, 1 insertion(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/03e54ac2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index b7b02f5..5da557b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1320,10 +1320,6 @@ public class GridNioServer { } } -public interface NioWorker { - -} - /** * Thread performing only read operations from the channel. */ @@ -1448,7 +1444,7 @@ public class GridNioServer { * * @param req Change request. */ -@Override public void offer(GridNioFuture req) { +@Override public void offer(GridNioFuture req) { changeReqs.offer((NioOperationFuture)req); selector.wakeup(); @@ -2115,8 +2111,6 @@ public class GridNioServer { * @throws IgniteCheckedException If failed. */ private void accept() throws IgniteCheckedException { -long lastBalance = U.currentTimeMillis(); - try { while (!closed && selector.isOpen() && !Thread.currentThread().isInterrupted()) { // Wake up every 2 seconds to check if closed.
[08/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/81832e1d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/81832e1d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/81832e1d Branch: refs/heads/ignite-comm-balance Commit: 81832e1dc9576fee9c8f74c451f4893cd5633d99 Parents: ffd654a Author: sboikovAuthored: Thu Sep 22 14:02:39 2016 +0300 Committer: sboikov Committed: Thu Sep 22 14:23:04 2016 +0300 -- .../ignite/internal/util/nio/GridNioServer.java | 131 --- .../ignite/internal/util/nio/GridNioWorker.java | 19 ++- .../util/nio/GridSelectorNioSessionImpl.java| 62 - .../IgniteCommunicationBalanceTest.java | 2 +- 4 files changed, 161 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/81832e1d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 5da557b..2d5cc64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -435,10 +435,6 @@ public class GridNioServer { NioOperationFuture fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); impl.offerStateChange(fut); -//int idx = impl.selectorIndex(); // TODO -// -//if (idx != -1) -//clientWorkers.get(idx).offer(fut); return fut; } @@ -498,14 +494,9 @@ public class GridNioServer { if (ses.removeFuture(fut)) fut.connectionClosed(); } -else if (msgCnt == 1) { +else if (msgCnt == 1) // Change from 0 to 1 means that worker thread should be waken up. -//int idx = ses.selectorIndex(); -// -//if (idx != -1) // TODO revisit -//clientWorkers.get(idx).offer(fut); ses.offerStateChange(fut); -} if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -578,7 +569,6 @@ public class GridNioServer { ses0.resend(futs); // Wake up worker. - //clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); ses0.offerStateChange(fut0); } } @@ -598,6 +588,10 @@ public class GridNioServer { } public void moveSession(GridNioSession ses, int from, int to) { +assert from >= 0 && from < clientWorkers.size() : from; +assert to >= 0 && to < clientWorkers.size() : to; +assert from != to; + clientWorkers.get(from).offer(new SessionMoveFuture((GridSelectorNioSessionImpl)ses, to)); } @@ -1336,13 +1330,15 @@ public class GridNioServer { /** Worker index. */ private final int idx; +/** Sessions assigned to this worker. */ +private final GridConcurrentHashSet workerSessions = +new GridConcurrentHashSet<>(); + private volatile long bytesRcvd; private volatile long bytesSent; private volatile long bytesRcvd0; private volatile long bytesSent0; -private final GridConcurrentHashSet sessions0 = new GridConcurrentHashSet<>(); - /** * @param idx Index of this worker in server's array. * @param gridName Grid name. @@ -1360,7 +1356,7 @@ public class GridNioServer { } public Collection sessions() { -return sessions0; +return workerSessions; } /** {@inheritDoc} */ @@ -1409,15 +1405,15 @@ public class GridNioServer { try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); -Class selectorImplClass = +Class selectorImplCls = Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader()); // Ensure the current selector implementation is what we can instrument. -if (!selectorImplClass.isAssignableFrom(selector.getClass())) +if (!selectorImplCls.isAssignableFrom(selector.getClass())) return; -Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); -Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); +Field selectedKeysField = selectorImplCls.getDeclaredField("selectedKeys"); +Field publicSelectedKeysField =
[10/12] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d217413 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d217413 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d217413 Branch: refs/heads/ignite-comm-balance Commit: 8d2174135a5fb5fffe6d7e695228862241b4fcef Parents: 831aa16 Author: sboikovAuthored: Thu Sep 22 14:41:01 2016 +0300 Committer: sboikov Committed: Thu Sep 22 15:28:23 2016 +0300 -- .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 ++ .../ignite/internal/IgniteSlowClientDetectionSelfTest.java | 1 + .../communication/IgniteVariousConnectionNumberTest.java | 4 3 files changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 639e23d..784b081 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3947,6 +3947,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ +@SuppressWarnings("PublicInnerClass") public static class HandshakeMessage2 extends HandshakeMessage { /** */ private static final long serialVersionUID = 0L; @@ -3969,6 +3970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { super(nodeId, connectCnt, rcvCnt); + this.connIdx = connIdx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java index 760313b..5721887 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java @@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest { commSpi.setSlowClientQueueLimit(50); commSpi.setSharedMemoryPort(-1); commSpi.setIdleConnectionTimeout(300_000); +commSpi.setConnectionsPerNode(1); cfg.setCommunicationSpi(commSpi); http://git-wip-us.apache.org/repos/asf/ignite/blob/8d217413/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java index 360eb8d..00a25d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java @@ -115,6 +115,10 @@ public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest { int idx = ThreadLocalRandom.current().nextInt(NODES); +Ignite node = ignite(idx); + +client = node.configuration().isClientMode(); + stopGrid(idx); startGrid(idx);
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 04a4d4584 -> 79ca4aaba conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79ca4aab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79ca4aab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79ca4aab Branch: refs/heads/ignite-comm-opts2 Commit: 79ca4aabab4e9d5f69271fbcd6885299c37cb9bf Parents: 04a4d45 Author: sboikovAuthored: Thu Sep 22 11:51:38 2016 +0300 Committer: sboikov Committed: Thu Sep 22 11:51:38 2016 +0300 -- ...GridTcpCommunicationSpiRecoverySelfTest.java | 21 +--- 1 file changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/79ca4aab/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index fbbc5de..3d33fff 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -327,7 +327,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} for (int j = 0; j < 100; j++) { spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); @@ -437,7 +442,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} // Wait when session is closed, then try to open new connection from node1. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -554,7 +564,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest assertTrue("Failed to wait for session close", ses0.closeTime() != 0); -ses1.resumeReads().get(); +try { +ses1.resumeReads().get(); +} +catch (IgniteCheckedException ignore) { +// Can fail is ses1 was closed. +} sndFut.get();
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 3b0ffee05 -> 04a4d4584 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04a4d458 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04a4d458 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04a4d458 Branch: refs/heads/ignite-comm-opts2 Commit: 04a4d4584ecdb9e44a1107921b640aa57bcc6cbb Parents: 3b0ffee Author: sboikovAuthored: Thu Sep 22 10:21:29 2016 +0300 Committer: sboikov Committed: Thu Sep 22 10:21:29 2016 +0300 -- .../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/04a4d458/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c131cf2..639e23d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -315,7 +315,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; /** Default connections per node. */ -public static final int DFLT_CONN_PER_NODE = 1; +public static final int DFLT_CONN_PER_NODE = 2; /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @@ -1132,7 +1132,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * TODO * - * @param maxConnectionsPerNode + * @param maxConnectionsPerNode Number of connections per node. */ public void setConnectionsPerNode(int maxConnectionsPerNode) { this.connectionsPerNode = maxConnectionsPerNode; @@ -1141,7 +1141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * TODO * - * @return + * @return Number of connections per node. */ public int getConnectionsPerNode() { return connectionsPerNode;
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 c604e8cb2 -> 3b0ffee05 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0 Branch: refs/heads/ignite-comm-opts2 Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf Parents: c604e8c Author: sboikovAuthored: Wed Sep 21 12:54:53 2016 +0300 Committer: sboikov Committed: Wed Sep 21 12:54:53 2016 +0300 -- .../util/nio/GridSelectorNioSessionImpl.java| 2 +- .../communication/tcp/TcpCommunicationSpi.java | 47 +++- .../IgniteCacheMessageWriteTimeoutTest.java | 13 -- 3 files changed, 47 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index a680a33..88721ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (!accepted() && val instanceof GridNioRecoveryDescriptor) { outRecovery = (GridNioRecoveryDescriptor)val; -outRecovery.connected(); +outRecovery.onConnected(); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c9d9bf7..c131cf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (msg instanceof NodeIdMessage) { sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); -connKey = new ConnectionKey(sndId, 0); +connKey = new ConnectionKey(sndId, 0, -1); } else { assert msg instanceof HandshakeMessage : msg; +HandshakeMessage msg0 = (HandshakeMessage)msg; + sndId = ((HandshakeMessage)msg).nodeId(); -connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex()); +connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } if (log.isDebugEnabled()) @@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserve) connectedNew(recoveryDesc, ses, true); else { -if (c.failed) -ses.close(); +if (c.failed) { +ses.send(new RecoveryLastReceivedMessage(-1)); + +for (GridNioSession ses0 : nioSrvr.sessions()) { +ConnectionKey key0 = ses0.meta(CONN_IDX_META); + +if (ses0.accepted() && key0 != null && +key0.nodeId().equals(connKey.nodeId()) && +key0.connectionIndex() == connKey.connectionIndex() && +key0.connectCount() < connKey.connectCount()) +ses0.close(); +} +} } } else { @@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter fut = new ConnectFuture(); -ConnectionKey connKey = new ConnectionKey(nodeId, connIdx); +ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); GridFutureAdapter oldFut
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 bb465cc9c -> c604e8cb2 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c604e8cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c604e8cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c604e8cb Branch: refs/heads/ignite-comm-opts2 Commit: c604e8cb291bae294bf97d8eb13fc16b8cf8a12e Parents: bb465cc Author: sboikovAuthored: Wed Sep 21 11:04:40 2016 +0300 Committer: sboikov Committed: Wed Sep 21 11:04:40 2016 +0300 -- .../GridTcpCommunicationSpiAbstractTest.java| 28 -- ...mmunicationSpiConcurrentConnectSelfTest.java | 40 ++-- 2 files changed, 61 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java index 076724d..3c4fea0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java @@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica super.afterTest(); for (CommunicationSpi spi : spis.values()) { -ConcurrentMap clients = U.field(spi, "clients"); +ConcurrentMap clients = U.field(spi, "clients"); + +for (int i = 0; i < 20; i++) { +GridCommunicationClient client0 = null; + +for (GridCommunicationClient[] clients0 : clients.values()) { +for (GridCommunicationClient client : clients0) { +if (client != null) { +client0 = client; + +break; +} +} + +if (client0 != null) +break; +} + +if (client0 == null) +return; -for (int i = 0; i < 20 && !clients.isEmpty(); i++) { info("Check failed for SPI [grid=" + -GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']'); +GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + +", client=" + client0 + +", spi=" + spi + ']'); U.sleep(1000); } -assert clients.isEmpty() : "Clients: " + clients; +fail("Failed to wait when clients are closed."); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/c604e8cb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index bd66319..ed047fa 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -253,7 +254,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest() { +final Callable c = new Callable() { @Override public Void call() throws Exception { int idx0 = idx.getAndIncrement(); @@ -279,7 +280,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest threadsList = new ArrayList<>(); + +final AtomicBoolean fail = new
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 c1d243699 -> bb465cc9c conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb465cc9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb465cc9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb465cc9 Branch: refs/heads/ignite-comm-opts2 Commit: bb465cc9c4a8960fa218c6a1ce2b806ea24a11b3 Parents: c1d2436 Author: sboikovAuthored: Wed Sep 21 10:18:45 2016 +0300 Committer: sboikov Committed: Wed Sep 21 10:18:45 2016 +0300 -- .../internal/managers/communication/GridIoMessageFactory.java | 2 +- .../apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 908543c..711c03f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -755,7 +755,7 @@ public class GridIoMessageFactory implements MessageFactory { break; -// [-3..119] [124] - this +// [-3..119] [124-125] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/bb465cc9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index ee30420..c9d9bf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -334,7 +334,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final byte HANDSHAKE_MSG_TYPE = -3; /** */ -public static final byte HANDSHAKE_MSG_TYPE2 = -4; +public static final byte HANDSHAKE_MSG_TYPE2 = 125; /** */ private ConnectGateway connectGate;
[20/22] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6ea2d82 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6ea2d82 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6ea2d82 Branch: refs/heads/ignite-comm-balance Commit: f6ea2d82d233b393eed6178e475cf47eee874a31 Parents: fb58fc4 Author: sboikovAuthored: Tue Sep 20 16:09:52 2016 +0300 Committer: sboikov Committed: Tue Sep 20 16:09:52 2016 +0300 -- .../tcp/GridTcpCommunicationSpiRecoverySelfTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/f6ea2d82/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index dfa8a54..fbbc5de 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -742,9 +742,8 @@ public class GridTcpCommunicationSpiRecoverySelfTest spi.spiStop(); } -for (IgniteTestResources rsrcs : spiRsrcs) { +for (IgniteTestResources rsrcs : spiRsrcs) rsrcs.stopThreads(); -} spis.clear(); nodes.clear();
[16/22] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30133896 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30133896 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30133896 Branch: refs/heads/ignite-comm-balance Commit: 3013389609de337cfebeaea8be5a34cdd93136b9 Parents: f27d747 Author: sboikovAuthored: Mon Sep 19 20:54:35 2016 +0300 Committer: sboikov Committed: Mon Sep 19 20:54:35 2016 +0300 -- .../communication/GridIoMessageFactory.java | 5 + .../nio/GridAbstractCommunicationClient.java| 11 +- .../util/nio/GridCommunicationClient.java | 5 + .../util/nio/GridShmemCommunicationClient.java | 6 +- .../util/nio/GridTcpNioCommunicationClient.java | 8 +- .../communication/tcp/TcpCommunicationSpi.java | 702 --- 6 files changed, 476 insertions(+), 261 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1eebfd4..908543c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -195,6 +195,11 @@ public class GridIoMessageFactory implements MessageFactory { break; +case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE2: +msg = new TcpCommunicationSpi.HandshakeMessage2(); + +break; + case 0: msg = new GridJobCancelRequest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 9b014ec..37bc170 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -35,14 +35,23 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati /** Metrics listener. */ protected final GridNioMetricsListener metricsLsnr; +/** */ +private final int connIdx; + /** * @param metricsLsnr Metrics listener. */ -protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) { +protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { +this.connIdx = connIdx; this.metricsLsnr = metricsLsnr; } /** {@inheritDoc} */ +@Override public int connectionIndex() { +return connIdx; +} + +/** {@inheritDoc} */ @Override public boolean close() { return reserves.compareAndSet(0, -1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 0de54e9..312a20e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -107,4 +107,9 @@ public interface GridCommunicationClient { * @return {@code True} if send is asynchronous. */ public boolean async(); + +/** + * @return Connection index. + */ +public int connectionIndex(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
[21/22] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1d24369 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1d24369 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1d24369 Branch: refs/heads/ignite-comm-balance Commit: c1d243699b7dcdca4d1e052267770edd4981a87f Parents: f6ea2d8 Author: sboikovAuthored: Tue Sep 20 16:30:40 2016 +0300 Committer: sboikov Committed: Tue Sep 20 16:30:40 2016 +0300 -- .../IgniteVariousConnectionNumberTest.java | 161 +++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 2 files changed, 164 insertions(+) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1d24369/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java new file mode 100644 index 000..360eb8d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest { +/** */ +private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + +/** */ +private static final int NODES = 6; + +/** */ +private static Random rnd = new Random(); + +/** */ +private boolean client; + +/** {@inheritDoc} */ +@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { +IgniteConfiguration cfg = super.getConfiguration(gridName); + +((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + +int connections = rnd.nextInt(10) + 1; + +log.info("Node connections [name=" + gridName + ", connections=" + connections + ']'); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + +cfg.setClientMode(client); + +return cfg; +} + +/** {@inheritDoc} */ +@Override protected void beforeTestsStarted() throws Exception { +super.beforeTestsStarted(); + +long seed = U.currentTimeMillis(); + +rnd.setSeed(seed); + +log.info("Random seed: " + seed); +} + +/** {@inheritDoc} */ +@Override protected void afterTestsStopped() throws Exception { +stopAllGrids(); + +super.afterTestsStopped(); +} + +/** + * @throws Exception If failed. + */ +public void
[18/22] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4030ef88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4030ef88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4030ef88 Branch: refs/heads/ignite-comm-balance Commit: 4030ef886043360e846926f2a72d5a8a81393e21 Parents: 6e72f51 Author: sboikovAuthored: Tue Sep 20 11:49:10 2016 +0300 Committer: sboikov Committed: Tue Sep 20 14:37:06 2016 +0300 -- .../nio/GridAbstractCommunicationClient.java| 1 + .../util/nio/GridShmemCommunicationClient.java | 1 + .../util/nio/GridTcpNioCommunicationClient.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 244 ++- ...eAtomicMessageRecovery10ConnectionsTest.java | 28 +++ .../IgniteCacheMessageRecoveryAbstractTest.java | 24 +- .../spi/GridTcpSpiForwardingSelfTest.java | 5 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 17 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 3 + ...cpCommunicationSpiMultithreadedSelfTest.java | 12 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + ...CommunicationRecoveryAckClosureSelfTest.java | 1 + .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 14 files changed, 268 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 37bc170..f2ab932 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -39,6 +39,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati private final int connIdx; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. */ protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 74d58b2..d941bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien private final MessageFormatter formatter; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 90f17b9..fcb40c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -45,8 +45,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie private final IgniteLogger log; /** - * @param ses Session. * @param connIdx Connection index. + * @param ses Session. * @param log Logger. */ public GridTcpNioCommunicationClient( http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
[19/22] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb58fc4f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb58fc4f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb58fc4f Branch: refs/heads/ignite-comm-balance Commit: fb58fc4fd17509a798bedfd02c391411989088e4 Parents: 4030ef8 Author: sboikovAuthored: Tue Sep 20 14:54:25 2016 +0300 Committer: sboikov Committed: Tue Sep 20 15:16:16 2016 +0300 -- .../ignite/internal/util/nio/GridNioServer.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 115 +--- ...acheConnectionRecovery10ConnectionsTest.java | 35 + .../IgniteCacheMessageWriteTimeoutTest.java | 4 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + .../testsuites/IgniteCacheTestSuite2.java | 130 +-- 6 files changed, 229 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/fb58fc4f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 578c73e..83ed513e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1473,7 +1473,8 @@ public class GridNioServer { MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); sb.append("Connection info [") - .append("rmtAddr=").append(ses.remoteAddress()) +.append("in=").append(ses.accepted()) +.append(", rmtAddr=").append(ses.remoteAddress()) .append(", locAddr=").append(ses.localAddress()); GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fb58fc4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2d1a2b2..ee30420 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -339,6 +339,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private ConnectGateway connectGate; +/** */ +private ConnectionPolicy connPlc; + /** Server listener. */ private final GridNioServerListener srvLsnr = new GridNioServerListenerAdapter() { @@ -374,7 +377,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nodeClients != null) { for (GridCommunicationClient client : nodeClients) { if (client instanceof GridTcpNioCommunicationClient && -((GridTcpNioCommunicationClient) client).session() == ses) { + ((GridTcpNioCommunicationClient)client).session() == ses) { client.close(); removeNodeClient(id, client); @@ -1615,6 +1618,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.dumpStats(); } +/** */ +private final ThreadLocal threadConnIdx = new ThreadLocal<>(); + +/** */ +private final AtomicInteger connIdx = new AtomicInteger(); + /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -1649,6 +1658,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); } +if (connectionsPerNode > 1) { +int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0); + +switch (idxMode) { +case 0: { +connPlc = new ConnectionPolicy() { +@Override public int connectionIndex() { +return (int)(Thread.currentThread().getId() %
[2/2] ignite git commit: conn
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1d24369 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1d24369 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1d24369 Branch: refs/heads/ignite-comm-opts2 Commit: c1d243699b7dcdca4d1e052267770edd4981a87f Parents: f6ea2d8 Author: sboikovAuthored: Tue Sep 20 16:30:40 2016 +0300 Committer: sboikov Committed: Tue Sep 20 16:30:40 2016 +0300 -- .../IgniteVariousConnectionNumberTest.java | 161 +++ .../ignite/testsuites/IgniteCacheTestSuite.java | 3 + 2 files changed, 164 insertions(+) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/c1d24369/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java -- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java new file mode 100644 index 000..360eb8d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest { +/** */ +private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + +/** */ +private static final int NODES = 6; + +/** */ +private static Random rnd = new Random(); + +/** */ +private boolean client; + +/** {@inheritDoc} */ +@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { +IgniteConfiguration cfg = super.getConfiguration(gridName); + +((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + +int connections = rnd.nextInt(10) + 1; + +log.info("Node connections [name=" + gridName + ", connections=" + connections + ']'); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + +cfg.setClientMode(client); + +return cfg; +} + +/** {@inheritDoc} */ +@Override protected void beforeTestsStarted() throws Exception { +super.beforeTestsStarted(); + +long seed = U.currentTimeMillis(); + +rnd.setSeed(seed); + +log.info("Random seed: " + seed); +} + +/** {@inheritDoc} */ +@Override protected void afterTestsStopped() throws Exception { +stopAllGrids(); + +super.afterTestsStopped(); +} + +/** + * @throws Exception If failed. + */ +public void
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 4030ef886 -> fb58fc4fd conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb58fc4f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb58fc4f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb58fc4f Branch: refs/heads/ignite-comm-opts2 Commit: fb58fc4fd17509a798bedfd02c391411989088e4 Parents: 4030ef8 Author: sboikovAuthored: Tue Sep 20 14:54:25 2016 +0300 Committer: sboikov Committed: Tue Sep 20 15:16:16 2016 +0300 -- .../ignite/internal/util/nio/GridNioServer.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 115 +--- ...acheConnectionRecovery10ConnectionsTest.java | 35 + .../IgniteCacheMessageWriteTimeoutTest.java | 4 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + .../testsuites/IgniteCacheTestSuite2.java | 130 +-- 6 files changed, 229 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/fb58fc4f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 578c73e..83ed513e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -1473,7 +1473,8 @@ public class GridNioServer { MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY); sb.append("Connection info [") - .append("rmtAddr=").append(ses.remoteAddress()) +.append("in=").append(ses.accepted()) +.append(", rmtAddr=").append(ses.remoteAddress()) .append(", locAddr=").append(ses.localAddress()); GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fb58fc4f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 2d1a2b2..ee30420 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -339,6 +339,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private ConnectGateway connectGate; +/** */ +private ConnectionPolicy connPlc; + /** Server listener. */ private final GridNioServerListener srvLsnr = new GridNioServerListenerAdapter() { @@ -374,7 +377,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nodeClients != null) { for (GridCommunicationClient client : nodeClients) { if (client instanceof GridTcpNioCommunicationClient && -((GridTcpNioCommunicationClient) client).session() == ses) { + ((GridTcpNioCommunicationClient)client).session() == ses) { client.close(); removeNodeClient(id, client); @@ -1615,6 +1618,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.dumpStats(); } +/** */ +private final ThreadLocal threadConnIdx = new ThreadLocal<>(); + +/** */ +private final AtomicInteger connIdx = new AtomicInteger(); + /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -1649,6 +1658,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "Specified 'unackedMsgsBufSize' is too low, it should be at least 'ackSndThreshold * 5'."); } +if (connectionsPerNode > 1) { +int idxMode = IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0); + +switch (idxMode) { +case 0: { +connPlc = new ConnectionPolicy() { +@Override public int
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 6e72f5198 -> 4030ef886 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4030ef88 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4030ef88 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4030ef88 Branch: refs/heads/ignite-comm-opts2 Commit: 4030ef886043360e846926f2a72d5a8a81393e21 Parents: 6e72f51 Author: sboikovAuthored: Tue Sep 20 11:49:10 2016 +0300 Committer: sboikov Committed: Tue Sep 20 14:37:06 2016 +0300 -- .../nio/GridAbstractCommunicationClient.java| 1 + .../util/nio/GridShmemCommunicationClient.java | 1 + .../util/nio/GridTcpNioCommunicationClient.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 244 ++- ...eAtomicMessageRecovery10ConnectionsTest.java | 28 +++ .../IgniteCacheMessageRecoveryAbstractTest.java | 24 +- .../spi/GridTcpSpiForwardingSelfTest.java | 5 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 17 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 3 + ...cpCommunicationSpiMultithreadedSelfTest.java | 12 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + ...CommunicationRecoveryAckClosureSelfTest.java | 1 + .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 14 files changed, 268 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 37bc170..f2ab932 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -39,6 +39,7 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati private final int connIdx; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. */ protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 74d58b2..d941bae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien private final MessageFormatter formatter; /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 90f17b9..fcb40c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -45,8 +45,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie private final IgniteLogger log; /** - * @param ses Session. * @param connIdx Connection index. + * @param ses Session. * @param log Logger. */ public GridTcpNioCommunicationClient( http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java -- diff --git
ignite git commit: conn
Repository: ignite Updated Branches: refs/heads/ignite-comm-opts2 f27d7471f -> 301338960 conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30133896 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30133896 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30133896 Branch: refs/heads/ignite-comm-opts2 Commit: 3013389609de337cfebeaea8be5a34cdd93136b9 Parents: f27d747 Author: sboikovAuthored: Mon Sep 19 20:54:35 2016 +0300 Committer: sboikov Committed: Mon Sep 19 20:54:35 2016 +0300 -- .../communication/GridIoMessageFactory.java | 5 + .../nio/GridAbstractCommunicationClient.java| 11 +- .../util/nio/GridCommunicationClient.java | 5 + .../util/nio/GridShmemCommunicationClient.java | 6 +- .../util/nio/GridTcpNioCommunicationClient.java | 8 +- .../communication/tcp/TcpCommunicationSpi.java | 702 --- 6 files changed, 476 insertions(+), 261 deletions(-) -- http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 1eebfd4..908543c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -195,6 +195,11 @@ public class GridIoMessageFactory implements MessageFactory { break; +case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE2: +msg = new TcpCommunicationSpi.HandshakeMessage2(); + +break; + case 0: msg = new GridJobCancelRequest(); http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 9b014ec..37bc170 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -35,14 +35,23 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati /** Metrics listener. */ protected final GridNioMetricsListener metricsLsnr; +/** */ +private final int connIdx; + /** * @param metricsLsnr Metrics listener. */ -protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) { +protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { +this.connIdx = connIdx; this.metricsLsnr = metricsLsnr; } /** {@inheritDoc} */ +@Override public int connectionIndex() { +return connIdx; +} + +/** {@inheritDoc} */ @Override public boolean close() { return reserves.compareAndSet(0, -1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 0de54e9..312a20e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -107,4 +107,9 @@ public interface GridCommunicationClient { * @return {@code True} if send is asynchronous. */ public boolean async(); + +/** + * @return Connection index. + */ +public int connectionIndex(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java -- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java