albertogpz commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r589521553



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void 
testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
 
   }
 
+  @Test
+  public void 
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+      throws InterruptedException {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, true,
+        true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
true, null, true,
+        true));
+
+    createReceiverCustomerOrderShipmentPR(vm2);
+
+    createSenderCustomerOrderShipmentPRs(vm4);
+    createSenderCustomerOrderShipmentPRs(vm5);
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    int customers = 4;
+    int transactionsPerCustomer = 100;
+    int shipmentsPerTransaction = 10;
+    final LinkedHashMap<Object, Object> keyValuesInTransactions = new 
LinkedHashMap<>();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        keyValuesInTransactions.put(orderId, new Order());
+        for (int j = 0; j < shipmentsPerTransaction; j++) {
+          ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+          keyValuesInTransactions.put(shipmentId, new Shipment());
+        }
+      }
+    }
+    int eventsPerTransaction = 1 + shipmentsPerTransaction;
+    AsyncInvocation<Void> inv1 =
+        vm4.invokeAsync(
+            () -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    // wait for some batches to be distributed and then stop the sender
+    vm4.invoke(() -> await()
+        .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+    System.out.println("Stopping sender");
+    stopSenderInVMsAsync("ln", vm4, vm5);
+    System.out.println("Stopped sender");
+
+    inv1.await();
+    int entries =
+        transactionsPerCustomer * customers;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+    // Wait for events to replicate: when batches received does not change
+    // we can assume that replication has finished.
+    int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+    while (true) {

Review comment:
       I have tried by checking that the number of batches sent by the sender 
is equal to the number of batches received by the receiver.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to