kirklund commented on a change in pull request #7035:
URL: https://github.com/apache/geode/pull/7035#discussion_r743210190
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -315,6 +324,10 @@ protected SerializableRunnableIF waitForSenderRunnable() {
return () -> WANTestBase.waitForSenderRunningState("ln");
}
+ protected SerializableRunnableIF waitForSenderNonRunnable() {
Review comment:
This method can be narrowed to `private`.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2097,152 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
+ throws InterruptedException {
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ logger
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ vm7.invoke(() -> {
+ DistributionMessageObserver.setInstance(new
BlockingDestroyRegionObserver());
+ });
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ logger.info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ logger.info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ logger.info("Check that no events are propagated to remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ logger.info("Start all the senders.");
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+ vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+ vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM6 =
+ vm6.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> waitForConditionInVM7 = vm7.invokeAsync(() -> {
+ BlockingDestroyRegionObserver observer =
+ (BlockingDestroyRegionObserver)
DistributionMessageObserver.getInstance();
+ observer.startedBlocking.await(1, TimeUnit.MINUTES);
+ });
+
+ try {
+ waitForConditionInVM7.await();
+ } finally {
+ }
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM7 =
+ vm7.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ startSenderwithCleanQueuesInVM4.await();
+ startSenderwithCleanQueuesInVM5.await();
+ startSenderwithCleanQueuesInVM6.await();
+ startSenderwithCleanQueuesInVM7.await();
+
+ logger.info("Waiting for senders running.");
+ // wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ logger.info("All the senders are now running...");
+
+ //
----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ }
+
+ private static class BlockingDestroyRegionObserver extends
DistributionMessageObserver {
+ private CountDownLatch startedBlocking = new CountDownLatch(1);
Review comment:
Make this `private final`
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -39,6 +47,7 @@
public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
WANTestBase {
private static final long serialVersionUID = 1L;
+ protected static final Logger logger = LogService.getLogger();
Review comment:
`logger` can be narrowed to `private`
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2097,152 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
+ throws InterruptedException {
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ logger
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ vm7.invoke(() -> {
+ DistributionMessageObserver.setInstance(new
BlockingDestroyRegionObserver());
+ });
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ logger.info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ logger.info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ logger.info("Check that no events are propagated to remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ logger.info("Start all the senders.");
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+ vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+ vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM6 =
+ vm6.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> waitForConditionInVM7 = vm7.invokeAsync(() -> {
+ BlockingDestroyRegionObserver observer =
+ (BlockingDestroyRegionObserver)
DistributionMessageObserver.getInstance();
+ observer.startedBlocking.await(1, TimeUnit.MINUTES);
+ });
+
+ try {
+ waitForConditionInVM7.await();
+ } finally {
+ }
Review comment:
Delete the `try` and `finally`
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2097,152 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
+ throws InterruptedException {
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ logger
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ vm7.invoke(() -> {
+ DistributionMessageObserver.setInstance(new
BlockingDestroyRegionObserver());
+ });
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ logger.info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ logger.info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ logger.info("Check that no events are propagated to remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ logger.info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ logger.info("Start all the senders.");
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+ vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+ vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> startSenderwithCleanQueuesInVM6 =
+ vm6.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+ AsyncInvocation<Void> waitForConditionInVM7 = vm7.invokeAsync(() -> {
+ BlockingDestroyRegionObserver observer =
+ (BlockingDestroyRegionObserver)
DistributionMessageObserver.getInstance();
+ observer.startedBlocking.await(1, TimeUnit.MINUTES);
Review comment:
You should always use `GeodeAwaitility.getTimeout` for any await-type
calls. The timeout was created and set to 5 minutes because lower values (even
2 minutes) result in intermittent flaky failures in the current environments
(AWS or GCP):
```
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static java.util.concurrent.TimeUnit.MINUTES;
```
```
observer.startedBlocking.await(getTimeout().toMinutes(), MINUTES);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]