DonalEvans commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r591745355



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,201 @@ public void 
testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
   }
 
   @Test
-  public void testReplicatedSerialPropagationWithMultipleDispatchers() throws 
Exception {
+  public void 
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, 
isOffHeap()));
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
+
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
+            groupTransactionEvents));
+
+    int eventsPerTransaction = batchSize + 1;
+    // The number of entries must be big enough so that not all entries
+    // are replicated before the sender is stopped and also divisible by 
eventsPerTransaction
+    int entries = 2200;
+    // Execute some transactions
+    AsyncInvocation<Void> inv1 =
+        asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+    // wait for batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    // These exceptions are ignored here because it could happen that when an 
event
+    // is to be handled, the sender is stopped. The sender, when stopped, 
shuts down
+    // the thread pool that would handle the event and this could provoke the 
exception.
+    addIgnoredException("Exception occurred in CacheListener");
+    addIgnoredException(RejectedExecutionException.class);
+
+    // Stop the sender
+    stopSenderInVMsAsync("ln", vm4, vm5);
+
+    // Wait for transactions to finish
+    inv1.await();
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Check
+    checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName,
+        eventsPerTransaction);
+
+    // Start the sender
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    // Check
+    checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+        eventsPerTransaction);
+  }
+
+  @Test
+  public void 
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    String regionName = testName + "_RR";
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, 
isOffHeap()));
+    vm2.invoke(WANTestBase::stopReceivers);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", 
isOffHeap()));
+
+    boolean groupTransactionEvents = true;
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
+            groupTransactionEvents));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
true, null, false,
+            groupTransactionEvents));
+
+    int eventsPerTransaction = batchSize + 1;
+    // The number of entries must be big enough so that not all entries
+    // are replicated before the sender is stopped and also divisible by 
eventsPerTransaction
+    int entries = 2200;
+    // Execute some transactions
+    AsyncInvocation<Void> inv1 =
+        asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+    // wait for batches to be redistributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
+
+    // Stop the sender
+    stopSenderInVMsAsync("ln", vm4, vm5);
+
+    // Wait for transactions to finish
+    inv1.await();
+    vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+    // Start the receiver and the sender
+    vm2.invoke(WANTestBase::startReceivers);
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    // Check
+    checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+        eventsPerTransaction);
+  }
+
+  private void 
checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName,
+      int eventsPerTransaction) {
+    waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+    List<Integer> v4List =
+        vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    List<Integer> v5List =
+        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    int batchesDistributed = v4List.get(4) + v5List.get(4);
+    checkOnlyCompleteTransactionsAreReplicated(regionName, 
eventsPerTransaction,
+        batchesDistributed);
+  }
+
+  private void 
checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String 
regionName,
+      int eventsPerTransaction) {
+    // Wait for sender queues to be empty
+    List<Integer> v4List =
+        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    List<Integer> v5List =
+        vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+    int batchesDistributed = v4List.get(4) + v5List.get(4);
+    checkOnlyCompleteTransactionsAreReplicated(regionName, 
eventsPerTransaction,
+        batchesDistributed);
+  }
+
+  private void checkOnlyCompleteTransactionsAreReplicated(String regionName,
+      int eventsPerTransaction, int batchesDistributed) {
+    int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+
+    // The number of entries must be divisible by the number of events per 
transaction
+    assertEquals(0, regionSize % eventsPerTransaction);
+
+    // Check the entries replicated against the number of batches distributed
+    vm2.invoke(() -> WANTestBase.validateRegionSize(regionName,
+        batchesDistributed * eventsPerTransaction));
+  }
+
+  private AsyncInvocation<Void> asyncExecuteTransactions(String regionName,
+      int eventsPerTransaction, int entries) {
+    final Map<Object, Object> keyValues = new LinkedHashMap<>();
+    for (int i = 0; i < entries; i++) {
+      keyValues.put(i, i + "_Value");
+    }
+
+    return vm4.invokeAsync(
+        () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+            eventsPerTransaction));
+  }
+
+  private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int 
eventsPerTransaction) {
+    int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
-1)).get(4) +
+        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4);
+
+    // Wait for all batches to be received by the sender

Review comment:
       It looks like this got missed in the last set of changed, but other than 
that, everything looks good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to