Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-866 58038ec1f -> 0804a1396 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0804a139/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java index bfc95c9..474231c 100644 --- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java @@ -33,6 +33,8 @@ import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter; import com.gemstone.gemfire.test.dunit.AsyncInvocation; import com.gemstone.gemfire.test.dunit.IgnoredException; import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableCallableIF; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.Wait; public class ParallelWANPropagationDUnitTest extends WANTestBase { @@ -85,10 +87,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //keep a larger batch to minimize number of exception occurrences in the log vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -100,19 +102,15 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 300, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false)); @@ -120,22 +118,20 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false )); //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //verify all buckets drained on all sender nodes. vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); @@ -150,6 +146,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR", 1000 )); } + + protected SerializableRunnableIF createCacheRunnable(Integer lnPort) { + return () -> WANTestBase.createCache( lnPort ); + } /** * Normal happy scenario test case. @@ -159,13 +159,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -176,31 +176,25 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); @@ -215,17 +209,35 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { getTestMethodName() + "_PR", 1000 )); } + protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() { + return () -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ); + } + + protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() { + return () -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ); + } + + protected SerializableRunnableIF startSenderRunnable() { + return () -> WANTestBase.startSender( "ln" ); + } + + protected SerializableRunnableIF waitForSenderRunnable() { + return () -> WANTestBase.waitForSenderRunningState( "ln" ); + } + public void testParallelPropagation_ManualStart() throws Exception { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, false )); @@ -236,26 +248,20 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, false )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); @@ -278,13 +284,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -295,22 +301,18 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable()); + AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable()); + AsyncInvocation inv3 = vm6.invokeAsync(startSenderRunnable()); + AsyncInvocation inv4 = vm7.invokeAsync(startSenderRunnable()); try{ inv1.join(); @@ -321,17 +323,15 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { catch(InterruptedException ie) { fail("Caught interrupted exception"); } - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); @@ -353,13 +353,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -370,31 +370,25 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); @@ -411,10 +405,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" )); LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -430,24 +424,20 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); //------------------------------------------------------------------------------------ IgnoredException.addIgnoredException(EntryExistsException.class.getName()); @@ -472,13 +462,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -494,10 +484,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -526,13 +516,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -548,10 +538,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -579,13 +569,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -605,10 +595,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -616,10 +606,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { getTestMethodName(), null, 1, 100, isOffHeap() )); //let all the senders start before doing any puts to ensure that none of the events is lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), 150 )); @@ -639,13 +629,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "lnSerial", 2, false, 100, 10, false, false, null, true )); @@ -666,10 +656,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); @@ -720,13 +708,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( tkPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(tkPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "lnParallel1", 2, true, 100, 10, false, false, null, true )); @@ -765,10 +753,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.startSender( "lnParallel2" )); vm7.invoke(() -> WANTestBase.startSender( "lnParallel2" )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing puts, make sure that the senders are started. //this will ensure that not a single events is lost @@ -811,13 +797,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -842,15 +828,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); Wait.pause(500); @@ -882,13 +866,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, @@ -912,10 +896,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); vm2.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -924,10 +908,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { //wait for senders to be running before doing any puts. This will ensure that //not a single events is lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); @@ -941,13 +925,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); @@ -958,31 +942,25 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPutAll( getTestMethodName() + "_PR", 100 , 50 )); @@ -1009,13 +987,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 100, false, false, null, true )); @@ -1026,31 +1004,25 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 100, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); + vm6.invoke(startSenderRunnable()); + vm7.invoke(startSenderRunnable()); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); @@ -1096,11 +1068,11 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); + vm3.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); //vm6.invoke(() -> WANTestBase.createCache( lnPort )); //vm7.invoke(() -> WANTestBase.createCache( lnPort )); @@ -1113,29 +1085,25 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { //vm7.invoke(() -> WANTestBase.createSender( "ln", 2, // true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); // vm6.invoke(() -> WANTestBase.createPartitionedRegion( // testName + "_PR", "ln", true, 1, 100, isOffHeap() )); // vm7.invoke(() -> WANTestBase.createPartitionedRegion( // testName + "_PR", "ln", true, 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); // vm6.invoke(() -> WANTestBase.startSender( "ln" )); // vm7.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); //before doing any puts, let the senders be running in order to ensure that //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); // vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); // vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR", @@ -1155,26 +1123,24 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); @@ -1210,26 +1176,24 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); @@ -1269,17 +1233,17 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); //create cache and receiver on site2 - vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm2.invoke(createReceiverRunnable(nyPort)); //create cache on site1 - vm3.invoke(() -> WANTestBase.createCache( lnPort )); + vm3.invoke(createCacheRunnable(lnPort)); //create sender on site1 vm3.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, null, true )); //start sender on site1 - vm3.invoke(() -> WANTestBase.startSender( "ln" )); + vm3.invoke(startSenderRunnable()); //create leader (parent) PR on site1 vm3.invoke(() -> WANTestBase.createPartitionedRegion( @@ -1309,13 +1273,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm6.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm7.invoke(() -> WANTestBase.createReceiver( nyPort )); + vm6.invoke(createReceiverRunnable(nyPort)); + vm7.invoke(createReceiverRunnable(nyPort)); - vm2.invoke(() -> WANTestBase.createCache( lnPort )); - vm3.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm2.invoke(createCacheRunnable(lnPort)); + vm3.invoke(createCacheRunnable(lnPort)); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true, 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); @@ -1335,10 +1299,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { vm5.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.startSender( "ln" )); - vm3.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); + vm2.invoke(startSenderRunnable()); + vm3.invoke(startSenderRunnable()); + vm4.invoke(startSenderRunnable()); + vm5.invoke(startSenderRunnable()); vm6.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName(), null, 1, 100, isOffHeap() )); @@ -1348,10 +1312,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { // wait for senders to be running before doing any puts. This will ensure // that // not a single events is lost - vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm2.invoke(waitForSenderRunnable()); + vm3.invoke(waitForSenderRunnable()); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); @@ -1385,5 +1349,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase { assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks)); + } + + protected SerializableCallableIF<Integer> createReceiverRunnable( + Integer nyPort) { + return () -> WANTestBase.createReceiver( nyPort ); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0804a139/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java index 91234c9..d89c864 100644 --- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java @@ -28,6 +28,7 @@ import com.gemstone.gemfire.internal.cache.wan.WANTestBase; import com.gemstone.gemfire.test.dunit.AsyncInvocation; import com.gemstone.gemfire.test.dunit.IgnoredException; import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.Wait; public class SerialWANPropogationDUnitTest extends WANTestBase { @@ -52,10 +53,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { public void disabledtestReplicatedSerialPropagation_withoutRemoteLocator() throws Exception { Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //keep the batch size high enough to reduce the number of exceptions in the log vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -66,22 +67,18 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(createCacheRunnable(nyPort)); + vm3.invoke(createCacheRunnable(nyPort)); vm2.invoke(() -> WANTestBase.createReceiver2(nyPort )); vm3.invoke(() -> WANTestBase.createReceiver2(nyPort )); @@ -98,6 +95,15 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { getTestMethodName() + "_RR", 1000 )); vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR", 1000 )); + } + + protected SerializableRunnableIF createReplicatedRegionRunnable() { + return () -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() ); + } + + protected SerializableRunnableIF createCacheRunnable(Integer lnPort) { + return () -> WANTestBase.createCache(lnPort ); } @@ -106,10 +112,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //keep the batch size high enough to reduce the number of exceptions in the log vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -120,14 +126,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); IgnoredException.addIgnoredException(BatchException70.class.getName()); IgnoredException.addIgnoredException(ServerOperationException.class.getName()); @@ -140,8 +142,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(createCacheRunnable(nyPort)); + vm3.invoke(createCacheRunnable(nyPort)); vm2.invoke(() -> WANTestBase.createReceiver2(nyPort )); vm3.invoke(() -> WANTestBase.createReceiver2(nyPort )); @@ -171,10 +173,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); // reduce the batch-size so maximum number of batches will be sent vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -185,14 +187,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); IgnoredException.addIgnoredException(BatchException70.class.getName()); IgnoredException.addIgnoredException(ServerOperationException.class.getName()); @@ -208,8 +206,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { // pause for some time before starting up the remote site Wait.pause(10000); - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(createCacheRunnable(nyPort)); + vm3.invoke(createCacheRunnable(nyPort)); vm2.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); @@ -238,10 +236,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -256,14 +254,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); @@ -281,10 +275,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -299,14 +293,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); @@ -321,10 +311,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -334,14 +324,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); @@ -380,10 +366,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //senders are created on local site vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -465,10 +451,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //senders are created on local site vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -537,10 +523,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //senders are created on local site vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -623,10 +609,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); // senders are created on local site vm4.invoke(() -> WANTestBase.createSender( "ln", 2, @@ -734,10 +720,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { //vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); //senders are created on local site. Batch size is kept to a high (170) so //there will be less number of exceptions (occur during dispatchBatch) in the log @@ -794,8 +780,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in @@ -861,8 +847,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in @@ -929,8 +915,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in @@ -1002,8 +988,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort1 )); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in @@ -1064,11 +1050,11 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { // these are part of remote site vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); + vm3.invoke(createCacheRunnable(nyPort)); // these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in @@ -1133,10 +1119,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); vm3.invoke(() -> WANTestBase.createReceiver( tkPort )); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", 2, false, 100, 10, false, false, null, true )); @@ -1185,10 +1171,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); vm3.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -1203,14 +1189,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); Wait.pause(2000); @@ -1249,8 +1231,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { LogWriterUtils.getLogWriter().info("Started receivers on remote site"); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -1271,10 +1253,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { LogWriterUtils.getLogWriter().info("Started senders on local site"); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); LogWriterUtils.getLogWriter().info("Started async puts on local site"); @@ -1334,8 +1314,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { LogWriterUtils.getLogWriter().info("Started receivers on remote site"); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -1356,10 +1336,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { LogWriterUtils.getLogWriter().info("Started senders on local site"); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); LogWriterUtils.getLogWriter().info("Started async puts on local site"); @@ -1385,12 +1363,11 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { // ---------------------------REBUILD vm4 // -------------------------------------- LogWriterUtils.getLogWriter().info("Rebuilding vm4...."); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); LogWriterUtils.getLogWriter().info("Rebuilt vm4"); // ----------------------------------------------------------------------------- @@ -1427,8 +1404,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm4.invoke(createCacheRunnable(lnPort)); + vm5.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -1484,8 +1461,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.createReceiver( lnPort )); vm5.invoke(() -> WANTestBase.createReceiver( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(createCacheRunnable(lnPort)); + vm7.invoke(createCacheRunnable(lnPort)); vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, null, true )); @@ -1495,14 +1472,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.startSender( "ln" )); vm5.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(createReplicatedRegionRunnable()); + vm5.invoke(createReplicatedRegionRunnable()); + vm6.invoke(createReplicatedRegionRunnable()); + vm7.invoke(createReplicatedRegionRunnable()); vm2.invoke(() -> WANTestBase.createSender( "ny", 1, false, 100, 10, false, false, null, true ));