albertogpz commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r591130988



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,200 @@ public void 
testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void 
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverCustomerOrderShipmentPR(vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    int batchSize = 10;
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
true, null, false,
+            true));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
true, null, false,
+            true));
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    // Each transaction will contain one order plus the following shipments
+    int shipmentsPerTransaction = batchSize;
+    AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, 
customers,
+        transactionsPerCustomer, shipmentsPerTransaction);
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    stopSenderInVMsAsync("ln", vm4, vm5);
+
+    // Wait for customer transactions to finish
+    inv1.await();
+    int orderEntries = transactionsPerCustomer * customers;
+    int shipmentEntries = orderEntries * 10;
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
+
+    checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(
+        shipmentsPerTransaction);
+
+    // Start sender to validate that queued events do not contain incomplete 
transactions after
+    // restart
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    
checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction);
+  }
+
+  @Test
+  public void 
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverCustomerOrderShipmentPR(vm2);
+    createReceiverInVMs(vm2);
+    vm2.invoke(WANTestBase::stopReceivers);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    int batchSize = 10;
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, false,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, false,
+        true));
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    // Each transaction will contain one order plus the following shipments
+    int shipmentsPerTransaction = batchSize;
+
+    AsyncInvocation<Void> inv1 = asyncExecuteCustomerTransactions(vm4, 
customers,
+        transactionsPerCustomer, shipmentsPerTransaction);
+
+    // wait for some batches to be redistributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
+
+    stopSenderInVMsAsync("ln", vm4, vm5);
+
+    // Wait for the customer transactions to finish
+    inv1.await();
+    int orderEntries = transactionsPerCustomer * customers;
+    int shipmentEntries = orderEntries * 10;
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
orderEntries));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
shipmentEntries));
+
+    // Start receiver and sender
+    vm2.invoke(WANTestBase::startReceivers);
+    startSenderInVMsAsync("ln", vm4, vm5);
+
+    
checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(shipmentsPerTransaction);
+  }
+
+  private AsyncInvocation<Void> asyncExecuteCustomerTransactions(VM vm, int 
customers,
+      int transactionsPerCustomer, int shipmentsPerTransaction) {
+    final Map<Object, Object> keyValuesInTransactions = new LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    return vm.invokeAsync(
+        () -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+            eventsPerTransaction));
+  }
+
+  private void checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(
+      int shipmentsPerTransaction) {
+    waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction);
+
+    List<Integer> v4List =
+        vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+    List<Integer> v5List =
+        vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    // Check the entries replicated against the number of batches distributed
+    int batchesDistributed = v4List.get(4) + v5List.get(4);
+    checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, 
batchesDistributed);
+  }
+
+  private void checkOnlyCompleteTransactionsAreReplicatedWithSenderRestarted(
+      int shipmentsPerTransaction) {
+    // Wait for sender queues to be drained
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    List<Integer> v4List =
+        vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    List<Integer> v5List =
+        vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    assertEquals(0, v4List.get(0) + v5List.get(0));
+
+    // batches with incomplete transactions must be 0
+    assertEquals(0, (int) v4List.get(13));
+    assertEquals(0, (int) v5List.get(13));
+
+    waitForBatchesToBeAppliedInTheReceiver(shipmentsPerTransaction);
+
+    // Check the entries replicated against the number of batches distributed
+    int batchesDistributed = v4List.get(4) + v5List.get(4);
+    checkOnlyCompleteTransactionsAreReplicated(shipmentsPerTransaction, 
batchesDistributed);
+  }
+
+  private void checkOnlyCompleteTransactionsAreReplicated(int 
shipmentsPerTransaction,
+      int batchesDistributed) {
+    // Only complete transactions (1 order + 10 shipments) must be replicated
+    int orderRegionSize = vm2.invoke(() -> getRegionSize(orderRegionName));
+    int shipmentRegionSize = vm2.invoke(() -> 
getRegionSize(shipmentRegionName));
+    assertEquals(shipmentRegionSize, 10 * orderRegionSize);

Review comment:
       Agree




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to