kirklund commented on a change in pull request #7035:
URL: https://github.com/apache/geode/pull/7035#discussion_r737910918
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -489,7 +489,21 @@ public void
addShadowPartitionedRegionForUserPR(PartitionedRegion userPR,
boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
final String prQName = sender.getId() + QSTRING +
convertPathToName(userPR.getFullPath());
- prQ = (PartitionedRegion) cache.getRegion(prQName);
+
+ prQ = (PartitionedRegion) cache.getRegion(prQName, true);
+ if (prQ != null && prQ.isDestroyed()) {
+ PartitionedRegion oldPrQ = prQ;
+ while (oldPrQ == prQ) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ logger.info("wait for destroy to finish");
Review comment:
I just realized this is logged at `info` level, which is turned on by default.
The wording looks like it should be a `debug` statement.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
{
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ LogWriterUtils.getLogWriter().info("Check that no events are propagated to
remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() ->
startSenderwithCleanQueues("ln"));
Review comment:
You should type these AsyncInvocations with Void: `AsyncInvocation<Void>`.
I would also give them names that describe "what" and "where":
```
AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 = vm4.invokeAsync(()
-> startSenderwithCleanQueues("ln"));
```
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
{
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ LogWriterUtils.getLogWriter().info("Check that no events are propagated to
remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() ->
startSenderwithCleanQueues("ln"));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> {
+ try {
+ Thread.sleep(200);
Review comment:
Please use `CountDownLatch` instead of `Thread.sleep`. Any uses of sleep
are really just going to introduce more flaky tests that fail intermittently in
precheckin runs.
You should be able to find lots of examples of uses of CountDownLatch within
various distributed tests.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
{
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
+
+ // create PR on remote site
+ vm2.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+ vm3.invoke(
+ () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null,
1, 100, isOffHeap()));
+
+ // create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ // start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ // wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All senders are running.");
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ LogWriterUtils.getLogWriter().info("Check that no events are propagated to
remote site");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // stop the senders
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+ vm6.invoke(() -> WANTestBase.stopSender("ln"));
+ vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+ LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+ // wait for senders to stop
+ vm4.invoke(waitForSenderNonRunnable());
+ vm5.invoke(waitForSenderNonRunnable());
+ vm6.invoke(waitForSenderNonRunnable());
+ vm7.invoke(waitForSenderNonRunnable());
+
+
+ // create receiver on remote site
+ createReceiverInVMs(vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+ LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() ->
startSenderwithCleanQueues("ln"));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ startSenderwithCleanQueues("ln");
+ });
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> {
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ startSenderwithCleanQueues("ln");
+ });
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> {
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ startSenderwithCleanQueues("ln");
+ });
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
Review comment:
It's much better to add `throws InterruptedException` to the method and
let Java propagate it out. When the exception reaches JUnit, JUnit will provide
a much better stack trace and failure message than what this does.
You should also just use `await()`:
```
inv1.await();
inv2.await();
inv3.await();
inv4.await();
```
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -1435,6 +1435,21 @@ public static void waitForSenderRunningState(String
senderId) {
}
}
+
+ public static void waitForSenderNonRunningState(String senderId) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ try {
Review comment:
If you want to minimize the scope of the `IgnoredException`, you might
want to use try-with-resources syntax:
```
try (IgnoredException ie = IgnoredException.addIgnoredException("Could not
connect")) {
...
}
```
`IgnoredException` implements `AutoCloseable`, and its `close()` invokes `remove` for you.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
}
+ /**
+ * Enable persistence for GatewaySender. Pause the sender and do some puts
in local region. Stop
+ * GatewaySender.
+ * Then start GatewaySender with clean-queues option. Check if the remote
site receives all the
+ * events.
+ */
+ @Test
+ public void
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
{
+ // create locator on local site
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ // create locator on remote site
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ // create cache in remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore2 = (String) vm5.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore3 = (String) vm6.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+ String diskStore4 = (String) vm7.invoke(() ->
WANTestBase.createSenderWithDiskStore("ln", 2,
+ true, 100, 10, false, true, null, null, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("The DS are: " + diskStore1 + "," + diskStore2 + "," +
diskStore3 + "," + diskStore4);
Review comment:
`LogWriterUtils` is deprecated in favor of using a Log4j2 logger:
```
private static final Logger logger = LogService.getLogger();
logger.info("The disk stores are: {}, {}, {}, {}", diskStore1, diskStore2,
diskStore3, diskStore4);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]