This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new e25fba2a13 GEODE-10271: ConnectionProxyJUnitTest cleanup (#7652) e25fba2a13 is described below commit e25fba2a13f15e77af6aafabbe6921b919b95d19 Author: Jinmei Liao <jil...@pivotal.io> AuthorDate: Wed May 11 10:48:23 2022 -0700 GEODE-10271: ConnectionProxyJUnitTest cleanup (#7652) --- .../sockets/ConnectionProxyIntegrationTest.java | 395 +++++++++++ .../tier/sockets/ConnectionProxyJUnitTest.java | 771 --------------------- 2 files changed, 395 insertions(+), 771 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java new file mode 100644 index 0000000000..212c983bfb --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyIntegrationTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +/* + * Created on Feb 3, 2006 + * + */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.apache.geode.cache.client.PoolManager.createFactory; +import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.client.NoAvailableServersException; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; +import org.apache.geode.test.junit.rules.ServerStarterRule; + +/** + * Tests the functionality of operations of AbstractConnectionProxy & its derived classes. + */ +@Category({ClientSubscriptionTest.class}) +public class ConnectionProxyIntegrationTest { + DistributedSystem system; + Cache cache; + PoolImpl proxy = null; + SequenceIdAndExpirationObject seo = null; + CacheServer server = null; + + final Duration timeoutToVerifyExpiry = Duration.ofSeconds(30); + final Duration timeoutToVerifyAckSend = Duration.ofSeconds(30); + + @Rule + public ServerStarterRule serverStarter = + new ServerStarterRule().withNoCacheServer().withAutoStart(); + + @Before + public void setUp() throws Exception { + cache = serverStarter.getCache(); + system = cache.getDistributedSystem(); + } + + @After + public void after() throws Exception { + if (server != null) { + server.stop(); + } + } + + @Test + public void connectedServerCount() throws Exception { + int port3 = getRandomAvailableTCPPort(); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(false); + poolFactory.setReadTimeout(2000); + poolFactory.setMinConnections(1); + poolFactory.setSocketBufferSize(32768); + poolFactory.setRetryAttempts(1); + poolFactory.setPingInterval(500); + proxy = (PoolImpl) poolFactory.create("clientPool"); + + assertThatThrownBy(() -> proxy.acquireConnection()) + .isInstanceOf(NoAvailableServersException.class); + + assertThat(proxy.getConnectedServerCount()).isEqualTo(0); + addCacheServer(port3, 15000); + await().untilAsserted(() -> { + assertThat(proxy.getConnectedServerCount()).isEqualTo(1); + }); + + } + + @Test + public void threadIdToSequenceIdMapCreation() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + proxy = (PoolImpl) poolFactory.create("clientPool"); + assertThat(proxy.getThreadIdToSequenceIdMap()).isNotNull(); + } + + @Test + public void threadIdToSequenceIdMapExpiryPositive() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(4000); + poolFactory.setSubscriptionAckInterval(2000); + proxy = (PoolImpl) poolFactory.create("clientPool"); + + EventID eventID = new EventID(new byte[0], 1, 1); + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs(" eventID should not be duplicate as it is a new entry") + .isFalse(); + + verifyExpiry(); + + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs(" eventID should not be duplicate as it is a new entry") + .isFalse(); + } + + + @Test + public void threadIdToSequenceIdMapExpiryNegative() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(10000); + + proxy = (PoolImpl) poolFactory.create("clientPool"); + + final EventID eventID = new EventID(new byte[0], 1, 1); + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs(" eventID should not be duplicate as it is a new entry") + .isFalse(); + + await().untilAsserted(() -> assertThat(proxy.verifyIfDuplicate(eventID)).isTrue()); + } + + @Test + public void threadIdToSequenceIdMapConcurrency() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(5000); + poolFactory.setSubscriptionAckInterval(2000); + proxy = (PoolImpl) poolFactory.create("clientPool"); + + final int EVENT_ID_COUNT = 10000; // why 10,000? + EventID[] eventIds = new EventID[EVENT_ID_COUNT]; + for (int i = 0; i < EVENT_ID_COUNT; i++) { + eventIds[i] = new EventID(new byte[0], i, i); + assertThat(proxy.verifyIfDuplicate(eventIds[i])) + .describedAs("eventIds can never be duplicate, it is being created for the first time!") + .isFalse(); + } + verifyExpiry(); + + for (int i = 0; i < EVENT_ID_COUNT; i++) { + assertThat(proxy.verifyIfDuplicate(eventIds[i])) + .describedAs( + "eventIds can not be found to be duplicate since the entry should have expired! " + i) + .isFalse(); + } + } + + + @Test + public void duplicateSeqIdLesserThanCurrentSeqIdBeingIgnored() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(100000); + proxy = (PoolImpl) poolFactory.create("clientPool"); + + EventID eventId1 = new EventID(new byte[0], 1, 5); + assertThat(proxy.verifyIfDuplicate(eventId1)) + .describedAs("eventId1 can never be duplicate, it is being created for the first time!") + .isFalse(); + + EventID eventId2 = new EventID(new byte[0], 1, 2); + + assertThat(proxy.verifyIfDuplicate(eventId2)) + .describedAs("eventId2 should be duplicate, seqId is less than highest (5)") + .isTrue(); + + EventID eventId3 = new EventID(new byte[0], 1, 3); + + assertThat(proxy.verifyIfDuplicate(eventId3)) + .describedAs("eventId3 should be duplicate, seqId is less than highest (5)") + .isTrue(); + + assertThat(!proxy.getThreadIdToSequenceIdMap().isEmpty()).isTrue(); + proxy.destroy(); + } + + + @Test + public void cleanCloseOfThreadIdToSeqId() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(100000); + proxy = (PoolImpl) poolFactory.create("clientPool"); + + EventID eventID1 = new EventID(new byte[0], 1, 2); + + assertThat(proxy.verifyIfDuplicate(eventID1)) + .describedAs("eventID1 can never be duplicate, it is being created for the first time!") + .isFalse(); + + EventID eventID2 = new EventID(new byte[0], 1, 3); + assertThat(proxy.verifyIfDuplicate(eventID2)) + .describedAs("eventID2 can never be duplicate, since sequenceId is greater ") + .isFalse(); + + assertThat(proxy.verifyIfDuplicate(eventID2)) + .describedAs("eventID2 had to be a duplicate, since sequenceId is equal ") + .isTrue(); + + EventID eventID3 = new EventID(new byte[0], 1, 1); + assertThat(proxy.verifyIfDuplicate(eventID3)) + .describedAs("eventId3 had to be a duplicate, since sequenceId is lesser") + .isTrue(); + } + + @Test + public void twoClientsHavingDifferentThreadIdMaps() throws Exception { + int port3 = getRandomAvailableTCPPort(); + addCacheServer(port3, 10000); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port3); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(-1); + poolFactory.setSubscriptionMessageTrackingTimeout(100000); + + PoolImpl proxy1 = (PoolImpl) poolFactory.create("clientPool1"); + try { + PoolImpl proxy2 = (PoolImpl) poolFactory.create("clientPool2"); + try { + Map map1 = proxy1.getThreadIdToSequenceIdMap(); + Map map2 = proxy2.getThreadIdToSequenceIdMap(); + assertThat(map1 == map2).isFalse(); + } finally { + proxy2.destroy(); + } + } finally { + proxy1.destroy(); + } + } + + @Test + public void periodicAckSendByClient() throws Exception { + int port = getRandomAvailableTCPPort(); + addCacheServer(port, null); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(1); + poolFactory.setReadTimeout(20000); + poolFactory.setSubscriptionMessageTrackingTimeout(15000); + poolFactory.setSubscriptionAckInterval(5000); + + proxy = (PoolImpl) poolFactory.create("clientPool"); + + EventID eventID = new EventID(new byte[0], 1, 1); + + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs("eventID should not be duplicate as it is a new entry") + .isFalse(); + + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + assertThat(seo.getAckSend()).isFalse(); + + // should send the ack to server + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + verifyAckSend(true); + + // New update on same threadId + eventID = new EventID(new byte[0], 1, 2); + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs("eventID should not be duplicate as it is a new entry") + .isFalse(); + + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + assertThat(seo.getAckSend()).isFalse(); + + // should send another ack to server + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + verifyAckSend(true); + + // should expire with the this mentioned. + verifyExpiry(); + } + + // No ack will be send if Redundancy level = 0 + @Test + public void noAckSendByClient() throws Exception { + int port = getRandomAvailableTCPPort(); + addCacheServer(port, null); + + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory.addServer("localhost", port); + poolFactory.setSubscriptionEnabled(true); + poolFactory.setSubscriptionRedundancy(1); + poolFactory.setReadTimeout(20000); + poolFactory.setSubscriptionMessageTrackingTimeout(8000); + poolFactory.setSubscriptionAckInterval(2000); + + proxy = (PoolImpl) poolFactory.create("clientPool"); + + EventID eventID = new EventID(new byte[0], 1, 1); + assertThat(proxy.verifyIfDuplicate(eventID)) + .describedAs("eventID should not be duplicate as it is a new entry") + .isFalse(); + + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + assertThat(seo.getAckSend()).isFalse(); + + // should not send an ack as redundancy level = 0; + seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() + .get(new ThreadIdentifier(new byte[0], 1)); + verifyAckSend(false); + + // should expire without sending an ack as redundancy level = 0. + verifyExpiry(); + } + + private void verifyAckSend(final boolean expectedAckSend) { + await().timeout(timeoutToVerifyAckSend).untilAsserted(() -> { + assertThat(seo.getAckSend()).isEqualTo(expectedAckSend); + }); + } + + private void verifyExpiry() { + await().timeout(timeoutToVerifyExpiry).untilAsserted(() -> { + assertThat(proxy.getThreadIdToSequenceIdMap().size()).isEqualTo(0); + }); + } + + // start the server + private void addCacheServer(int serverPort, Integer maxBetweenPings) throws IOException { + server = cache.addCacheServer(); + if (maxBetweenPings != null) { + server.setMaximumTimeBetweenPings(maxBetweenPings); + } + server.setPort(serverPort); + server.start(); + } +} diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java deleted file mode 100644 index d08606ad09..0000000000 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ConnectionProxyJUnitTest.java +++ /dev/null @@ -1,771 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -/* - * Created on Feb 3, 2006 - * - */ -package org.apache.geode.internal.cache.tier.sockets; - -import static org.apache.geode.cache.client.PoolManager.createFactory; -import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.time.Duration; -import java.util.Map; -import java.util.Properties; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionAttributes; -import org.apache.geode.cache.Scope; -import org.apache.geode.cache.client.PoolFactory; -import org.apache.geode.cache.client.PoolManager; -import org.apache.geode.cache.client.internal.Connection; -import org.apache.geode.cache.client.internal.PoolImpl; -import org.apache.geode.cache.client.internal.PutOp; -import org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.test.awaitility.GeodeAwaitility; -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; - -/** - * - * Tests the functionality of operations of AbstractConnectionProxy & its derived classes. - */ -@Category({ClientSubscriptionTest.class}) -public class ConnectionProxyJUnitTest { - private static final String expectedRedundantErrorMsg = - "Could not find any server to host redundant client queue."; - private static final String expectedPrimaryErrorMsg = - "Could not find any server to host primary client queue."; - - DistributedSystem system; - - Cache cache; - - PoolImpl proxy = null; - - SequenceIdAndExpirationObject seo = null; - - final Duration timeoutToVerifyExpiry = Duration.ofSeconds(30); - final Duration timeoutToVerifyAckSend = Duration.ofSeconds(30); - - @Before - public void setUp() throws Exception { - - Properties p = new Properties(); - p.setProperty(MCAST_PORT, "0"); - p.setProperty(LOCATORS, ""); - system = DistributedSystem.connect(p); - cache = CacheFactory.create(system); - final String addExpectedPEM = - "<ExpectedException action=add>" + expectedPrimaryErrorMsg + "</ExpectedException>"; - final String addExpectedREM = - "<ExpectedException action=add>" + expectedRedundantErrorMsg + "</ExpectedException>"; - system.getLogWriter().info(addExpectedPEM); - system.getLogWriter().info(addExpectedREM); - } - - @After - public void tearDown() throws Exception { - cache.close(); - - final String removeExpectedPEM = - "<ExpectedException action=remove>" + expectedPrimaryErrorMsg + "</ExpectedException>"; - final String removeExpectedREM = - "<ExpectedException action=remove>" + expectedRedundantErrorMsg + "</ExpectedException>"; - - system.getLogWriter().info(removeExpectedPEM); - system.getLogWriter().info(removeExpectedREM); - - system.disconnect(); - if (proxy != null) { - proxy.destroy(); - } - } - - /** - * This test verifies the behaviour of client request when the listener on the server sits - * forever. This is done in following steps:<br> - * 1)create server<br> - * 2)initialize proxy object and create region for client having a CacheListener and make - * afterCreate in the listener to wait infinitely<br> - * 3)perform a PUT on client by acquiring Connection through proxy<br> - * 4)Verify that exception occurs due to infinite wait in the listener<br> - * 5)Verify that above exception occurs sometime after the readTimeout configured for the client - * <br> - * - */ - @Ignore - @Test - public void testListenerOnServerSitForever() throws Exception { - int port3 = getRandomAvailableTCPPort(); - Region testRegion = null; - - CacheServer server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(false); - pf.setSubscriptionRedundancy(-1); - pf.setReadTimeout(2000); - pf.setSocketBufferSize(32768); - pf.setRetryAttempts(1); - pf.setPingInterval(10000); - - proxy = (PoolImpl) pf.create("clientPool"); - - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setCacheListener(new CacheListenerAdapter() { - @Override - public void afterCreate(EntryEvent event) { - synchronized (ConnectionProxyJUnitTest.this) { - try { - ConnectionProxyJUnitTest.this.wait(); - } catch (InterruptedException e) { - fail("interrupted"); - } - } - } - }); - RegionAttributes attrs = factory.create(); - testRegion = cache.createRegion("testregion", attrs); - - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - Connection conn = (proxy).acquireConnection(); - long t1 = 0; - try { - t1 = System.currentTimeMillis(); - EntryEventImpl event = new EntryEventImpl((Object) null, false); - try { - event.setEventId(new EventID(new byte[] {1}, 1, 1)); - PutOp.execute(conn, proxy, testRegion.getFullPath(), "key1", "val1", event, null, false); - } finally { - event.release(); - } - fail("Test failed as exception was expected"); - } catch (Exception e) { - long t2 = System.currentTimeMillis(); - long net = (t2 - t1); - assertTrue(net / 1000 < 5); - } - synchronized (this) { - notify(); - } - } - - /** - * Tests the DeadServerMonitor when identifying an Endpoint as alive , does not create a - * persistent Ping connection ( i.e sends a CLOSE protocol , if the number of connections is zero. - */ - @Test - public void testDeadServerMonitorPingNature1() { - int port3 = getRandomAvailableTCPPort(); - - // final int maxWaitTime = 10000; - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(false); - pf.setReadTimeout(2000); - pf.setMinConnections(1); - pf.setSocketBufferSize(32768); - pf.setRetryAttempts(1); - pf.setPingInterval(500); - - proxy = (PoolImpl) pf.create("clientPool"); - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - try { - (proxy).acquireConnection(); - } catch (Exception ok) { - ok.printStackTrace(); - } - - try { - (proxy).acquireConnection(); - } catch (Exception ok) { - ok.printStackTrace(); - } - - // long start = System.currentTimeMillis(); - assertEquals(0, proxy.getConnectedServerCount()); - // start the server - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(15000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - GeodeAwaitility.await().untilAsserted(() -> { - assertEquals(1, proxy.getConnectedServerCount()); - }); - } finally { - if (server != null) { - server.stop(); - } - } - } - - /** - * Tests the DeadServerMonitor when identifying an Endpoint as alive , does creates a persistent - * Ping connection ( i.e sends a PING protocol , if the number of connections is more than zero. - */ - @Test - public void testDeadServerMonitorPingNature2() { - int port3 = getRandomAvailableTCPPort(); - - // final int maxWaitTime = 10000; - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(false); - pf.setReadTimeout(2000); - pf.setMinConnections(1); - pf.setSocketBufferSize(32768); - pf.setRetryAttempts(1); - pf.setPingInterval(500); - proxy = (PoolImpl) pf.create("clientPool"); - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - // let LiveServerMonitor detect it as alive as the numConnection is more than zero - - // long start = System.currentTimeMillis(); - assertEquals(0, proxy.getConnectedServerCount()); - // start the server - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(15000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - GeodeAwaitility.await().untilAsserted(() -> { - assertEquals(1, proxy.getConnectedServerCount()); - }); - } finally { - if (server != null) { - server.stop(); - } - } - } - - @Test - public void testThreadIdToSequenceIdMapCreation() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - proxy = (PoolImpl) pf.create("clientPool"); - if (proxy.getThreadIdToSequenceIdMap() == null) { - fail(" ThreadIdToSequenceIdMap is null. "); - } - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - fail("interrupted"); - } - server.stop(); - } - } - } - - @Test - public void testThreadIdToSequenceIdMapExpiryPositive() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(4000); - pf.setSubscriptionAckInterval(2000); - proxy = (PoolImpl) pf.create("clientPool"); - - EventID eid = new EventID(new byte[0], 1, 1); - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as it is a new entry"); - } - - verifyExpiry(); - - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as the previous entry should have expired "); - } - - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - - @Test - public void testThreadIdToSequenceIdMapExpiryNegative() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(10000); - - proxy = (PoolImpl) pf.create("clientPool"); - - final EventID eid = new EventID(new byte[0], 1, 1); - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as it is a new entry"); - } - - GeodeAwaitility.await().untilAsserted(() -> assertTrue(proxy.verifyIfDuplicate(eid))); - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - @Test - public void testThreadIdToSequenceIdMapConcurrency() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(5000); - pf.setSubscriptionAckInterval(2000); - proxy = (PoolImpl) pf.create("clientPool"); - - final int EVENT_ID_COUNT = 10000; // why 10,000? - EventID[] eid = new EventID[EVENT_ID_COUNT]; - for (int i = 0; i < EVENT_ID_COUNT; i++) { - eid[i] = new EventID(new byte[0], i, i); - if (proxy.verifyIfDuplicate(eid[i])) { - fail(" eid can never be duplicate, it is being created for the first time! "); - } - } - verifyExpiry(); - - for (int i = 0; i < EVENT_ID_COUNT; i++) { - if (proxy.verifyIfDuplicate(eid[i])) { - fail( - " eid can not be found to be duplicate since the entry should have expired! " + i); - } - } - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - - - @Test - public void testDuplicateSeqIdLesserThanCurrentSeqIdBeingIgnored() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(100000); - proxy = (PoolImpl) pf.create("clientPool"); - - EventID eid1 = new EventID(new byte[0], 1, 5); - if (proxy.verifyIfDuplicate(eid1)) { - fail(" eid1 can never be duplicate, it is being created for the first time! "); - } - - EventID eid2 = new EventID(new byte[0], 1, 2); - - if (!proxy.verifyIfDuplicate(eid2)) { - fail(" eid2 should be duplicate, seqId is less than highest (5)"); - } - - EventID eid3 = new EventID(new byte[0], 1, 3); - - if (!proxy.verifyIfDuplicate(eid3)) { - fail(" eid3 should be duplicate, seqId is less than highest (5)"); - } - - assertTrue(!proxy.getThreadIdToSequenceIdMap().isEmpty()); - proxy.destroy(); - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - - - @Test - public void testCleanCloseOfThreadIdToSeqId() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(100000); - proxy = (PoolImpl) pf.create("clientPool"); - - EventID eid1 = new EventID(new byte[0], 1, 2); - if (proxy.verifyIfDuplicate(eid1)) { - fail(" eid can never be duplicate, it is being created for the first time! "); - } - EventID eid2 = new EventID(new byte[0], 1, 3); - if (proxy.verifyIfDuplicate(eid2)) { - fail(" eid can never be duplicate, since sequenceId is greater "); - } - - if (!proxy.verifyIfDuplicate(eid2)) { - fail(" eid had to be a duplicate, since sequenceId is equal "); - } - EventID eid3 = new EventID(new byte[0], 1, 1); - if (!proxy.verifyIfDuplicate(eid3)) { - fail(" eid had to be a duplicate, since sequenceId is lesser "); - } - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - try { - Thread.sleep(500); - } catch (InterruptedException ie) { - fail("interrupted"); - } - server.stop(); - } - } - } - - @Test - public void testTwoClientsHavingDifferentThreadIdMaps() { - int port3 = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setMaximumTimeBetweenPings(10000); - server.setPort(port3); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port3); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(-1); - pf.setSubscriptionMessageTrackingTimeout(100000); - - PoolImpl proxy1 = (PoolImpl) pf.create("clientPool1"); - try { - PoolImpl proxy2 = (PoolImpl) pf.create("clientPool2"); - try { - - Map map1 = proxy1.getThreadIdToSequenceIdMap(); - Map map2 = proxy2.getThreadIdToSequenceIdMap(); - - assertTrue(!(map1 == map2)); - - } finally { - proxy2.destroy(); - } - } finally { - proxy1.destroy(); - } - } catch (Exception ex) { - ex.printStackTrace(); - fail("Failed to initialize client"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - @Test - public void testPeriodicAckSendByClient() { - int port = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setPort(port); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(1); - pf.setReadTimeout(20000); - pf.setSubscriptionMessageTrackingTimeout(15000); - pf.setSubscriptionAckInterval(5000); - - proxy = (PoolImpl) pf.create("clientPool"); - - EventID eid = new EventID(new byte[0], 1, 1); - - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as it is a new entry"); - } - - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - assertFalse(seo.getAckSend()); - - // should send the ack to server - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - verifyAckSend(true); - - // New update on same threadId - eid = new EventID(new byte[0], 1, 2); - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as it is a new entry"); - } - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - assertFalse(seo.getAckSend()); - - // should send another ack to server - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - verifyAckSend(true); - - // should expire with the this mentioned. - verifyExpiry(); - } catch (Exception ex) { - ex.printStackTrace(); - fail("Test testPeriodicAckSendByClient Failed"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - // No ack will be send if Redundancy level = 0 - @Test - public void testNoAckSendByClient() { - int port = getRandomAvailableTCPPort(); - CacheServer server = null; - try { - try { - server = cache.addCacheServer(); - server.setPort(port); - server.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed to create server"); - } - try { - PoolFactory pf = PoolManager.createFactory(); - pf.addServer("localhost", port); - pf.setSubscriptionEnabled(true); - pf.setSubscriptionRedundancy(1); - pf.setReadTimeout(20000); - pf.setSubscriptionMessageTrackingTimeout(8000); - pf.setSubscriptionAckInterval(2000); - - proxy = (PoolImpl) pf.create("clientPool"); - - EventID eid = new EventID(new byte[0], 1, 1); - - if (proxy.verifyIfDuplicate(eid)) { - fail(" eid should not be duplicate as it is a new entry"); - } - - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - assertFalse(seo.getAckSend()); - - // should not send an ack as redundancy level = 0; - seo = (SequenceIdAndExpirationObject) proxy.getThreadIdToSequenceIdMap() - .get(new ThreadIdentifier(new byte[0], 1)); - verifyAckSend(false); - - // should expire without sending an ack as redundancy level = 0. - verifyExpiry(); - } - - catch (Exception ex) { - ex.printStackTrace(); - fail("Test testPeriodicAckSendByClient Failed"); - } - } finally { - if (server != null) { - server.stop(); - } - } - } - - private void verifyAckSend(final boolean expectedAckSend) { - GeodeAwaitility.await().timeout(timeoutToVerifyAckSend).untilAsserted(() -> { - assertEquals(expectedAckSend, seo.getAckSend()); - }); - } - - private void verifyExpiry() { - GeodeAwaitility.await().timeout(timeoutToVerifyExpiry).untilAsserted(() -> { - assertEquals(0, proxy.getThreadIdToSequenceIdMap().size()); - }); - } - -}