Repository: incubator-geode Updated Branches: refs/heads/develop 983010e4a -> 237035720
GEODE-1011: Rearranged the order of creating test elements * Caches are created first, followed by region and sender/receiver * Removed the pauses, as they are not necessary. Awaitility handles the waiting * Issue assumed to be resolved after WAN refactoring * Flaky tag removed Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/23703572 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/23703572 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/23703572 Branch: refs/heads/develop Commit: 237035720189d0ca72d63848460e2e77a9a2c3d0 Parents: 983010e Author: nabarun <n...@pivotal.io> Authored: Fri Jul 29 14:12:10 2016 -0700 Committer: nabarun <n...@pivotal.io> Committed: Mon Aug 1 10:09:41 2016 -0700 ---------------------------------------------------------------------- ...allelGatewaySenderOperation_1_DUnitTest.java | 48 ++++++++++---------- 1 file changed, 25 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/23703572/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java index 4645488..ab1c06b 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java @@ -597,26 +597,14 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes * Normal scenario in which a sender is stopped and then started again on accessor node. * @throws Exception */ - @Category(FlakyTest.class) // GEODE-1011: random ports, time sensitive, waitForCriterion, 5 minute timeouts, 5 second thread sleeps, async actions @Test public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Throwable { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); vm5.invoke(() -> WANTestBase.createPartitionedRegion( @@ -626,6 +614,15 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( getTestMethodName() + "_PR", "ln", 1, 100)); + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + startSenderInVMs("ln", vm4, vm5, vm6, vm7); vm2.invoke(() -> WANTestBase.createPartitionedRegion( @@ -633,6 +630,9 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes vm3.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + createReceiverInVMs(vm2, vm3); + //make sure all the senders are not running on accessor nodes and running on non-accessor nodes vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); @@ -642,15 +642,13 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes //FIRST RUN: now, the senders are started. So, do some of the puts vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - + //now, stop all of the senders vm4.invoke(() -> WANTestBase.stopSender( "ln" )); vm5.invoke(() -> WANTestBase.stopSender( "ln" )); vm6.invoke(() -> WANTestBase.stopSender( "ln" )); vm7.invoke(() -> WANTestBase.stopSender( "ln" )); - - Wait.pause(2000); - + //SECOND RUN: do some of the puts after the senders are stopped vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); @@ -662,11 +660,16 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); - int START_TIMEOUT = 30000; - vm4start.getResult(START_TIMEOUT); - vm5start.getResult(START_TIMEOUT); - vm6start.getResult(START_TIMEOUT); - vm7start.getResult(START_TIMEOUT); + vm4start.join(); + vm5start.join(); + vm6start.join(); + vm7start.join(); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); @@ -674,8 +677,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes //SECOND RUN: do some more puts AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); async.join(); - Wait.pause(5000); - + //verify all buckets drained only on non-accessor nodes. vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));