This is an automated email from the ASF dual-hosted git repository. gosullivan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new f8740c8 GEODE-3080 Add a multiple-connections-per-thread test for new protocol. f8740c8 is described below commit f8740c88b5b04231417c63f3fd9ab9c9594c4538 Author: Galen O'Sullivan <gosulli...@pivotal.io> AuthorDate: Fri Sep 8 09:57:27 2017 -0700 GEODE-3080 Add a multiple-connections-per-thread test for new protocol. Signed-off-by: Hitesh Khamesra <hkames...@pivotal.io> --- .../acceptance/CacheMaxConnectionJUnitTest.java | 146 ++++++++++++++++----- 1 file changed, 113 insertions(+), 33 deletions(-) diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java index 3c81608..8f95573 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java @@ -16,12 +16,20 @@ package org.apache.geode.protocol.acceptance; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.awaitility.Awaitility; @@ -31,7 +39,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.contrib.java.lang.system.RestoreSystemProperties; import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.apache.geode.cache.Cache; @@ -44,8 
+51,12 @@ import org.apache.geode.internal.cache.CacheServerImpl; import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.internal.net.SocketCreatorFactory; +import org.apache.geode.internal.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.MessageUtil; +import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.protobuf.ProtobufSerializationService; -import org.apache.geode.serialization.SerializationService; +import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; import org.apache.geode.test.junit.categories.IntegrationTest; /** @@ -53,6 +64,9 @@ import org.apache.geode.test.junit.categories.IntegrationTest; */ @Category(IntegrationTest.class) public class CacheMaxConnectionJUnitTest { + private static final String TEST_KEY = "testKey"; + private static final String TEST_VALUE = "testValue"; + private static final int TEST_PUT_CORRELATION_ID = 12355; private final String TEST_REGION = "testRegion"; @@ -66,6 +80,8 @@ public class CacheMaxConnectionJUnitTest { @Rule public TestName testName = new TestName(); + private ProtobufSerializationService serializationService; + private ProtobufProtocolSerializer protobufProtocolSerializer; @Before public void setup() throws Exception { @@ -90,6 +106,9 @@ public class CacheMaxConnectionJUnitTest { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); outputStream = socket.getOutputStream(); outputStream.write(110); + + serializationService = new ProtobufSerializationService(); + protobufProtocolSerializer = new ProtobufProtocolSerializer(); } @After @@ -100,60 +119,121 @@ public class CacheMaxConnectionJUnitTest { } @Test - public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException { - 
cache.getDistributedSystem().disconnect(); + public void testNewProtocolRespectsMaxConnectionLimit_notSelector() throws Exception { + testNewProtocolRespectsMaxConnectionLimit(0, false); + } - CacheFactory cacheFactory = new CacheFactory(); - cacheFactory.set(ConfigurationProperties.LOCATORS, ""); - cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); - cache = cacheFactory.create(); + @Test + public void testNewProtocolRespectsMaxConnectionLimit_isSelector() throws Exception { + testNewProtocolRespectsMaxConnectionLimit(4, true); + } + + private void testNewProtocolRespectsMaxConnectionLimit(int threads, boolean isSelector) + throws Exception { + final int connections = 16; + + List<CacheServer> cacheServers = cache.getCacheServers(); + assertEquals(1, cacheServers.size()); + cacheServers.get(0).stop(); CacheServer cacheServer = cache.addCacheServer(); final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); cacheServer.setPort(cacheServerPort); - cacheServer.setMaxConnections(16); - cacheServer.setMaxThreads(16); + cacheServer.setMaxConnections(connections); + cacheServer.setMaxThreads(threads); cacheServer.start(); AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor(); - // Start 16 sockets, which is exactly the maximum that the server will support. - Socket[] sockets = new Socket[16]; - for (int i = 0; i < 16; i++) { - Socket socket = new Socket("localhost", cacheServerPort); - sockets[i] = socket; - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - socket.getOutputStream() - .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); + if (isSelector) { + assertTrue(acceptor.isSelector()); + } else { + assertFalse(acceptor.isSelector()); } - // try to start a new socket, expecting it to be disconnected. 
- try (Socket socket = new Socket("localhost", cacheServerPort)) { - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); - socket.getOutputStream() - .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); - assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected. - } + validateSocketCreationAndDestruction(cacheServerPort, connections); - for (Socket currentSocket : sockets) { - currentSocket.close(); - } + // Once all connections are closed, the acceptor should have a connection count of 0. + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> acceptor.getClientServerCnxCount() == 0); + + // do it again, just to be sure there's no leak somewhere else. + validateSocketCreationAndDestruction(cacheServerPort, connections); // Once all connections are closed, the acceptor should have a connection count of 0. Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> acceptor.getClientServerCnxCount() == 0); - // Try to start 16 new connections, again at the limit. - for (int i = 0; i < 16; i++) { - Socket socket = new Socket("localhost", cacheServerPort); - sockets[i] = socket; + } + + // Start exactly as many that the server will support, check that they work. + // test that creating one more causes it to be disconnected. + // Close all the sockets when we're done. + private void validateSocketCreationAndDestruction(int cacheServerPort, int connections) + throws Exception { + final Socket[] sockets = new Socket[connections]; + + ExecutorService executor = Executors.newFixedThreadPool(20); + + // Used to assert the exception is non-null. 
+ ArrayList<Callable<Exception>> callables = new ArrayList<>(); + + for (int i = 0; i < connections; i++) { + final int j = i; + callables.add(() -> { + try { + Socket socket = new Socket("localhost", cacheServerPort); + sockets[j] = socket; + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); + + ClientProtocol.Message putMessage = + MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, + TEST_REGION, ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID)); + protobufProtocolSerializer.serialize(putMessage, outputStream); + validatePutResponse(socket, protobufProtocolSerializer); + } catch (Exception e) { + return e; + } + return null; + }); + } + List<Future<Exception>> futures = executor.invokeAll(callables); + + for (Future<Exception> f : futures) { + assertNull(f.get()); + } + + // try to start a new socket, expecting it to be disconnected. + try (Socket socket = new Socket("localhost", cacheServerPort)) { Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); socket.getOutputStream() .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber()); + assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected. 
} for (Socket currentSocket : sockets) { currentSocket.close(); } } + + private void validatePutResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception { + ClientProtocol.Response response = + deserializeResponse(socket, protobufProtocolSerializer, TEST_PUT_CORRELATION_ID); + assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE, + response.getResponseAPICase()); + } + + private ClientProtocol.Response deserializeResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer, int expectedCorrelationId) + throws InvalidProtocolMessageException, IOException { + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(expectedCorrelationId, message.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + return message.getResponse(); + } } -- To stop receiving notification emails like this one, please contact commits@geode.apache.org.