albertogpz commented on a change in pull request #4928:
URL: https://github.com/apache/geode/pull/4928#discussion_r423010749



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -299,19 +312,316 @@ public void 
testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy(
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
 
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
-    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + 
v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
     assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
-                                                                               
            // queued
+    // queued
     assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
-                                                                               
            // distributed
+    // distributed
     assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) 
>= 10); // batches
-                                                                               
      // distributed
+    // distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
-                                                                               
     // redistributed
+    // redistributed
 
     vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, 
NUM_PUTS));
   }
 
+  @Test
+  public void 
testPartitionedRegionParallelPropagation_NoGroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, false, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
+    // redistributed
+  }
+
+  @Test
+  public void testPartitionedRegionParallelPropagation_GroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, true, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 3;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 3 transactions of 4 events each are sent so that the batch would
+    // initially contain the first 2 transactions complete and the first
+    // 2 events of the last transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the last transaction are added to the batch which makes
+    // that only one batch of 12 events is sent.
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
+    // redistributed
+  }
+
+  @Test
+  public void 
testPartitionedRegionParallelPropagationBatchRedistributed_NoGroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, false, batchTimeInterval);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 6;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    // The receiver is started later in order for the batch to be 
redistributed (sent again)
+    Thread.sleep(2000);
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(3, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertTrue("Batch was not redistributed",
+        (v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)) > 0); 
// batches
+    // redistributed
+  }
+
+  @Test
+  public void 
testPartitionedRegionParallelPropagationBatchRedistributed_GroupTransactionEvents()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+
+    int batchTimeInterval = 10000;
+    createSenders(lnPort, true, batchTimeInterval);
+
+    createReceiverCustomerOrderShipmentPR(vm2, 0);
+
+    createSenderCustomerOrderShipmentPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    final Map custKeyValue = new HashMap();
+    int intCustId = 1;
+    CustId custId = new CustId(intCustId);
+    custKeyValue.put(custId, new Customer());
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, 
custKeyValue));
+
+    int transactions = 6;
+    final Map keyValues = new HashMap();
+    for (int i = 0; i < transactions; i++) {
+      OrderId orderId = new OrderId(i, custId);
+      ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+      ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+      ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+      keyValues.put(orderId, new Order());
+      keyValues.put(shipmentId1, new Shipment());
+      keyValues.put(shipmentId2, new Shipment());
+      keyValues.put(shipmentId3, new Shipment());
+    }
+
+    // 6 transactions of 4 events each are sent so that the first batch
+    // would initially contain the first 2 transactions complete and the first
+    // 2 events of the next transaction (10 entries).
+    // As --group-transaction-events is configured in the senders, the 
remaining
+    // 2 events of the second transaction are added to the batch which makes
+    // that the first batch is sent with 12 events. The same happens with the
+    // second batch which will contain 12 events too.
+    int eventsPerTransaction = 4;
+    vm4.invoke(() -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValues,
+        eventsPerTransaction));
+
+    int entries = (transactions * eventsPerTransaction) + 1;
+
+    // The receiver is started later in order for the batch to be 
redistributed (sent again)
+    Thread.sleep(2000);
+    createReceiverInVMs(vm2);
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(customerRegionName, 1));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(orderRegionName, 
transactions));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(shipmentRegionName, 
transactions * 3));
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
+    assertEquals(entries,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
+    assertEquals(entries, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
+    // queued
+    assertEquals(entries, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
+    // distributed
+    assertEquals(2, v4List.get(4) + v5List.get(4) + v6List.get(4) + 
v7List.get(4)); // batches
+    // distributed
+    assertTrue("Batch was not redistributed",
+        (v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)) > 0); 
// batches
+    // redistributed

Review comment:
       ok

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
##########
@@ -343,15 +653,16 @@ public void 
testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3(
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 
0));
 
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + 
v7List.get(0)); // queue size
-    assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + 
v7List.get(1)); // eventsReceived
+    assertEquals(400,
+        v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // 
eventsReceived
     assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + 
v7List.get(2)); // events
-                                                                               
       // queued
+    // queued
     assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + 
v7List.get(3)); // events
-                                                                               
            // distributed
+    // distributed
     assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) 
>= 10); // batches
-                                                                               
      // distributed
+    // distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + 
v7List.get(5)); // batches
-                                                                               
     // redistributed
+    // redistributed

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to