albertogpz commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r589521553
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -472,6 +473,322 @@ public void
testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
}
+ @Test
+ public void
testPRParallelPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+ throws InterruptedException {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, true,
+ true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
true, null, true,
+ true));
+
+ createReceiverCustomerOrderShipmentPR(vm2);
+
+ createSenderCustomerOrderShipmentPRs(vm4);
+ createSenderCustomerOrderShipmentPRs(vm5);
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ int customers = 4;
+ int transactionsPerCustomer = 100;
+ int shipmentsPerTransaction = 10;
+ final LinkedHashMap<Object, Object> keyValuesInTransactions = new
LinkedHashMap<>();
+ for (int custId = 0; custId < customers; custId++) {
+ for (int i = 0; i < transactionsPerCustomer; i++) {
+ CustId custIdObject = new CustId(custId);
+ OrderId orderId = new OrderId(i, custIdObject);
+ keyValuesInTransactions.put(orderId, new Order());
+ for (int j = 0; j < shipmentsPerTransaction; j++) {
+ ShipmentId shipmentId = new ShipmentId(i + j, orderId);
+ keyValuesInTransactions.put(shipmentId, new Shipment());
+ }
+ }
+ }
+ int eventsPerTransaction = 1 + shipmentsPerTransaction;
+ AsyncInvocation<Void> inv1 =
+ vm4.invokeAsync(
+ () ->
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+ eventsPerTransaction));
+
+ // wait for some batches to be distributed and then stop the sender
+ vm4.invoke(() -> await()
+ .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+ System.out.println("Stopping sender");
+ stopSenderInVMsAsync("ln", vm4, vm5);
+ System.out.println("Stopped sender");
+
+ inv1.await();
+ int entries =
+ transactionsPerCustomer * customers;
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, entries));
+
+ // Wait for events to replicate: when batches received does not change
+ // we can assume that replication has finished.
+ int batchesReceived = (vm2.invoke(() -> getReceiverStats())).get(0);
+ while (true) {
Review comment:
I have tried by checking that the number of batches sent by the sender
is equal to the number of batches received by the receiver.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]