DonalEvans commented on a change in pull request #5476:
URL: https://github.com/apache/geode/pull/5476#discussion_r477509049



##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3197,6 +3114,18 @@ public static Boolean killSender(String senderId) {
     }
   }
 
+  private static AbstractGatewaySender getAbstractGatewaySender(String 
senderId) {

Review comment:
       This method almost entirely duplicates the existing `getGatewaySender()` 
method, so it can be removed and calls to it replaced with 
`(AbstractGatewaySender) getGatewaySender(senderId)`

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void 
testParallelPropagationWithFilter_AfterAck() throws Exception {
 
   }
 
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+
+    vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * No redundancy in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));

Review comment:
       These casts to `Integer` are redundant and can be removed.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -190,6 +190,16 @@ protected SerializableRunnableIF 
createPartitionedRegionRedundancy1Runnable() {
         isOffHeap());
   }
 
+  protected SerializableRunnableIF 
createPartitionedRegionRedundancy1RunnableNoSenders() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 1, 100,
+        isOffHeap());
+  }
+
+  protected SerializableRunnableIF 
createPartitionedRegionRedundancy0RunnableNoSenders() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", null, 0, 100,
+        isOffHeap());
+  }
+

Review comment:
       These methods are very similar. Would it be possible to replace them 
with one method which takes an `int` argument for the desired redundancy level?

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void 
testParallelPropagationWithFilter_AfterAck() throws Exception {
 
   }
 
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));

Review comment:
       These casts to `Integer` are redundant and can be removed.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String 
senderId) {
   }
 
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: 
" + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, 
ags.getTmpDroppedEventSize()));
   }
 
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();

Review comment:
       This should be `Set<RegionQueue>` to avoid compiler warnings.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String 
senderId) {
   }
 
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: 
" + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, 
ags.getTmpDroppedEventSize()));
   }
 
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();
+    for (Object queue : queues) {
+      PartitionedRegion region =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) 
queue).getRegion();
+      int buckets = region.getTotalNumberOfBuckets();
+      for (int bucket = 0; bucket < buckets; bucket++) {
+        BlockingQueue newQueue =
+            ((ConcurrentParallelGatewaySenderQueue) 
queue).getBucketTmpQueue((int) bucket);

Review comment:
       The cast to `int` here is unnecessary.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String 
senderId) {
   }
 
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: 
" + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, 
ags.getTmpDroppedEventSize()));
   }
 
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set queues = ((AbstractGatewaySender) sender).getQueues();
+    for (Object queue : queues) {
+      PartitionedRegion region =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) 
queue).getRegion();
+      int buckets = region.getTotalNumberOfBuckets();
+      for (int bucket = 0; bucket < buckets; bucket++) {
+        BlockingQueue newQueue =

Review comment:
       This should be `BlockingQueue<GatewaySenderEventImpl>` to avoid compiler 
warnings.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void 
testParallelPropagationWithFilter_AfterAck() throws Exception {
 
   }
 
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+
+    vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+    vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * No redundancy in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+      throws Exception {

Review comment:
       No exception is thrown from this method, so this can be removed.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void 
testParallelPropagationWithFilter_AfterAck() throws Exception {
 
   }
 
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+      throws Exception {

Review comment:
       No exception is thrown from this method, so this can be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to