Fix TestClientIdsDUnitTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7737e8a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7737e8a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7737e8a2
Branch: refs/heads/feature/GEODE-1930
Commit: 7737e8a21881f6176853c6e206142f537c10bed2
Parents: b286fe7
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Nov 7 11:50:39 2016 -0800
Committer: Kirk Lund <kl...@apache.org>
Committed: Tue Nov 15 12:26:26 2016 -0800
----------------------------------------------------------------------
.../geode/management/ManagementTestRule.java | 6 +
.../internal/pulse/TestClientIdsDUnitTest.java | 297 +++++++------------
2 files changed, 107 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7737e8a2/geode-core/src/test/java/org/apache/geode/management/ManagementTestRule.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/ManagementTestRule.java b/geode-core/src/test/java/org/apache/geode/management/ManagementTestRule.java
index 630c95e..ac60d0c 100644
--- a/geode-core/src/test/java/org/apache/geode/management/ManagementTestRule.java
+++ b/geode-core/src/test/java/org/apache/geode/management/ManagementTestRule.java
@@ -32,6 +32,8 @@ import org.junit.runners.model.FrameworkMethod; import org.junit.runners.model.Statement; import org.apache.geode.cache.Cache; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.DM; @@ -171,6 +173,10 @@ public class ManagementTestRule implements MethodRule, Serializable { return
this.helper.getCache(); } + public ClientCache getClientCache() { + return this.helper.getClientCache(new ClientCacheFactory()); + } + public boolean hasCache() { // Cache cache = GemFireCacheImpl.getInstance(); // if (cache != null && !cache.isClosed()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7737e8a2/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java index 9e98024..3591cb5 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java @@ -14,11 +14,22 @@ */ package org.apache.geode.management.internal.pulse; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.geode.distributed.ConfigurationProperties.*; +import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.*; +import java.io.IOException; +import java.io.Serializable; import java.util.Properties; +import javax.management.ObjectName; + +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -29,247 +40,141 @@ import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionFactory; +import 
org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.management.CacheServerMXBean; -import org.apache.geode.management.MBeanUtil; -import org.apache.geode.management.ManagementTestBase; -import org.apache.geode.management.internal.cli.CliUtil; +import org.apache.geode.management.ManagementTestRule; +import org.apache.geode.management.Manager; +import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.SerializableCallable; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; /** * This is for testing client IDs */ @Category(DistributedTest.class) -@SuppressWarnings("serial") -public class TestClientIdsDUnitTest extends ManagementTestBase { - - private static final String k1 = "k1"; - private static final String k2 = "k2"; - private static final String client_k1 = "client-k1"; - private static final String client_k2 = "client-k2"; - private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region"; - - private static VM server = null; - private static VM client = null; - private static VM client2 = null; - - @Override - public final void postSetUpManagementTestBase() throws 
Exception { - server = Host.getHost(0).getVM(1); - client = Host.getHost(0).getVM(2); - client2 = Host.getHost(0).getVM(3); - } - - @Override - public final void postTearDownManagementTestBase() throws Exception { - closeCache(server); - closeCache(client); - closeCache(client2); - - disconnectFromDS(); +@SuppressWarnings({ "serial", "unused" }) +public class TestClientIdsDUnitTest implements Serializable { + + private static final String KEY1 = "KEY1"; + private static final String KEY2 = "KEY2"; + private static final String VALUE1 = "client-KEY1"; + private static final String VALUE2 = "client-KEY2"; + private static final String REGION_NAME = TestClientIdsDUnitTest.class.getSimpleName() + "_Region"; + + @Manager + private VM managerVM; + + private VM serverVM; + private VM client1VM; + private VM client2VM; + + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(false).build(); + + @Before + public void before() throws Exception { + serverVM = Host.getHost(0).getVM(1); + client1VM = Host.getHost(0).getVM(2); + client2VM = Host.getHost(0).getVM(3); } @Test public void testClientIds() throws Exception { - createManagementCache(managingNode); - startManagingNode(managingNode); - int port = (Integer) createServerCache(server); - DistributedMember serverMember = getMember(server); - createClientCache(client, NetworkUtils.getServerHostName(server.getHost()), port); - createClientCache(client2, NetworkUtils.getServerHostName(server.getHost()), port); - put(client); - put(client2); - verifyClientIds(managingNode, serverMember, port); - stopManagingNode(managingNode); - } + this.managementTestRule.createManagers(); - private Object createServerCache(VM vm) { - return vm.invoke(new SerializableCallable("Create Server Cache") { - public Object call() { - try { - return createServerCache(); - } catch (Exception e) { - fail("Error while createServerCache " + e); - } - return null; - } - }); - } + int port = this.serverVM.invoke(() -> 
createServerCache()); - private void createClientCache(VM vm, final String host, final Integer port1) { - vm.invoke(new SerializableCallable("Create Client Cache") { + this.client1VM.invoke(() -> createClientCache(getServerHostName(this.serverVM.getHost()), port)); + this.client2VM.invoke(() -> createClientCache(getServerHostName(this.serverVM.getHost()), port)); - public Object call() { + DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM); + DistributedMember client1Member = this.managementTestRule.getDistributedMember(this.client1VM); + DistributedMember client2Member = this.managementTestRule.getDistributedMember(this.client2VM); + +// this.managerVM.invoke(() -> verifyClientIds(serverMember, port)); + this.managerVM.invoke(() -> { + CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, port); + await().until(() -> { try { - createClientCache(host, port1); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); } catch (Exception e) { - fail("Error while createClientCache " + e); + throw new Error(e); } - return null; - } + }); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); // TODO }); } - private Cache createCache(Properties props) throws Exception { - DistributedSystem ds = getSystem(props); - ds.disconnect(); - ds = getSystem(props); - assertNotNull(ds); - Cache cache = (GemFireCacheImpl) CacheFactory.create(ds); - assertNotNull(cache); - return cache; - } + private int createServerCache() throws IOException { + Cache cache = this.managementTestRule.getCache(); - private Integer createServerCache(DataPolicy dataPolicy) throws Exception { - Cache cache = createCache(false); AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(dataPolicy); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attrs = factory.create(); cache.createRegion(REGION_NAME, attrs); - int port = 
AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - CacheServer server1 = cache.addCacheServer(); - server1.setPort(port); - server1.setNotifyBySubscription(true); - server1.start(); - return new Integer(server1.getPort()); - } - public Integer createServerCache() throws Exception { - return createServerCache(DataPolicy.REPLICATE); + CacheServer cacheServer = cache.addCacheServer(); + cacheServer.setPort(0); + cacheServer.setNotifyBySubscription(true); + cacheServer.start(); + return cacheServer.getPort(); } - public Cache createClientCache(String host, Integer port1) throws Exception { - - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - Cache cache = createCache(props); - PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(host, port1.intValue()) - .setSubscriptionEnabled(false).setThreadLocalConnections(true).setMinConnections(1) - .setReadTimeout(20000).setPingInterval(10000).setRetryAttempts(1) - .setSubscriptionEnabled(true).setStatisticInterval(1000) - .create("CacheServerManagementDUnitTest"); - - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setPoolName(p.getName()); - - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(REGION_NAME, attrs); - return cache; - + private void createClientCache(final String host, final int serverPort) { + ClientCache cache = this.managementTestRule.getClientCache(); + + PoolImpl pool = (PoolImpl) PoolManager.createFactory() + .addServer(host, serverPort) + .setSubscriptionEnabled(false) + .setThreadLocalConnections(true) + .setMinConnections(1) + .setReadTimeout(20000) + .setPingInterval(10000) + .setRetryAttempts(1) + .setSubscriptionEnabled(true) + .setStatisticInterval(1000) + .create(getClass().getSimpleName()); + + ClientRegionFactory factory = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); + factory.setPoolName(pool.getName()); + 
factory.create(REGION_NAME); } - /** - * get member id - */ - protected static DistributedMember getMember() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return cache.getDistributedSystem().getDistributedMember(); + private void verifyClientIds(final DistributedMember serverMember, final int serverPort) throws Exception { + CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort); + await().until(() -> { + try { + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); + } catch (Exception e) { + throw new Error(e); + } + }); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); // TODO } - /** - * Verify the Cache Server details - * - * @param vm - */ - protected void verifyClientIds(final VM vm, - final DistributedMember serverMember, final int serverPort) { - SerializableRunnable verifyCacheServerRemote = new SerializableRunnable( - "Verify Cache Server Remote") { - public void run() { - try { - final WaitCriterion waitCriteria = new WaitCriterion() { - @Override - public boolean done() { - CacheServerMXBean bean = null; - try { - bean = MBeanUtil.getCacheServerMbeanProxy( - serverMember, serverPort); - if (bean != null) { - if( bean.getClientIds().length > 0){ - return true; - } - return false; - } + private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember, final int port) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName objectName = service.getCacheServerMBeanName(port, serverMember); - @Override - public String description() { - return "wait for getNumOfClients bean to complete and get results"; - } - }; - Wait.waitForCriterion(waitCriteria, 2 * 60 * 1000, 3000, true); + await().until(() -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull()); - // Now it is sure that bean would be available - CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, 
serverPort); - LogWriterUtils.getLogWriter().info("verifyClientIds = " + bean.getClientIds().length); - assertEquals(true, bean.getClientIds().length > 0 ? true : false); - } catch (Exception e) { - fail("Error while verifying cache server from remote member " + e); - } - } - }; - vm.invoke(verifyCacheServerRemote); + return service.getMBeanProxy(objectName, CacheServerMXBean.class); } - /** - * Verify the Cache Server details - * - * @param vm - */ - protected void put(final VM vm) { - SerializableRunnable put = new SerializableRunnable("put") { - public void run() { - try { - Cache cache = GemFireCacheImpl.getInstance(); - Region<Object, Object> r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); - assertNotNull(r1); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } - r1.clear(); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } - r1.clear(); - } catch (Exception ex) { - Assert.fail("failed while put", ex); - } - } - - }; - vm.invoke(put); + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); } - }