DonalEvans commented on a change in pull request #5476:
URL: https://github.com/apache/geode/pull/5476#discussion_r477509049
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3197,6 +3114,18 @@ public static Boolean killSender(String senderId) {
}
}
+ private static AbstractGatewaySender getAbstractGatewaySender(String
senderId) {
Review comment:
This method almost entirely duplicates the existing `getGatewaySender()`
method, so it can be removed and calls to it replaced with
`(AbstractGatewaySender) getGatewaySender(senderId)`
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void
testParallelPropagationWithFilter_AfterAck() throws Exception {
}
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * Redundancy = 1 in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+ throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+
+ vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 3));
+ vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+
+ vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ }
+
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * No redundancy in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+ throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
Review comment:
These casts to `Integer` are redundant and can be removed.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -190,6 +190,16 @@ protected SerializableRunnableIF
createPartitionedRegionRedundancy1Runnable() {
isOffHeap());
}
+ protected SerializableRunnableIF
createPartitionedRegionRedundancy1RunnableNoSenders() {
+ return () -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", null, 1, 100,
+ isOffHeap());
+ }
+
+ protected SerializableRunnableIF
createPartitionedRegionRedundancy0RunnableNoSenders() {
+ return () -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", null, 0, 100,
+ isOffHeap());
+ }
+
Review comment:
These methods are very similar. Would it be possible to replace them
with one method which takes an `int` argument for the desired redundancy level?
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void
testParallelPropagationWithFilter_AfterAck() throws Exception {
}
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * Redundancy = 1 in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+ throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
Review comment:
These casts to `Integer` are redundant and can be removed.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String
senderId) {
}
public static void verifyTmpDroppedEventSize(String senderId, int size) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySender ags = (AbstractGatewaySender) sender;
await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size:
" + size
+ " but actual size: " + ags.getTmpDroppedEventSize(), size,
ags.getTmpDroppedEventSize()));
}
- public static void verifyQueueSize(String senderId, int size) {
+ /**
+ * Checks that the bucketToTempQueueMap for a partitioned region
+ * that holds events for buckets that are not available locally, is empty.
+ */
+ public static void validateEmptyBucketToTempQueueMap(String senderId) {
+ GatewaySender sender = getGatewaySender(senderId);
+
+ int size = 0;
+ Set queues = ((AbstractGatewaySender) sender).getQueues();
Review comment:
This should be `Set<RegionQueue>` to avoid compiler warnings.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String
senderId) {
}
public static void verifyTmpDroppedEventSize(String senderId, int size) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySender ags = (AbstractGatewaySender) sender;
await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size:
" + size
+ " but actual size: " + ags.getTmpDroppedEventSize(), size,
ags.getTmpDroppedEventSize()));
}
- public static void verifyQueueSize(String senderId, int size) {
+ /**
+ * Checks that the bucketToTempQueueMap for a partitioned region
+ * that holds events for buckets that are not available locally, is empty.
+ */
+ public static void validateEmptyBucketToTempQueueMap(String senderId) {
+ GatewaySender sender = getGatewaySender(senderId);
+
+ int size = 0;
+ Set queues = ((AbstractGatewaySender) sender).getQueues();
+ for (Object queue : queues) {
+ PartitionedRegion region =
+ (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue)
queue).getRegion();
+ int buckets = region.getTotalNumberOfBuckets();
+ for (int bucket = 0; bucket < buckets; bucket++) {
+ BlockingQueue newQueue =
+ ((ConcurrentParallelGatewaySenderQueue)
queue).getBucketTmpQueue((int) bucket);
Review comment:
The cast to `int` here is unnecessary.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -3599,21 +3444,42 @@ public static Long getNumberOfEntriesInVM(final String
senderId) {
}
public static void verifyTmpDroppedEventSize(String senderId, int size) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySender ags = (AbstractGatewaySender) sender;
await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size:
" + size
+ " but actual size: " + ags.getTmpDroppedEventSize(), size,
ags.getTmpDroppedEventSize()));
}
- public static void verifyQueueSize(String senderId, int size) {
+ /**
+ * Checks that the bucketToTempQueueMap for a partitioned region
+ * that holds events for buckets that are not available locally, is empty.
+ */
+ public static void validateEmptyBucketToTempQueueMap(String senderId) {
+ GatewaySender sender = getGatewaySender(senderId);
+
+ int size = 0;
+ Set queues = ((AbstractGatewaySender) sender).getQueues();
+ for (Object queue : queues) {
+ PartitionedRegion region =
+ (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue)
queue).getRegion();
+ int buckets = region.getTotalNumberOfBuckets();
+ for (int bucket = 0; bucket < buckets; bucket++) {
+ BlockingQueue newQueue =
Review comment:
This should be `BlockingQueue<GatewaySenderEventImpl>` to avoid compiler
warnings.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void
testParallelPropagationWithFilter_AfterAck() throws Exception {
}
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * Redundancy = 1 in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+ throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+
+ vm4.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm5.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm6.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+ vm7.invoke(createPartitionedRegionRedundancy1RunnableNoSenders());
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 3));
+ vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+
+ vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ }
+
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * No redundancy in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
+ throws Exception {
Review comment:
No exception is thrown from this method, so this can be removed.
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
##########
@@ -1131,6 +1141,111 @@ public void
testParallelPropagationWithFilter_AfterAck() throws Exception {
}
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * Redundancy = 1 in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
+ throws Exception {
Review comment:
No exception is thrown from this method, so this can be removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]