kirklund commented on a change in pull request #7035:
URL: https://github.com/apache/geode/pull/7035#discussion_r737910918



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -489,7 +489,21 @@ public void 
addShadowPartitionedRegionForUserPR(PartitionedRegion userPR,
       boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
 
       final String prQName = sender.getId() + QSTRING + 
convertPathToName(userPR.getFullPath());
-      prQ = (PartitionedRegion) cache.getRegion(prQName);
+
+      prQ = (PartitionedRegion) cache.getRegion(prQName, true);
+      if (prQ != null && prQ.isDestroyed()) {
+        PartitionedRegion oldPrQ = prQ;
+        while (oldPrQ == prQ) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          logger.info("wait for destroy to finish");

Review comment:
       I just realized this is `info` level, which is turned on by default. The 
wording looks like it should be a `debug` statement.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void 
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
   }
 
 
+  /**
+   * Enable persistence for GatewaySender. Pause the sender and do some puts 
in local region. Stop
+   * GatewaySender.
+   * Then start GatewaySender with clean-queues option. Check if the remote 
site receives all the
+   * events.
+   */
+  @Test
+  public void 
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
 {
+    // create locator on local site
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = (String) vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore3 = (String) vm6.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore4 = (String) vm7.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + 
diskStore3 + "," + diskStore4);
+
+    // create PR on remote site
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+    vm3.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("All senders are running.");
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    LogWriterUtils.getLogWriter().info("Check that no events are propagated to 
remote site");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // stop the senders
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm6.invoke(() -> WANTestBase.stopSender("ln"));
+    vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+    LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+    // wait for senders to stop
+    vm4.invoke(waitForSenderNonRunnable());
+    vm5.invoke(waitForSenderNonRunnable());
+    vm6.invoke(waitForSenderNonRunnable());
+    vm7.invoke(waitForSenderNonRunnable());
+
+
+    // create receiver on remote site
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+    LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> 
startSenderwithCleanQueues("ln"));

Review comment:
       You should type these AsyncInvocations with Void: `AsyncInvocation<Void>`
   
   And I would give them names that describe "what" and "where":
   ```
   AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 = vm4.invokeAsync(() 
-> startSenderwithCleanQueues("ln"));
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void 
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
   }
 
 
+  /**
+   * Enable persistence for GatewaySender. Pause the sender and do some puts 
in local region. Stop
+   * GatewaySender.
+   * Then start GatewaySender with clean-queues option. Check if the remote 
site receives all the
+   * events.
+   */
+  @Test
+  public void 
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
 {
+    // create locator on local site
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = (String) vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore3 = (String) vm6.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore4 = (String) vm7.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + 
diskStore3 + "," + diskStore4);
+
+    // create PR on remote site
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+    vm3.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("All senders are running.");
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    LogWriterUtils.getLogWriter().info("Check that no events are propagated to 
remote site");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // stop the senders
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm6.invoke(() -> WANTestBase.stopSender("ln"));
+    vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+    LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+    // wait for senders to stop
+    vm4.invoke(waitForSenderNonRunnable());
+    vm5.invoke(waitForSenderNonRunnable());
+    vm6.invoke(waitForSenderNonRunnable());
+    vm7.invoke(waitForSenderNonRunnable());
+
+
+    // create receiver on remote site
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+    LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> 
startSenderwithCleanQueues("ln"));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> {
+      try {
+        Thread.sleep(200);

Review comment:
       Please use `CountDownLatch` instead of `Thread.sleep`. Any uses of sleep 
are really just going to introduce more flaky tests that fail intermittently in 
precheckin runs.
   
   You should be able to find lots of examples of uses of CountDownLatch within 
various distributed tests.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void 
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
   }
 
 
+  /**
+   * Enable persistence for GatewaySender. Pause the sender and do some puts 
in local region. Stop
+   * GatewaySender.
+   * Then start GatewaySender with clean-queues option. Check if the remote 
site receives all the
+   * events.
+   */
+  @Test
+  public void 
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
 {
+    // create locator on local site
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = (String) vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore3 = (String) vm6.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore4 = (String) vm7.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + 
diskStore3 + "," + diskStore4);
+
+    // create PR on remote site
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+    vm3.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("All senders are running.");
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    LogWriterUtils.getLogWriter().info("Check that no events are propagated to 
remote site");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // stop the senders
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm6.invoke(() -> WANTestBase.stopSender("ln"));
+    vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+    LogWriterUtils.getLogWriter().info("Stopped all the senders.");
+
+    // wait for senders to stop
+    vm4.invoke(waitForSenderNonRunnable());
+    vm5.invoke(waitForSenderNonRunnable());
+    vm6.invoke(waitForSenderNonRunnable());
+    vm7.invoke(waitForSenderNonRunnable());
+
+
+    // create receiver on remote site
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+    LogWriterUtils.getLogWriter().info("Start all the senders.");
+
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> 
startSenderwithCleanQueues("ln"));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      startSenderwithCleanQueues("ln");
+    });
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> {
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      startSenderwithCleanQueues("ln");
+    });
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> {
+      try {
+        Thread.sleep(300);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      startSenderwithCleanQueues("ln");
+    });
+
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }

Review comment:
       It's much better to add `throws InterruptedException` to the method and 
let Java propagate it out. When the exception reaches JUnit, JUnit will provide 
a much better stack trace and failure message than what this does.
   
   You should also just use `await()`:
   ```
   inv1.await();
   inv2.await();
   inv3.await();
   inv4.await();
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -1435,6 +1435,21 @@ public static void waitForSenderRunningState(String 
senderId) {
     }
   }
 
+
+  public static void waitForSenderNonRunningState(String senderId) {
+    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    try {

Review comment:
       If you want to minimize the scope of the `IgnoredException`, you might 
want to use try-with-resources syntax:
   ```
   try (IgnoredException ie = IgnoredException.addIgnoredException("Could not 
connect")) {
     ...
   }
   ```
   `IgnoredException` implements `AutoCloseable`, so closing it invokes `remove` for you.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -2084,6 +2088,210 @@ public void 
testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopSe
   }
 
 
+  /**
+   * Enable persistence for GatewaySender. Pause the sender and do some puts 
in local region. Stop
+   * GatewaySender.
+   * Then start GatewaySender with clean-queues option. Check if the remote 
site receives all the
+   * events.
+   */
+  @Test
+  public void 
testpersistentWanGateway_restartSenderWithCleanQueuesDelayed_expectNoEventsReceived()
 {
+    // create locator on local site
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = (String) vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore3 = (String) vm6.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore4 = (String) vm7.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + 
diskStore3 + "," + diskStore4);

Review comment:
       `LogWriterUtils` is deprecated in favor of using a Log4j2 logger:
   ```
   private static final Logger logger = LogService.getLogger();
   
   logger.info("The disk stores are: {}, {}, {}, {}", diskStore1, diskStore2, 
diskStore3, diskStore4);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to