http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java index bcfadde..bfeebea 100644 --- a/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/DiskManagementDUnitTest.java @@ -14,24 +14,25 @@ */ package org.apache.geode.management; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; +import static java.util.concurrent.TimeUnit.*; +import static org.assertj.core.api.Assertions.*; import java.io.File; -import java.util.Arrays; -import java.util.List; +import java.io.Serializable; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import javax.management.ObjectName; -import org.apache.geode.LogWriter; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + import org.apache.geode.cache.Cache; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.DiskStore; @@ -40,674 +41,366 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.Scope; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.internal.cache.DiskRegion; import org.apache.geode.internal.cache.DiskRegionStats; import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.persistence.PersistentMemberID; import org.apache.geode.internal.cache.persistence.PersistentMemberManager; -import org.apache.geode.management.internal.MBeanJMXAdapter; +import org.apache.geode.internal.process.ProcessUtils; +import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.test.dunit.AsyncInvocation; -import org.apache.geode.test.dunit.SerializableCallable; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; /** - * Test cases to cover all test cases which pertains to disk from Management layer - * - * + * Test cases to cover all test cases which pertains to disk from Management + * layer */ @Category(DistributedTest.class) -public class DiskManagementDUnitTest extends ManagementTestBase { - - /** - * - */ - private static final long serialVersionUID = 1L; - - // This must be bigger than the dunit ack-wait-threshold for the revoke - // tests. The command line is setting the ack-wait-threshold to be - // 60 seconds. - private static final int MAX_WAIT = 70 * 1000; - - boolean testFailed = false; +@SuppressWarnings({ "serial", "unused" }) +public class DiskManagementDUnitTest implements Serializable { - String failureCause = ""; - static final String REGION_NAME = "region"; + private static final String REGION_NAME = DiskManagementDUnitTest.class.getSimpleName() + "_region"; private File diskDir; - protected static LogWriter logWriter; + @Manager + private VM managerVM; - public DiskManagementDUnitTest() throws Exception { - super(); + @Member + private VM[] memberVMs; - diskDir = new File("diskDir-" + getName()).getAbsoluteFile(); - org.apache.geode.internal.FileUtil.delete(diskDir); - diskDir.mkdir(); - diskDir.deleteOnExit(); - } + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build(); - @Override - protected final void postSetUpManagementTestBase() throws Exception { - failureCause = ""; - testFailed = false; - } + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); - @Override - protected final void postTearDownManagementTestBase() throws Exception { - org.apache.geode.internal.FileUtil.delete(diskDir); + @Before + public void before() throws Exception { + this.diskDir = this.temporaryFolder.newFolder("diskDir"); } /** - * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to - * the cache should be compacted. - * - * @throws Exception + * Tests Disk Compaction from a MemberMXBean which is at cache level. All the + * disks which belong to the cache should be compacted. */ - @Test - public void testDiskCompact() throws Throwable { - initManagement(false); - for (VM vm : getManagedNodeList()) { - createPersistentRegion(vm); - makeDiskCompactable(vm); + public void testDiskCompact() throws Exception { + for (VM memberVM : this.memberVMs) { + createPersistentRegion(memberVM); + makeDiskCompactable(memberVM); } - for (VM vm : getManagedNodeList()) { - compactAllDiskStores(vm); + for (VM memberVM : this.memberVMs) { + compactAllDiskStores(memberVM); } - } /** - * Tests Disk Compaction from a MemberMbean which is at cache level. All the disks which belong to - * the cache should be compacted. - * - * @throws Exception + * Tests Disk Compaction from a MemberMXBean which is at cache level. All the + * disks which belong to the cache should be compacted. */ - @Test - public void testDiskCompactRemote() throws Throwable { - - initManagement(false); - for (VM vm : getManagedNodeList()) { - createPersistentRegion(vm); - makeDiskCompactable(vm); + public void testDiskCompactRemote() throws Exception { + for (VM memberVM : this.memberVMs) { + createPersistentRegion(memberVM); + makeDiskCompactable(memberVM); } - compactDiskStoresRemote(managingNode); + compactDiskStoresRemote(this.managerVM, this.memberVMs.length); } /** * Tests various operations defined on DiskStore Mbean - * - * @throws Exception */ - @Test - public void testDiskOps() throws Throwable { - - initManagement(false); - for (VM vm : getManagedNodeList()) { - createPersistentRegion(vm); - makeDiskCompactable(vm); - invokeFlush(vm); - invokeForceRoll(vm); - invokeForceCompaction(vm); + public void testDiskOps() throws Exception { + for (VM memberVM : this.memberVMs) { + createPersistentRegion(memberVM); + makeDiskCompactable(memberVM); + invokeFlush(memberVM); + invokeForceRoll(memberVM); + invokeForceCompaction(memberVM); } - } @Test - public void testDiskBackupAllMembers() throws Throwable { - initManagement(false); - for (VM vm : getManagedNodeList()) { - createPersistentRegion(vm); - makeDiskCompactable(vm); - + public void testDiskBackupAllMembers() throws Exception { + for (VM memberVM : this.memberVMs) { + createPersistentRegion(memberVM); + makeDiskCompactable(memberVM); } - backupAllMembers(managingNode); + + backupAllMembers(this.managerVM, this.memberVMs.length); } /** - * Checks the test case of missing disks and revoking them through MemberMbean interfaces - * - * @throws Throwable + * Checks the test case of missing disks and revoking them through MemberMXBean + * interfaces */ - @SuppressWarnings("serial") @Test - public void testMissingMembers() throws Throwable { + public void testMissingMembers() throws Exception { + VM memberVM1 = this.memberVMs[0]; + VM memberVM2 = this.memberVMs[1]; - initManagement(false); - VM vm0 = getManagedNodeList().get(0); - VM vm1 = getManagedNodeList().get(1); - VM vm2 = getManagedNodeList().get(2); + createPersistentRegion(memberVM1); + createPersistentRegion(memberVM2); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM0"); - createPersistentRegion(vm0); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("Creating region in VM1"); - createPersistentRegion(vm1); + putAnEntry(memberVM1); - putAnEntry(vm0); + this.managerVM.invoke("checkForMissingDiskStores", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + PersistentMemberDetails[] missingDiskStores = distributedSystemMXBean.listMissingDiskStores(); + assertThat(missingDiskStores).isNull(); + }); - managingNode.invoke(new SerializableRunnable("Check for waiting regions") { + closeRegion(memberVM1); - public void run() { - Cache cache = getCache(); - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores(); + updateTheEntry(memberVM2, "C"); - assertNull(missingDiskStores); - } - }); + closeRegion(memberVM2); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm0"); - closeRegion(vm0); - - updateTheEntry(vm1); - - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("closing region in vm1"); - closeRegion(vm1); - AsyncInvocation future = createPersistentRegionAsync(vm0); - waitForBlockedInitialization(vm0); - assertTrue(future.isAlive()); - - managingNode.invoke(new SerializableRunnable("Revoke the member") { - - public void run() { - Cache cache = getCache(); - GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache; - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores(); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("waiting members=" + missingDiskStores); - assertNotNull(missingDiskStores); - assertEquals(1, missingDiskStores.length); - - for (PersistentMemberDetails id : missingDiskStores) { - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("Missing DiskStoreID is =" + id.getDiskStoreId()); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("Missing Host is =" + id.getHost()); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("Missing Directory is =" + id.getDirectory()); - - try { - bean.revokeMissingDiskStores(id.getDiskStoreId()); - } catch (Exception e) { - fail("revokeMissingDiskStores failed with exception " + e); - } - } - } - }); + AsyncInvocation creatingPersistentRegionAsync = createPersistentRegionAsync(memberVM1); - future.join(MAX_WAIT); - if (future.isAlive()) { - fail("Region not created within" + MAX_WAIT); - } - if (future.exceptionOccurred()) { - throw new Exception(future.getException()); - } - checkForRecoveryStat(vm0, true); - // Check to make sure we recovered the old - // value of the entry. - SerializableRunnable checkForEntry = new SerializableRunnable("check for the entry") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - assertEquals("B", region.get("A")); - } - }; - vm0.invoke(checkForEntry); + memberVM1.invoke(() -> + await().until(() -> { + GemFireCacheImpl cache = (GemFireCacheImpl) this.managementTestRule.getCache(); + PersistentMemberManager persistentMemberManager = cache.getPersistentMemberManager(); + Map<String, Set<PersistentMemberID>> regions = persistentMemberManager.getWaitingRegions(); + return !regions.isEmpty(); + }) + ); - } + assertThat(creatingPersistentRegionAsync.isAlive()).isTrue(); - protected void checkNavigation(final VM vm, final DistributedMember diskMember, - final String diskStoreName) { - SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") { - public void run() { + this.managerVM.invoke("revokeMissingDiskStore", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); + PersistentMemberDetails[] missingDiskStores = bean.listMissingDiskStores(); - final ManagementService service = getManagementService(); + assertThat(missingDiskStores).isNotNull().hasSize(1); - DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean(); - try { - ObjectName expected = - MBeanJMXAdapter.getDiskStoreMBeanName(diskMember.getId(), diskStoreName); - ObjectName actual = disMBean.fetchDiskStoreObjectName(diskMember.getId(), diskStoreName); - assertEquals(expected, actual); - } catch (Exception e) { - fail("Disk Store Navigation Failed " + e); - } + assertThat(bean.revokeMissingDiskStores(missingDiskStores[0].getDiskStoreId())).isTrue(); + }); + await(creatingPersistentRegionAsync); - } - }; - vm.invoke(checkNavigation); - } + verifyRecoveryStats(memberVM1, true); - /** - * get Distributed member for a given vm - */ - @SuppressWarnings("serial") - protected static DistributedMember getMember() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return cache.getDistributedSystem().getDistributedMember(); + // Check to make sure we recovered the old value of the entry. + memberVM1.invoke("check for the entry", () -> { + Cache cache = this.managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + assertThat(region.get("A")).isEqualTo("B"); + }); } /** * Invokes flush on the given disk store by MBean interface - * - * @param vm reference to VM */ - @SuppressWarnings("serial") - public void invokeFlush(final VM vm) { - SerializableRunnable invokeFlush = new SerializableRunnable("Invoke Flush On Disk") { - public void run() { - Cache cache = getCache(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - String name = "testFlush_" + vm.getPid(); - DiskStore ds = dsf.create(name); - - ManagementService service = getManagementService(); - DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name); - assertNotNull(bean); - bean.flush(); - } - }; - vm.invoke(invokeFlush); + private void invokeFlush(final VM memberVM) { + memberVM.invoke("invokeFlush", () -> { + Cache cache = this.managementTestRule.getCache(); + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + String name = "testFlush_" + ProcessUtils.identifyPid(); + DiskStore diskStore = diskStoreFactory.create(name); + + ManagementService service = this.managementTestRule.getManagementService(); + DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name); + assertThat(diskStoreMXBean).isNotNull(); + assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName()); + + diskStoreMXBean.flush(); + }); } /** * Invokes force roll on disk store by MBean interface - * - * @param vm reference to VM */ - @SuppressWarnings("serial") - public void invokeForceRoll(final VM vm) { - SerializableRunnable invokeForceRoll = new SerializableRunnable("Invoke Force Roll") { - public void run() { - Cache cache = getCache(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - String name = "testForceRoll_" + vm.getPid(); - DiskStore ds = dsf.create(name); - ManagementService service = getManagementService(); - DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name); - assertNotNull(bean); - bean.forceRoll(); - } - }; - vm.invoke(invokeForceRoll); + private void invokeForceRoll(final VM memberVM) { + memberVM.invoke("invokeForceRoll", () -> { + Cache cache = this.managementTestRule.getCache(); + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + String name = "testForceRoll_" + ProcessUtils.identifyPid(); + DiskStore diskStore = diskStoreFactory.create(name); + + ManagementService service = this.managementTestRule.getManagementService(); + DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name); + assertThat(diskStoreMXBean).isNotNull(); + assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName()); + + diskStoreMXBean.forceRoll(); + }); } /** * Invokes force compaction on disk store by MBean interface - * - * @param vm reference to VM */ - @SuppressWarnings("serial") - public void invokeForceCompaction(final VM vm) { - SerializableRunnable invokeForceCompaction = - new SerializableRunnable("Invoke Force Compaction") { - public void run() { - Cache cache = getCache(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - dsf.setAllowForceCompaction(true); - String name = "testForceCompaction_" + vm.getPid(); - DiskStore ds = dsf.create(name); - ManagementService service = getManagementService(); - DiskStoreMXBean bean = service.getLocalDiskStoreMBean(name); - assertNotNull(bean); - assertEquals(false, bean.forceCompaction()); - } - }; - vm.invoke(invokeForceCompaction); + private void invokeForceCompaction(final VM memberVM) { + memberVM.invoke("invokeForceCompaction", () -> { + Cache cache = this.managementTestRule.getCache(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + dsf.setAllowForceCompaction(true); + String name = "testForceCompaction_" + ProcessUtils.identifyPid(); + DiskStore diskStore = dsf.create(name); + + ManagementService service = this.managementTestRule.getManagementService(); + DiskStoreMXBean diskStoreMXBean = service.getLocalDiskStoreMBean(name); + assertThat(diskStoreMXBean).isNotNull(); + assertThat(diskStoreMXBean.getName()).isEqualTo(diskStore.getName()); + + assertThat(diskStoreMXBean.forceCompaction()).isFalse(); + }); } /** * Makes the disk compactable by adding and deleting some entries - * - * @throws Exception */ - @SuppressWarnings("serial") - public void makeDiskCompactable(VM vm1) throws Exception { - vm1.invoke(new SerializableRunnable("Make The Disk Compactable") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key1"); - region.put("key1", "value1"); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("putting key2"); - region.put("key2", "value2"); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info("removing key2"); - region.remove("key2"); - // now that it is compactable the following forceCompaction should - // go ahead and do a roll and compact it. - } + private void makeDiskCompactable(final VM memberVM) throws Exception { + memberVM.invoke("makeDiskCompactable", () -> { + Cache cache = this.managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + region.put("key1", "value1"); + region.put("key2", "value2"); + region.remove("key2"); + // now that it is compactable the following forceCompaction should + // go ahead and do a roll and compact it. }); - } - - /** * Compacts all DiskStores belonging to a member - * - * @param vm1 reference to VM - * @throws Exception */ - @SuppressWarnings("serial") - public void compactAllDiskStores(VM vm1) throws Exception { - - vm1.invoke(new SerializableCallable("Compact All Disk Stores") { - - public Object call() throws Exception { - ManagementService service = getManagementService(); - MemberMXBean memberBean = service.getMemberMXBean(); - String[] compactedDiskStores = memberBean.compactAllDiskStores(); - - assertTrue(compactedDiskStores.length > 0); - for (int i = 0; i < compactedDiskStores.length; i++) { - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("<ExpectedString> Compacted Store " + i + " " + compactedDiskStores[i] - + "</ExpectedString> "); - } - - return null; - } + private void compactAllDiskStores(final VM memberVM) throws Exception { + memberVM.invoke("compactAllDiskStores", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + MemberMXBean memberMXBean = service.getMemberMXBean(); + String[] compactedDiskStores = memberMXBean.compactAllDiskStores(); + assertThat(compactedDiskStores).hasSize(1); }); - } /** * Takes a back up of all the disk store in a given directory */ - @SuppressWarnings("serial") - public void backupAllMembers(final VM managingVM) throws Exception { + private void backupAllMembers(final VM managerVM, final int memberCount) { + managerVM.invoke("backupAllMembers", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); + File backupDir = this.temporaryFolder.newFolder("backupDir"); - managingVM.invoke(new SerializableCallable("Backup All Disk Stores") { + DiskBackupStatus status = bean.backupAllMembers(backupDir.getAbsolutePath(), null); - public Object call() throws Exception { - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - DiskBackupStatus status = - bean.backupAllMembers(getBackupDir("test_backupAllMembers").getAbsolutePath(), null); - - return null; - } + assertThat(status.getBackedUpDiskStores().keySet().size()).isEqualTo(memberCount); + assertThat(status.getOfflineDiskStores()).isEqualTo(null); // TODO: fix GEODE-1946 }); - } /** - * Compact a disk store from Managing node + * Compact a disk store from managerVM VM */ - @SuppressWarnings("serial") - public void compactDiskStoresRemote(VM managingVM) throws Exception { - { - - managingVM.invoke(new SerializableCallable("Compact All Disk Stores Remote") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - Set<DistributedMember> otherMemberSet = - cache.getDistributionManager().getOtherNormalDistributionManagerIds(); - - for (DistributedMember member : otherMemberSet) { - MemberMXBean bean = MBeanUtil.getMemberMbeanProxy(member); - String[] allDisks = bean.listDiskStores(true); - assertNotNull(allDisks); - List<String> listString = Arrays.asList(allDisks); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("<ExpectedString> Remote All Disk Stores Are " + listString.toString() - + "</ExpectedString> "); - String[] compactedDiskStores = bean.compactAllDiskStores(); - assertTrue(compactedDiskStores.length > 0); - for (int i = 0; i < compactedDiskStores.length; i++) { - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter() - .info("<ExpectedString> Remote Compacted Store " + i + " " - + compactedDiskStores[i] + "</ExpectedString> "); - } - - } - return null; - } - }); + private void compactDiskStoresRemote(final VM managerVM, final int memberCount) { + managerVM.invoke("compactDiskStoresRemote", () -> { + Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers();// ((GemFireCacheImpl)cache).getDistributionManager().getOtherNormalDistributionManagerIds(); + assertThat(otherMemberSet.size()).isEqualTo(memberCount); - } + SystemManagementService service = this.managementTestRule.getSystemManagementService(); - } - - /** - * Checks if a file with the given extension is present - * - * @param fileExtension file extension - * @throws Exception - */ - protected void checkIfContainsFileWithExt(String fileExtension) throws Exception { - File[] files = diskDir.listFiles(); - for (int j = 0; j < files.length; j++) { - if (files[j].getAbsolutePath().endsWith(fileExtension)) { - fail("file \"" + files[j].getAbsolutePath() + "\" still exists"); - } - } + for (DistributedMember member : otherMemberSet) { + MemberMXBean memberMXBean = awaitMemberMXBeanProxy(member); - } + String[] allDisks = memberMXBean.listDiskStores(true); + assertThat(allDisks).isNotNull().hasSize(1); - /** - * Update Entry - * - * @param vm1 reference to VM - */ - protected void updateTheEntry(VM vm1) { - updateTheEntry(vm1, "C"); - } - - /** - * Update an Entry - * - * @param vm1 reference to VM - * @param value Value which is updated - */ - @SuppressWarnings("serial") - protected void updateTheEntry(VM vm1, final String value) { - vm1.invoke(new SerializableRunnable("change the entry") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - region.put("A", value); + String[] compactedDiskStores = memberMXBean.compactAllDiskStores(); + assertThat(compactedDiskStores).hasSize(1); } }); } - /** - * Put an entry to region - * - * @param vm0 reference to VM - */ - @SuppressWarnings("serial") - protected void putAnEntry(VM vm0) { - vm0.invoke(new SerializableRunnable("Put an entry") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - region.put("A", "B"); - } + private void updateTheEntry(final VM memberVM, final String value) { + memberVM.invoke("updateTheEntry", () -> { + Cache cache = this.managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + region.put("A", value); }); } - /** - * Close the given region REGION_NAME - * - * @param vm reference to VM - */ - @SuppressWarnings("serial") - protected void closeRegion(final VM vm) { - SerializableRunnable closeRegion = new SerializableRunnable("Close persistent region") { - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - region.close(); - } - }; - vm.invoke(closeRegion); + private void putAnEntry(final VM memberVM) { + memberVM.invoke("putAnEntry", () -> { + Cache cache = managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + region.put("A", "B"); + }); } - /** - * Waiting to blocked waiting for another persistent member to come online - * - * @param vm reference to VM - */ - @SuppressWarnings("serial") - private void waitForBlockedInitialization(VM vm) { - vm.invoke(new SerializableRunnable() { - - public void run() { - Wait.waitForCriterion(new WaitCriterion() { - - public String description() { - return "Waiting to blocked waiting for another persistent member to come online"; - } - - public boolean done() { - Cache cache = getCache(); - GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache; - PersistentMemberManager mm = cacheImpl.getPersistentMemberManager(); - Map<String, Set<PersistentMemberID>> regions = mm.getWaitingRegions(); - boolean done = !regions.isEmpty(); - return done; - } - - }, MAX_WAIT, 100, true); - - } - + private void closeRegion(final VM memberVM) { + memberVM.invoke("closeRegion", () -> { + Cache cache = this.managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + region.close(); }); } - /** - * Creates a persistent region - * - * @param vm reference to VM - * @throws Throwable - */ - protected void createPersistentRegion(VM vm) throws Throwable { - AsyncInvocation future = createPersistentRegionAsync(vm); - future.join(MAX_WAIT); - if (future.isAlive()) { - fail("Region not created within" + MAX_WAIT); - } - if (future.exceptionOccurred()) { - throw new RuntimeException(future.getException()); - } + private void createPersistentRegion(final VM memberVM) throws InterruptedException, ExecutionException, TimeoutException { + await(createPersistentRegionAsync(memberVM)); } - /** - * Creates a persistent region in async manner - * - * @param vm reference to VM - * @return reference to AsyncInvocation - */ - @SuppressWarnings("serial") - protected AsyncInvocation createPersistentRegionAsync(final VM vm) { - SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") { - public void run() { - Cache cache = getCache(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File dir = getDiskDirForVM(vm); - dir.mkdirs(); - dsf.setDiskDirs(new File[] {dir}); - dsf.setMaxOplogSize(1); - dsf.setAllowForceCompaction(true); - dsf.setAutoCompact(false); - DiskStore ds = dsf.create(REGION_NAME); - RegionFactory rf = cache.createRegionFactory(); - rf.setDiskStoreName(ds.getName()); - rf.setDiskSynchronous(true); - rf.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); - rf.setScope(Scope.DISTRIBUTED_ACK); - rf.create(REGION_NAME); - } - }; - return vm.invokeAsync(createRegion); + private AsyncInvocation createPersistentRegionAsync(final VM memberVM) { + return memberVM.invokeAsync("createPersistentRegionAsync", () -> { + File dir = new File(diskDir, String.valueOf(ProcessUtils.identifyPid())); + + Cache cache = this.managementTestRule.getCache(); + + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(new File[] { dir }); + diskStoreFactory.setMaxOplogSize(1); + diskStoreFactory.setAllowForceCompaction(true); + diskStoreFactory.setAutoCompact(false); + DiskStore diskStore = diskStoreFactory.create(REGION_NAME); + + RegionFactory regionFactory = cache.createRegionFactory(); + regionFactory.setDiskStoreName(diskStore.getName()); + regionFactory.setDiskSynchronous(true); + regionFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); + regionFactory.setScope(Scope.DISTRIBUTED_ACK); + regionFactory.create(REGION_NAME); + }); } - /** - * Validates a persistent region - * - * @param vm reference to VM - */ - @SuppressWarnings("serial") - protected void validatePersistentRegion(final VM vm) { - SerializableRunnable validateDisk = new SerializableRunnable("Validate persistent region") { - public void run() { - Cache cache = getCache(); - ManagementService service = getManagementService(); - DiskStoreMXBean bean = service.getLocalDiskStoreMBean(REGION_NAME); - assertNotNull(bean); + private void verifyRecoveryStats(final VM memberVM, final boolean localRecovery) { + memberVM.invoke("verifyRecoveryStats", () -> { + Cache cache = this.managementTestRule.getCache(); + Region region = cache.getRegion(REGION_NAME); + DistributedRegion distributedRegion = (DistributedRegion) region; + DiskRegionStats stats = distributedRegion.getDiskRegion().getStats(); + + if (localRecovery) { + assertThat(stats.getLocalInitializations()).isEqualTo(1); + assertThat(stats.getRemoteInitializations()).isEqualTo(0); + } else { + assertThat(stats.getLocalInitializations()).isEqualTo(0); + assertThat(stats.getRemoteInitializations()).isEqualTo(1); } - }; - vm.invoke(validateDisk); + }); } - /** - * Appends vm id to disk dir - * - * @param vm reference to VM - * @return - */ - protected File getDiskDirForVM(final VM vm) { - File dir = new File(diskDir, String.valueOf(vm.getPid())); - return dir; + private MemberMXBean awaitMemberMXBeanProxy(final DistributedMember member) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName objectName = service.getMemberMBeanName(member); + await().until(() -> assertThat(service.getMBeanProxy(objectName, MemberMXBean.class)).isNotNull()); + return service.getMBeanProxy(objectName, MemberMXBean.class); } - /** - * Checks recovery status - * - * @param vm reference to VM - * @param localRecovery local recovery on or not - */ - @SuppressWarnings("serial") - private void checkForRecoveryStat(VM vm, final boolean localRecovery) { - vm.invoke(new SerializableRunnable("check disk region stat") { - - public void run() { - Cache cache = getCache(); - Region region = cache.getRegion(REGION_NAME); - DistributedRegion distributedRegion = (DistributedRegion) region; - DiskRegionStats stats = distributedRegion.getDiskRegion().getStats(); - if (localRecovery) { - assertEquals(1, stats.getLocalInitializations()); - assertEquals(0, stats.getRemoteInitializations()); - } else { - assertEquals(0, stats.getLocalInitializations()); - assertEquals(1, stats.getRemoteInitializations()); - } - - } - }); + private void await(final AsyncInvocation createPersistentRegionAsync) throws InterruptedException, ExecutionException, TimeoutException { + createPersistentRegionAsync.await(2, MINUTES); } - /** - * - * @return back up directory - */ - protected static File getBackupDir(String name) throws Exception { - File backUpDir = new File("BackupDir-" + name).getAbsoluteFile(); - org.apache.geode.internal.FileUtil.delete(backUpDir); - backUpDir.mkdir(); - backUpDir.deleteOnExit(); - return backUpDir; + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); } }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java index 4eaba67..cd05cde 100644 --- a/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/DistributedSystemDUnitTest.java @@ -14,23 +14,19 @@ */ package org.apache.geode.management; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; +import static java.util.concurrent.TimeUnit.*; +import static org.apache.geode.test.dunit.Host.*; +import static org.apache.geode.test.dunit.Invoke.*; +import static org.assertj.core.api.Assertions.*; +import java.io.Serializable; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import javax.management.InstanceNotFoundException; import javax.management.ListenerNotFoundException; import javax.management.MBeanServer; import javax.management.Notification; @@ -39,11 +35,16 @@ import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.ObjectName; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; -import org.apache.geode.cache.Cache; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.admin.Alert; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.logging.LogService; @@ -57,428 +58,269 @@ import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.management.internal.beans.MemberMBean; import org.apache.geode.management.internal.beans.SequenceNumber; import org.apache.geode.test.dunit.IgnoredException; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.SerializableCallable; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.junit.categories.DistributedTest; /** - * Distributed System tests - * + * Distributed System management tests + * <p> * a) For all the notifications - * * i) gemfire.distributedsystem.member.joined - * * ii) gemfire.distributedsystem.member.left - * * iii) gemfire.distributedsystem.member.suspect - * * iv ) All notifications emitted by member mbeans - * * vi) Alerts - * - * b) Concurrently modify proxy list by removing member and accessing the distributed system MBean - * + * <p> + * b) Concurrently modify proxy list by removing member and accessing the + * distributed system MBean + * <p> * c) Aggregate Operations like shutDownAll - * + * <p> * d) Member level operations like fetchJVMMetrics() - * + * <p> * e ) Statistics - * - * - * */ @Category(DistributedTest.class) -public class DistributedSystemDUnitTest extends ManagementTestBase { +@SuppressWarnings({ "serial", "unused" }) +public class DistributedSystemDUnitTest implements Serializable { private static final Logger logger = LogService.getLogger(); - private static final long serialVersionUID = 1L; - + private static final String WARNING_LEVEL_MESSAGE = "Warning Level Alert Message"; + private static final String SEVERE_LEVEL_MESSAGE = "Severe Level Alert Message"; - private static final int MAX_WAIT = 10 * 1000; + private static List<Notification> notifications; + private static Map<ObjectName, NotificationListener> notificationListenerMap; - private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer; + @Manager + private VM managerVM; - static List<Notification> notifList = new ArrayList<>(); + @Member + private VM[] memberVMs; - static Map<ObjectName, NotificationListener> notificationListenerMap = - new HashMap<ObjectName, NotificationListener>(); + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().build(); - static final String WARNING_LEVEL_MESSAGE = "Warninglevel Alert Message"; - - static final String SEVERE_LEVEL_MESSAGE = "Severelevel Alert Message"; + @Before + public void before() throws Exception { + notifications = new ArrayList<>(); + notificationListenerMap = new HashMap<>(); + invokeInEveryVM(() -> notifications = new ArrayList<>()); + invokeInEveryVM(() -> notificationListenerMap = new HashMap<>()); + } - public DistributedSystemDUnitTest() { - super(); + @After + public void after() throws Exception { + resetAlertCounts(this.managerVM); } /** * Tests each and every operations that is defined on the MemberMXBean - * - * @throws Exception */ @Test public void testDistributedSystemAggregate() throws Exception { - VM managingNode = getManagingNode(); - createManagementCache(managingNode); - startManagingNode(managingNode); - addNotificationListener(managingNode); + this.managementTestRule.createManager(this.managerVM); + addNotificationListener(this.managerVM); - for (VM vm : getManagedNodeList()) { - createCache(vm); + for (VM memberVM : this.memberVMs) { + this.managementTestRule.createMember(memberVM); } - checkAggregate(managingNode); - for (VM vm : getManagedNodeList()) { - closeCache(vm); - } - - closeCache(managingNode); - + verifyDistributedSystemMXBean(this.managerVM); } /** * Tests each and every operations that is defined on the MemberMXBean - * - * @throws Exception */ @Test public void testAlertManagedNodeFirst() throws Exception { - - for (VM vm : getManagedNodeList()) { - createCache(vm); - warnLevelAlert(vm); - severeLevelAlert(vm); + for (VM memberVM : this.memberVMs) { + this.managementTestRule.createMember(memberVM); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - VM managingNode = getManagingNode(); - - createManagementCache(managingNode); - startManagingNode(managingNode); - addAlertListener(managingNode); - checkAlertCount(managingNode, 0, 0); + this.managementTestRule.createManager(this.managerVM); + addAlertListener(this.managerVM); + verifyAlertCount(this.managerVM, 0, 0); - final DistributedMember managingMember = getMember(managingNode); + DistributedMember managerDistributedMember = this.managementTestRule.getDistributedMember(this.managerVM); - // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated - // everywhere. - for (VM vm : getManagedNodeList()) { - ensureLoggerState(vm, managingMember, Alert.SEVERE); + // Before we start we need to ensure that the initial (implicit) SEVERE alert has propagated everywhere. + for (VM memberVM : this.memberVMs) { + verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE); } - setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING)); + setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING)); - for (VM vm : getManagedNodeList()) { - ensureLoggerState(vm, managingMember, Alert.WARNING); - warnLevelAlert(vm); - severeLevelAlert(vm); + for (VM memberVM : this.memberVMs) { + verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - checkAlertCount(managingNode, 3, 3); - resetAlertCounts(managingNode); + verifyAlertCount(this.managerVM, 3, 3); + resetAlertCounts(this.managerVM); - setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.SEVERE)); + setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.SEVERE)); - for (VM vm : getManagedNodeList()) { - ensureLoggerState(vm, managingMember, Alert.SEVERE); - warnLevelAlert(vm); - severeLevelAlert(vm); + for (VM memberVM : this.memberVMs) { + verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - checkAlertCount(managingNode, 3, 0); - resetAlertCounts(managingNode); - - for (VM vm : getManagedNodeList()) { - closeCache(vm); - } - - closeCache(managingNode); - } - - @SuppressWarnings("serial") - public void ensureLoggerState(VM vm1, final DistributedMember member, final int alertLevel) - throws Exception { - { - vm1.invoke(new SerializableCallable("Ensure Logger State") { - - public Object call() throws Exception { - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for all alert Listener to register with managed node"; - } - - public boolean done() { - - if (AlertAppender.getInstance().hasAlertListener(member, alertLevel)) { - return true; - } - return false; - } - - }, MAX_WAIT, 500, true); - - return null; - } - }); - - } + verifyAlertCount(this.managerVM, 3, 0); } /** * Tests each and every operations that is defined on the MemberMXBean - * - * @throws Exception */ @Test public void testShutdownAll() throws Exception { - final Host host = Host.getHost(0); - VM managedNode1 = host.getVM(0); - VM managedNode2 = host.getVM(1); - VM managedNode3 = host.getVM(2); - - VM managingNode = host.getVM(3); - - // Managing Node is created first - createManagementCache(managingNode); - startManagingNode(managingNode); - - createCache(managedNode1); - createCache(managedNode2); - createCache(managedNode3); - shutDownAll(managingNode); - closeCache(managingNode); + VM memberVM1 = getHost(0).getVM(0); + VM memberVM2 = getHost(0).getVM(1); + VM memberVM3 = getHost(0).getVM(2); + + VM managerVM = getHost(0).getVM(3); + + // managerVM Node is created first + this.managementTestRule.createManager(managerVM); + + this.managementTestRule.createMember(memberVM1); + this.managementTestRule.createMember(memberVM2); + this.managementTestRule.createMember(memberVM3); + + shutDownAll(managerVM); } @Test public void testNavigationAPIS() throws Exception { + this.managementTestRule.createManager(this.managerVM); - final Host host = Host.getHost(0); - - createManagementCache(managingNode); - startManagingNode(managingNode); - - for (VM vm : managedNodeList) { - createCache(vm); + for (VM memberVM : this.memberVMs) { + this.managementTestRule.createMember(memberVM); } - checkNavigationAPIs(managingNode); + verifyFetchMemberObjectName(this.managerVM, this.memberVMs.length + 1); } @Test public void testNotificationHub() throws Exception { - this.initManagement(false); + this.managementTestRule.createMembers(); + this.managementTestRule.createManagers(); class NotificationHubTestListener implements NotificationListener { + @Override public synchronized void handleNotification(Notification notification, Object handback) { logger.info("Notification received {}", notification); - notifList.add(notification); + notifications.add(notification); } } - managingNode.invoke(new SerializableRunnable("Add Listener to MemberMXBean") { - - public void run() { - Cache cache = getCache(); - ManagementService service = getManagementService(); - final DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for all members to send their initial Data"; - } - - public boolean done() { - if (bean.listMemberObjectNames().length == 5) {// including locator - return true; - } else { - return false; - } - } - }, MAX_WAIT, 500, true); - for (ObjectName objectName : bean.listMemberObjectNames()) { - NotificationHubTestListener listener = new NotificationHubTestListener(); - try { - mbeanServer.addNotificationListener(objectName, listener, null, null); - notificationListenerMap.put(objectName, listener); - } catch (InstanceNotFoundException e) { - LogWriterUtils.getLogWriter().error(e); - } - } + this.managerVM.invoke("addListenerToMemberMXBean", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + final DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + + await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5)); + + for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) { + NotificationHubTestListener listener = new NotificationHubTestListener(); + ManagementFactory.getPlatformMBeanServer().addNotificationListener(objectName, listener, null, null); + notificationListenerMap.put(objectName, listener); } }); // Check in all VMS - for (VM vm : managedNodeList) { - vm.invoke(new SerializableRunnable("Check Hub Listener num count") { - - public void run() { - Cache cache = getCache(); - SystemManagementService service = (SystemManagementService) getManagementService(); - NotificationHub hub = service.getNotificationHub(); - Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap(); - assertEquals(1, listenerObjectMap.keySet().size()); - ObjectName memberMBeanName = MBeanJMXAdapter - .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember()); + for (VM memberVM : this.memberVMs) { + memberVM.invoke("checkNotificationHubListenerCount", () -> { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + NotificationHub notificationHub = service.getNotificationHub(); + Map<ObjectName, NotificationHubListener> listenerMap = notificationHub.getListenerObjectMap(); + assertThat(listenerMap.keySet()).hasSize(1); - NotificationHubListener listener = listenerObjectMap.get(memberMBeanName); + ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember()); + NotificationHubListener listener = listenerMap.get(memberMBeanName); - /* - * Counter of listener should be 2 . One for default Listener which is added for each - * member mbean by distributed system mbean One for the added listener in test - */ - assertEquals(2, listener.getNumCounter()); + /* + * Counter of listener should be 2 . One for default Listener which is + * added for each member mbean by distributed system mbean One for the + * added listener in test + */ + assertThat(listener.getNumCounter()).isEqualTo(2); - // Raise some notifications + // Raise some notifications - NotificationBroadcasterSupport memberLevelNotifEmitter = - (MemberMBean) service.getMemberMXBean(); + NotificationBroadcasterSupport notifier = (MemberMBean) service.getMemberMXBean(); + String memberSource = MBeanJMXAdapter.getMemberNameOrId(this.managementTestRule.getDistributedMember()); - String memberSource = MBeanJMXAdapter - .getMemberNameOrId(cache.getDistributedSystem().getDistributedMember()); - - // Only a dummy notification , no actual region is creates - Notification notification = new Notification(JMXNotificationType.REGION_CREATED, - memberSource, SequenceNumber.next(), System.currentTimeMillis(), - ManagementConstants.REGION_CREATED_PREFIX + "/test"); - memberLevelNotifEmitter.sendNotification(notification); - - } + // Only a dummy notification , no actual region is created + Notification notification = new Notification(JMXNotificationType.REGION_CREATED, memberSource, SequenceNumber.next(), System.currentTimeMillis(), ManagementConstants.REGION_CREATED_PREFIX + "/test"); + notifier.sendNotification(notification); }); } - managingNode.invoke(new SerializableRunnable("Check notifications && Remove Listeners") { - - public void run() { - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for all Notifications to reach the Managing Node"; - } - - public boolean done() { - if (notifList.size() == 3) { - return true; - } else { - return false; - } - } - }, MAX_WAIT, 500, true); + this.managerVM.invoke("checkNotificationsAndRemoveListeners", () -> { + await().until(() -> assertThat(notifications).hasSize(3)); - notifList.clear(); - - Iterator<ObjectName> it = notificationListenerMap.keySet().iterator(); - while (it.hasNext()) { - ObjectName objectName = it.next(); - NotificationListener listener = notificationListenerMap.get(objectName); - try { - mbeanServer.removeNotificationListener(objectName, listener); - } catch (ListenerNotFoundException e) { - LogWriterUtils.getLogWriter().error(e); - } catch (InstanceNotFoundException e) { - LogWriterUtils.getLogWriter().error(e); - } - } + notifications.clear(); + for (ObjectName objectName : notificationListenerMap.keySet()) { + NotificationListener listener = notificationListenerMap.get(objectName); + ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener); } }); // Check in all VMS again - for (VM vm : managedNodeList) { - vm.invoke(new SerializableRunnable("Check Hub Listener num count Again") { - - public void run() { - Cache cache = getCache(); - SystemManagementService service = (SystemManagementService) getManagementService(); - NotificationHub hub = service.getNotificationHub(); - Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap(); - - assertEquals(1, listenerObjectMap.keySet().size()); - - ObjectName memberMBeanName = MBeanJMXAdapter - .getMemberMBeanName(cache.getDistributedSystem().getDistributedMember()); - - NotificationHubListener listener = listenerObjectMap.get(memberMBeanName); - - /* - * Counter of listener should be 1 for the default Listener which is added for each member - * mbean by distributed system mbean. - */ - assertEquals(1, listener.getNumCounter()); - - } + for (VM memberVM : this.memberVMs) { + memberVM.invoke("checkNotificationHubListenerCountAgain", () -> { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + NotificationHub hub = service.getNotificationHub(); + Map<ObjectName, NotificationHubListener> listenerObjectMap = hub.getListenerObjectMap(); + assertThat(listenerObjectMap.keySet().size()).isEqualTo(1); + + ObjectName memberMBeanName = MBeanJMXAdapter.getMemberMBeanName(this.managementTestRule.getDistributedMember()); + NotificationHubListener listener = listenerObjectMap.get(memberMBeanName); + + /* + * Counter of listener should be 1 for the default Listener which is + * added for each member mbean by distributed system mbean. + */ + assertThat(listener.getNumCounter()).isEqualTo(1); }); } - managingNode.invoke(new SerializableRunnable("Remove Listener from MemberMXBean") { - - public void run() { - Cache cache = getCache(); - ManagementService service = getManagementService(); - final DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for all members to send their initial Data"; - } - - public boolean done() { - if (bean.listMemberObjectNames().length == 5) {// including locator - return true; - } else { - return false; - } - - } - - }, MAX_WAIT, 500, true); - for (ObjectName objectName : bean.listMemberObjectNames()) { - NotificationHubTestListener listener = new NotificationHubTestListener(); - try { - mbeanServer.removeNotificationListener(objectName, listener); - } catch (InstanceNotFoundException e) { - LogWriterUtils.getLogWriter().error(e); - } catch (ListenerNotFoundException e) { - // TODO: apparently there is never a notification listener on any these mbeans at this - // point - // fix this test so it doesn't hit these unexpected exceptions -- - // getLogWriter().error(e); - } + this.managerVM.invoke("removeListenerFromMemberMXBean", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + + await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(5)); + + for (ObjectName objectName : distributedSystemMXBean.listMemberObjectNames()) { + NotificationHubTestListener listener = new NotificationHubTestListener(); + try { + ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener); // because new instance!! + } catch (ListenerNotFoundException e) { + // TODO: [old] apparently there is never a notification listener on any these mbeans at this point [fix this] + // fix this test so it doesn't hit these unexpected exceptions -- getLogWriter().error(e); } } }); - for (VM vm : managedNodeList) { - vm.invoke(new SerializableRunnable("Check Hub Listeners clean up") { - - public void run() { - Cache cache = getCache(); - SystemManagementService service = (SystemManagementService) getManagementService(); - NotificationHub hub = service.getNotificationHub(); - hub.cleanUpListeners(); - assertEquals(0, hub.getListenerObjectMap().size()); - - Iterator<ObjectName> it = notificationListenerMap.keySet().iterator(); - while (it.hasNext()) { - ObjectName objectName = it.next(); - NotificationListener listener = notificationListenerMap.get(objectName); - try { - mbeanServer.removeNotificationListener(objectName, listener); - fail("Found Listeners inspite of clearing them"); - } catch (ListenerNotFoundException e) { - // Expected Exception Do nothing - } catch (InstanceNotFoundException e) { - LogWriterUtils.getLogWriter().error(e); - } - } + for (VM memberVM : this.memberVMs) { + memberVM.invoke("verifyNotificationHubListenersWereRemoved", () -> { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + NotificationHub notificationHub = service.getNotificationHub(); + notificationHub.cleanUpListeners(); + assertThat(notificationHub.getListenerObjectMap()).isEmpty(); + + for (ObjectName objectName : notificationListenerMap.keySet()) { + NotificationListener listener = notificationListenerMap.get(objectName); + assertThatThrownBy(() -> ManagementFactory.getPlatformMBeanServer().removeNotificationListener(objectName, listener)).isExactlyInstanceOf(ListenerNotFoundException.class); } }); } @@ -486,404 +328,212 @@ public class DistributedSystemDUnitTest extends ManagementTestBase { /** * Tests each and every operations that is defined on the MemberMXBean - * - * @throws Exception */ @Test public void testAlert() throws Exception { - VM managingNode = getManagingNode(); - - createManagementCache(managingNode); - startManagingNode(managingNode); - addAlertListener(managingNode); - resetAlertCounts(managingNode); + this.managementTestRule.createManager(this.managerVM); + addAlertListener(this.managerVM); + resetAlertCounts(this.managerVM); - final DistributedMember managingMember = getMember(managingNode); + DistributedMember managerDistributedMember = this.managementTestRule.getDistributedMember(this.managerVM); + generateWarningAlert(this.managerVM); + generateSevereAlert(this.managerVM); + verifyAlertCount(this.managerVM, 1, 0); + resetAlertCounts(this.managerVM); + for (VM memberVM : this.memberVMs) { + this.managementTestRule.createMember(memberVM); - warnLevelAlert(managingNode); - severeLevelAlert(managingNode); - checkAlertCount(managingNode, 1, 0); - resetAlertCounts(managingNode); - - for (VM vm : getManagedNodeList()) { - - createCache(vm); - // Default is severe ,So only Severe level alert is expected - - ensureLoggerState(vm, managingMember, Alert.SEVERE); - - warnLevelAlert(vm); - severeLevelAlert(vm); + verifyAlertAppender(memberVM, managerDistributedMember, Alert.SEVERE); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - checkAlertCount(managingNode, 3, 0); - resetAlertCounts(managingNode); - setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.WARNING)); + verifyAlertCount(this.managerVM, 3, 0); + resetAlertCounts(this.managerVM); + setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.WARNING)); - for (VM vm : getManagedNodeList()) { - // warning and severe alerts both are to be checked - ensureLoggerState(vm, managingMember, Alert.WARNING); - warnLevelAlert(vm); - severeLevelAlert(vm); + for (VM memberVM : this.memberVMs) { + verifyAlertAppender(memberVM, managerDistributedMember, Alert.WARNING); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - checkAlertCount(managingNode, 3, 3); - - resetAlertCounts(managingNode); + verifyAlertCount(this.managerVM, 3, 3); - setAlertLevel(managingNode, AlertDetails.getAlertLevelAsString(Alert.OFF)); + resetAlertCounts(this.managerVM); - for (VM vm : getManagedNodeList()) { - ensureLoggerState(vm, managingMember, Alert.OFF); - warnLevelAlert(vm); - severeLevelAlert(vm); - } - checkAlertCount(managingNode, 0, 0); - resetAlertCounts(managingNode); + setAlertLevel(this.managerVM, AlertDetails.getAlertLevelAsString(Alert.OFF)); - for (VM vm : getManagedNodeList()) { - closeCache(vm); + for (VM memberVM : this.memberVMs) { + verifyAlertAppender(memberVM, managerDistributedMember, Alert.OFF); + generateWarningAlert(memberVM); + generateSevereAlert(memberVM); } - closeCache(managingNode); - + verifyAlertCount(this.managerVM, 0, 0); } - @SuppressWarnings("serial") - public void checkAlertCount(VM vm1, final int expectedSevereAlertCount, - final int expectedWarningAlertCount) throws Exception { - { - vm1.invoke(new SerializableCallable("Check Alert Count") { - - public Object call() throws Exception { - final AlertNotifListener nt = AlertNotifListener.getInstance(); - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for all alerts to reach the Managing Node"; - } - - public boolean done() { - if (expectedSevereAlertCount == nt.getseverAlertCount() - && expectedWarningAlertCount == nt.getWarnigAlertCount()) { - return true; - } else { - return false; - } - - } - - }, MAX_WAIT, 500, true); - - return null; - } - }); - - } + private void verifyAlertAppender(final VM memberVM, final DistributedMember member, final int alertLevel) { + memberVM.invoke("verifyAlertAppender", () -> await().until(() -> assertThat(AlertAppender.getInstance().hasAlertListener(member, alertLevel)).isTrue())); } + private void verifyAlertCount(final VM managerVM, final int expectedSevereAlertCount, final int expectedWarningAlertCount) { + managerVM.invoke("verifyAlertCount", () -> { + AlertNotificationListener listener = AlertNotificationListener.getInstance(); - - @SuppressWarnings("serial") - public void setAlertLevel(VM vm1, final String alertLevel) throws Exception { - { - vm1.invoke(new SerializableCallable("Set Alert level") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - assertNotNull(bean); - bean.changeAlertLevel(alertLevel); - - return null; - } - }); - - } + await().until(() -> assertThat(listener.getSevereAlertCount()).isEqualTo(expectedSevereAlertCount)); + await().until(() -> assertThat(listener.getWarningAlertCount()).isEqualTo(expectedWarningAlertCount)); + }); } - @SuppressWarnings("serial") - public void warnLevelAlert(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Warning level Alerts") { - - public Object call() throws Exception { - final IgnoredException warnEx = - IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE); - logger.warn(WARNING_LEVEL_MESSAGE); - warnEx.remove(); - return null; - } - }); - - } + private void setAlertLevel(final VM managerVM, final String alertLevel) { + managerVM.invoke("setAlertLevel", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + distributedSystemMXBean.changeAlertLevel(alertLevel); + }); } - - @SuppressWarnings("serial") - public void resetAlertCounts(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Reset Alert Count") { - - public Object call() throws Exception { - AlertNotifListener nt = AlertNotifListener.getInstance(); - nt.resetCount(); - return null; - } - }); - - } + private void generateWarningAlert(final VM anyVM) { + anyVM.invoke("generateWarningAlert", () -> { + IgnoredException ignoredException = IgnoredException.addIgnoredException(WARNING_LEVEL_MESSAGE); + logger.warn(WARNING_LEVEL_MESSAGE); + ignoredException.remove(); + }); } - @SuppressWarnings("serial") - public void severeLevelAlert(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Severe Level Alert") { - - public Object call() throws Exception { - // add expected exception strings - - final IgnoredException severeEx = - IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE); - logger.fatal(SEVERE_LEVEL_MESSAGE); - severeEx.remove(); - return null; - } - }); - - } + private void resetAlertCounts(final VM managerVM) { + managerVM.invoke("resetAlertCounts", () -> { + AlertNotificationListener listener = AlertNotificationListener.getInstance(); + listener.resetCount(); + }); } - @SuppressWarnings("serial") - public void addAlertListener(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Add Alert Listener") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - AlertNotifListener nt = AlertNotifListener.getInstance(); - nt.resetCount(); - - NotificationFilter notificationFilter = new NotificationFilter() { - @Override - public boolean isNotificationEnabled(Notification notification) { - return notification.getType().equals(JMXNotificationType.SYSTEM_ALERT); - } + private void generateSevereAlert(final VM anyVM) { + anyVM.invoke("generateSevereAlert", () -> { + IgnoredException ignoredException = IgnoredException.addIgnoredException(SEVERE_LEVEL_MESSAGE); + logger.fatal(SEVERE_LEVEL_MESSAGE); + ignoredException.remove(); + }); + } - }; + private void addAlertListener(final VM managerVM) { + managerVM.invoke("addAlertListener", () -> { + AlertNotificationListener listener = AlertNotificationListener.getInstance(); + listener.resetCount(); - mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt, - notificationFilter, null); + NotificationFilter notificationFilter = (Notification notification) -> notification.getType().equals(JMXNotificationType.SYSTEM_ALERT); - return null; - } - }); - - } + ManagementFactory.getPlatformMBeanServer().addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), listener, notificationFilter, null); + }); } /** * Check aggregate related functions and attributes - * - * @param vm1 - * @throws Exception */ - @SuppressWarnings("serial") - public void checkAggregate(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Chech Aggregate Attributes") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + private void verifyDistributedSystemMXBean(final VM managerVM) { + managerVM.invoke("verifyDistributedSystemMXBean", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); - ManagementService service = getManagementService(); + await().until(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5)); - final DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - assertNotNull(service.getDistributedSystemMXBean()); - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting All members to intitialize DistributedSystemMBean expect 5 but found " - + bean.getMemberCount(); - } - - public boolean done() { - // including locator - if (bean.getMemberCount() == 5) { - return true; - } else { - return false; - } - - } - - }, MAX_WAIT, 500, true); - - - - final Set<DistributedMember> otherMemberSet = - cache.getDistributionManager().getOtherNormalDistributionManagerIds(); - Iterator<DistributedMember> memberIt = otherMemberSet.iterator(); - while (memberIt.hasNext()) { - DistributedMember member = memberIt.next(); - LogWriterUtils.getLogWriter().info("JVM Metrics For Member " + member.getId() + ":" - + bean.showJVMMetrics(member.getId())); - LogWriterUtils.getLogWriter().info("OS Metrics For Member " + member.getId() + ":" - + bean.showOSMetrics(member.getId())); - } - - return null; - } - }); - - } + Set<DistributedMember> otherMemberSet = this.managementTestRule.getOtherNormalMembers(); + for (DistributedMember member : otherMemberSet) { + // TODO: need assertions? JVMMetrics and OSMetrics + } + }); } - @SuppressWarnings("serial") - public void addNotificationListener(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Add Notification Listener") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - assertNotNull(bean); - TestDistributedSystemNotif nt = new TestDistributedSystemNotif(); - mbeanServer.addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), nt, null, - null); - - return null; - } - }); + private void addNotificationListener(final VM managerVM) { + managerVM.invoke("addNotificationListener", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + assertThat(distributedSystemMXBean).isNotNull(); - } + DistributedSystemNotificationListener listener = new DistributedSystemNotificationListener(); + ManagementFactory.getPlatformMBeanServer().addNotificationListener(MBeanJMXAdapter.getDistributedSystemName(), listener, null, null); + }); } + private void shutDownAll(final VM managerVM) { + managerVM.invoke("shutDownAll", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + distributedSystemMXBean.shutDownAllMembers(); - - @SuppressWarnings("serial") - public void shutDownAll(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Shut Down All") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - ManagementService service = getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - assertNotNull(service.getDistributedSystemMXBean()); - bean.shutDownAllMembers(); - Wait.pause(2000); - assertEquals(cache.getDistributedSystem().getAllOtherMembers().size(), 1); - return null; - } - }); - - } + await().until(() -> assertThat(this.managementTestRule.getOtherNormalMembers()).hasSize(0)); + }); } + private void verifyFetchMemberObjectName(final VM managerVM, final int memberCount) { + managerVM.invoke("verifyFetchMemberObjectName", () -> { + ManagementService service = this.managementTestRule.getManagementService(); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + await().until(() -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(memberCount)); - @SuppressWarnings("serial") - public void checkNavigationAPIs(VM vm1) throws Exception { - { - vm1.invoke(new SerializableCallable("Check Navigation APIS") { - - public Object call() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - ManagementService service = getManagementService(); - final DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - - assertNotNull(service.getDistributedSystemMXBean()); - - waitForAllMembers(4); - - for (int i = 0; i < bean.listMemberObjectNames().length; i++) { - LogWriterUtils.getLogWriter() - .info("ObjectNames Of the Mmeber" + bean.listMemberObjectNames()[i]); - } - - - ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName( - InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId()); - - ObjectName memberName = bean.fetchMemberObjectName( - InternalDistributedSystem.getConnectedInstance().getDistributedMember().getId()); - assertEquals(thisMemberName, memberName); - - return null; - } - }); - - } + String memberId = this.managementTestRule.getDistributedMember().getId(); + ObjectName thisMemberName = MBeanJMXAdapter.getMemberMBeanName(memberId); + ObjectName memberName = distributedSystemMXBean.fetchMemberObjectName(memberId); + assertThat(memberName).isEqualTo(thisMemberName); + }); } + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); + } - /** - * Notification handler - * - * - */ - private static class TestDistributedSystemNotif implements NotificationListener { + private static class DistributedSystemNotificationListener implements NotificationListener { @Override - public void handleNotification(Notification notification, Object handback) { - assertNotNull(notification); + public void handleNotification(final Notification notification, final Object handback) { + assertThat(notification).isNotNull(); } - } - /** - * Notification handler - * - * - */ - private static class AlertNotifListener implements NotificationListener { + private static class AlertNotificationListener implements NotificationListener { + + private static AlertNotificationListener listener = new AlertNotificationListener(); + + private int warningAlertCount = 0; - private static AlertNotifListener listener = new AlertNotifListener(); + private int severeAlertCount = 0; - public static AlertNotifListener getInstance() { + static AlertNotificationListener getInstance() { // TODO: get rid of singleton return listener; } - private int warnigAlertCount = 0; + @Override + public synchronized void handleNotification(final Notification notification, final Object handback) { + assertThat(notification).isNotNull(); - private int severAlertCount = 0; + Map<String, String> notificationUserData = (Map<String, String>) notification.getUserData(); - @Override - public synchronized void handleNotification(Notification notification, Object handback) { - assertNotNull(notification); - logger.info("Notification received {}", notification); - Map<String, String> notifUserData = (Map<String, String>) notification.getUserData(); - if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("warning")) { - assertEquals(WARNING_LEVEL_MESSAGE, notification.getMessage()); - ++warnigAlertCount; + if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("warning")) { + assertThat(notification.getMessage()).isEqualTo(WARNING_LEVEL_MESSAGE); + warningAlertCount++; } - if (notifUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("severe")) { - assertEquals(SEVERE_LEVEL_MESSAGE, notification.getMessage()); - ++severAlertCount; + if (notificationUserData.get(JMXNotificationUserData.ALERT_LEVEL).equalsIgnoreCase("severe")) { + assertThat(notification.getMessage()).isEqualTo(SEVERE_LEVEL_MESSAGE); + severeAlertCount++; } } - public void resetCount() { - warnigAlertCount = 0; - - severAlertCount = 0; + void resetCount() { + warningAlertCount = 0; + severeAlertCount = 0; } - public int getWarnigAlertCount() { - return warnigAlertCount; + int getWarningAlertCount() { + return warningAlertCount; } - public int getseverAlertCount() { - return severAlertCount; + int getSevereAlertCount() { + return severeAlertCount; } - } - } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java index 0feb4c2..0096f0d 100644 --- a/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/JMXMBeanDUnitTest.java @@ -49,7 +49,7 @@ import org.apache.geode.test.junit.categories.FlakyTest; import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; import org.apache.geode.util.test.TestUtil; -public class JMXMBeanDUnitTest extends DistributedTestCase { +public class JMXMBeanDUnitTest extends DistributedTestCase { // TODO: rename and fix on Mac private Host host; private VM locator;