DonalEvans commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r591745355
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,201 @@ public void
testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
}
@Test
- public void testReplicatedSerialPropagationWithMultipleDispatchers() throws
Exception {
+ public void
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+ throws InterruptedException {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ String regionName = testName + "_RR";
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null,
isOffHeap()));
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+
+ boolean groupTransactionEvents = true;
+ int batchSize = 10;
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+
+ int eventsPerTransaction = batchSize + 1;
+ // The number of entries must be big enough so that not all entries
+ // are replicated before the sender is stopped and also divisible by
eventsPerTransaction
+ int entries = 2200;
+ // Execute some transactions
+ AsyncInvocation<Void> inv1 =
+ asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+ // wait for batches to be distributed and then stop the sender
+ vm4.invoke(() -> await()
+ .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+ // These exceptions are ignored here because it could happen that when an
event
+ // is to be handled, the sender is stopped. The sender, when stopped,
shuts down
+ // the thread pool that would handle the event and this could provoke the
exception.
+ addIgnoredException("Exception occurred in CacheListener");
+ addIgnoredException(RejectedExecutionException.class);
+
+ // Stop the sender
+ stopSenderInVMsAsync("ln", vm4, vm5);
+
+ // Wait for transactions to finish
+ inv1.await();
+ vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName,
+ eventsPerTransaction);
+
+ // Start the sender
+ startSenderInVMsAsync("ln", vm4, vm5);
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+ eventsPerTransaction);
+ }
+
+ @Test
+ public void
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
+ throws InterruptedException {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ String regionName = testName + "_RR";
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null,
isOffHeap()));
+ vm2.invoke(WANTestBase::stopReceivers);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+
+ boolean groupTransactionEvents = true;
+ int batchSize = 10;
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+
+ int eventsPerTransaction = batchSize + 1;
+ // The number of entries must be big enough so that not all entries
+ // are replicated before the sender is stopped and also divisible by
eventsPerTransaction
+ int entries = 2200;
+ // Execute some transactions
+ AsyncInvocation<Void> inv1 =
+ asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+ // wait for batches to be redistributed and then stop the sender
+ vm4.invoke(() -> await()
+ .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
+
+ // Stop the sender
+ stopSenderInVMsAsync("ln", vm4, vm5);
+
+ // Wait for transactions to finish
+ inv1.await();
+ vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+ // Start the receiver and the sender
+ vm2.invoke(WANTestBase::startReceivers);
+ startSenderInVMsAsync("ln", vm4, vm5);
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+ eventsPerTransaction);
+ }
+
+ private void
checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName,
+ int eventsPerTransaction) {
+ waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+ List<Integer> v4List =
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v5List =
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+ // batches with incomplete transactions must be 0
+ assertEquals(0, (int) v4List.get(13));
+ assertEquals(0, (int) v5List.get(13));
+
+ int batchesDistributed = v4List.get(4) + v5List.get(4);
+ checkOnlyCompleteTransactionsAreReplicated(regionName,
eventsPerTransaction,
+ batchesDistributed);
+ }
+
+ private void
checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String
regionName,
+ int eventsPerTransaction) {
+ // Wait for sender queues to be empty
+ List<Integer> v4List =
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List =
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0));
+
+ // batches with incomplete transactions must be 0
+ assertEquals(0, (int) v4List.get(13));
+ assertEquals(0, (int) v5List.get(13));
+
+ waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+ int batchesDistributed = v4List.get(4) + v5List.get(4);
+ checkOnlyCompleteTransactionsAreReplicated(regionName,
eventsPerTransaction,
+ batchesDistributed);
+ }
+
+ private void checkOnlyCompleteTransactionsAreReplicated(String regionName,
+ int eventsPerTransaction, int batchesDistributed) {
+ int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+
+ // The number of entries must be divisible by the number of events per
transaction
+ assertEquals(0, regionSize % eventsPerTransaction);
+
+ // Check the entries replicated against the number of batches distributed
+ vm2.invoke(() -> WANTestBase.validateRegionSize(regionName,
+ batchesDistributed * eventsPerTransaction));
+ }
+
+ private AsyncInvocation<Void> asyncExecuteTransactions(String regionName,
+ int eventsPerTransaction, int entries) {
+ final Map<Object, Object> keyValues = new LinkedHashMap<>();
+ for (int i = 0; i < entries; i++) {
+ keyValues.put(i, i + "_Value");
+ }
+
+ return vm4.invokeAsync(
+ () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+ eventsPerTransaction));
+ }
+
+ private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int
eventsPerTransaction) {
+ int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln",
-1)).get(4) +
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4);
+
+ // Wait for all batches to be received by the sender
Review comment:
It looks like this got missed in the last set of changed, but other than
that, everything looks good.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]