DonalEvans commented on a change in pull request #5829:
URL: https://github.com/apache/geode/pull/5829#discussion_r540364988



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1389,6 +1383,21 @@ private void 
peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
     }
   }
 
+  private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List 
batch) {

Review comment:
       The compiler warning here can be fixed if `List<GatewaySenderEventImpl>` 
is used as the method argument. This also means that the for loop below can be 
replaced with 
   ```
   for (GatewaySenderEventImpl event : batch) {
   ```
   and the cast to `GatewaySenderEventImpl` can be removed.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
##########
@@ -1389,6 +1383,21 @@ private void 
peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
     }
   }
 
+  private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(List 
batch) {
+    Map<TransactionId, Integer> incompleteTransactionsInBatch = new 
HashMap<>();
+    for (Object object : batch) {
+      GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
+      if (event.getTransactionId() != null) {
+        if (event.isLastEventInTransaction()) {
+          incompleteTransactionsInBatch.remove(event.getTransactionId());
+        } else {
+          incompleteTransactionsInBatch.put(event.getTransactionId(), 
event.getBucketId());
+        }
+      }
+    }
+    return incompleteTransactionsInBatch;
+  }
+
   private boolean areAllTransactionsCompleteInBatch(Map 
incompleteTransactions) {

Review comment:
       While not part of the changes in this PR, this method could be inlined, 
since it's used in only one place and is only one line.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -534,6 +523,23 @@ private boolean areAllTransactionsCompleteInBatch(Set 
incompleteTransactions) {
     return (incompleteTransactions.size() == 0);

Review comment:
       As with the similar method in `ParallelGatewaySenderQueue`, this method 
could also be inlined.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
##########
@@ -487,12 +476,12 @@ public Object peek() throws CacheException {
     // so no need to worry about off-heap refCount.
   }
 
-  private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch,
-      Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) {
+  private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch, 
long lastKey) {

Review comment:
       The compiler warning on this line and several others can be resolved by 
making `batch` a `List<AsyncEvent<?,?>>` here and the other places it's used. 
The lines requiring this change are 416, 421, 431-432, 438, 479 and 526.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -148,6 +149,7 @@ public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn
     TransactionId tx3 = new TXId(null, 3);
 
     GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, 
false);
+    GatewaySenderEventImpl eventNotInTransaction1 = 
createMockGatewaySenderEvent(8, null, false);

Review comment:
       To make the numbering a little more consistent, could this event's key 
be 2, with the key of each subsequent event increased by 1? That way the keys 
passed to `bucketRegionQueue.addToQueue()` will match the keys used here.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -159,17 +161,18 @@ public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn
         
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
 
     this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6);
-    this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(2), eventNotInTransaction1);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(3), event2);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(4), event3);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(5), event4);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(6), event5);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(7), event6);
+    this.bucketRegionQueue.addToQueue(Long.valueOf(8), event7);

Review comment:
       These calls to `Long.valueOf()` can be replaced with just `1L`, `2L` etc.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();

Review comment:
       Compiler warnings on this line can be fixed by using
   ```
   final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, 
custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation inv1 =

Review comment:
       The compiler warning here can be removed by using 
`AsyncInvocation<Void>`.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();

Review comment:
       Compiler warnings on this line can be fixed by using
   ```
   final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
   ```

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
##########
@@ -371,4 +379,107 @@ public void 
testPartitionedSerialPropagationWithParallelThreads() throws Excepti
     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000));
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000));
   }
+
+  @Test
+  public void 
testPartitionedSerialPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()

Review comment:
       All comments that apply to `ParallelWANPropagationDUnitTest` also apply 
to this test method, since the code in both tests is largely identical.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -1737,26 +1737,37 @@ public static GatewaySenderFactory 
configureGateway(DiskStoreFactory dsf, File[]
     return gateway;
   }
 
+  public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
+      Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
+      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
+    createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, 
isConflation, isPersistent,
+        filter, isManualStart, groupTransactionEvents, 0);
+  }
+
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
       GatewayEventFilter filter, boolean isManualStart) {
     createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, 
isConflation, isPersistent,
-        filter, isManualStart, false);
+        filter, isManualStart, false, 0);
   }
 
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
-      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
+      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents,
+      int dispatcherThreads) {

Review comment:
       Is it necessary to create a new overloaded method here? There is already 
a method `setNumDispatcherThreadsForTheRun()` in `WANTestBase` that allows this 
value to be set to whatever is desired.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1235,6 +1242,107 @@ public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenA
     vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
   }
 
+  @Test
+  public void 
testPartitionedParallelPropagationWithGroupTransactionEventsAndMixOfEventsInAndNotInTransactions()
+      throws Exception {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm6.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+    vm7.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, 
null, true, true, 2));
+
+    vm4.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm5.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm6.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+    vm7.invoke(
+        () -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 
2, 10,
+            isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+    vm3.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 8, 
isOffHeap()));
+
+    int customers = 4;
+
+    int transactionsPerCustomer = 1000;
+    final Map keyValuesInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < transactionsPerCustomer; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i, custIdObject);
+        ShipmentId shipmentId1 = new ShipmentId(i, orderId);
+        ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
+        ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
+        keyValuesInTransactions.put(orderId, new Order());
+        keyValuesInTransactions.put(shipmentId1, new Shipment());
+        keyValuesInTransactions.put(shipmentId2, new Shipment());
+        keyValuesInTransactions.put(shipmentId3, new Shipment());
+      }
+    }
+
+    int ordersPerCustomerNotInTransactions = 1000;
+
+    final Map keyValuesNotInTransactions = new HashMap();
+    for (int custId = 0; custId < customers; custId++) {
+      for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
+        CustId custIdObject = new CustId(custId);
+        OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, 
custIdObject);
+        keyValuesNotInTransactions.put(orderId, new Order());
+      }
+    }
+
+    // eventsPerTransaction is 1 (orders) + 3 (shipments)
+    int eventsPerTransaction = 4;
+    AsyncInvocation inv1 =
+        vm7.invokeAsync(
+            () -> 
WANTestBase.doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
+                eventsPerTransaction));
+
+    AsyncInvocation inv2 =

Review comment:
       The compiler warning here can be removed by using 
`AsyncInvocation<Void>`.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
##########
@@ -138,7 +139,7 @@ public void 
testBasicDestroyConflationEnabledAndValueNotInRegion() {
   }
 
   @Test
-  public void 
testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate()
+  public void 
testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions()
       throws ForceReattemptException {
     ParallelGatewaySenderEventProcessor processor =
         
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

Review comment:
       This variable is never used, so these lines could be just
   ```
   ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to