DonalEvans commented on a change in pull request #6052:
URL: https://github.com/apache/geode/pull/6052#discussion_r590864147
##########
File path:
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
##########
@@ -348,7 +349,201 @@ public void
testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv
}
@Test
- public void testReplicatedSerialPropagationWithMultipleDispatchers() throws
Exception {
+ public void
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStarted()
+ throws InterruptedException {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ String regionName = testName + "_RR";
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null,
isOffHeap()));
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+
+ boolean groupTransactionEvents = true;
+ int batchSize = 10;
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+
+ int eventsPerTransaction = batchSize + 1;
+ // The number of entries must be big enough so that not all entries
+ // are replicated before the sender is stopped and also divisible by
eventsPerTransaction
+ int entries = 2200;
+ // Execute some transactions
+ AsyncInvocation<Void> inv1 =
+ asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+ // wait for batches to be distributed and then stop the sender
+ vm4.invoke(() -> await()
+ .until(() -> WANTestBase.getSenderStats("ln", -1).get(4) > 0));
+
+ // These exceptions are ignored here because it could happen that when an
event
+ // is to be handled, the sender is stopped. The sender, when stopped,
shuts down
+ // the thread pool that would handle the event and this could provoke the
exception.
+ addIgnoredException("Exception occurred in CacheListener");
+ addIgnoredException(RejectedExecutionException.class);
+
+ // Stop the sender
+ stopSenderInVMsAsync("ln", vm4, vm5);
+
+ // Wait for transactions to finish
+ inv1.await();
+ vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(regionName,
+ eventsPerTransaction);
+
+ // Start the sender
+ startSenderInVMsAsync("ln", vm4, vm5);
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+ eventsPerTransaction);
+ }
+
+ @Test
+ public void
testReplicatedSerialPropagationWithGroupTransactionEventsDoesNotSendBatchesWithIncompleteTransactionsIfGatewaySenderIsStoppedWhileReceivingTrafficAndLaterStartedReceiverStopped()
+ throws InterruptedException {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ String regionName = testName + "_RR";
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null,
isOffHeap()));
+ vm2.invoke(WANTestBase::stopReceivers);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln",
isOffHeap()));
+
+ boolean groupTransactionEvents = true;
+ int batchSize = 10;
+ vm4.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+ vm5.invoke(
+ () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
true, null, false,
+ groupTransactionEvents));
+
+ int eventsPerTransaction = batchSize + 1;
+ // The number of entries must be big enough so that not all entries
+ // are replicated before the sender is stopped and also divisible by
eventsPerTransaction
+ int entries = 2200;
+ // Execute some transactions
+ AsyncInvocation<Void> inv1 =
+ asyncExecuteTransactions(regionName, eventsPerTransaction, entries);
+
+ // wait for batches to be redistributed and then stop the sender
+ vm4.invoke(() -> await()
+ .until(() -> WANTestBase.getSenderStats("ln", -1).get(5) > 0));
+
+ // Stop the sender
+ stopSenderInVMsAsync("ln", vm4, vm5);
+
+ // Wait for transactions to finish
+ inv1.await();
+ vm4.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(regionName, entries));
+
+ // Start the receiver and the sender
+ vm2.invoke(WANTestBase::startReceivers);
+ startSenderInVMsAsync("ln", vm4, vm5);
+
+ // Check
+ checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(regionName,
+ eventsPerTransaction);
+ }
+
+ private void
checkOnlyCompleteTransactionsAreReplicatedAfterSenderStopped(String regionName,
+ int eventsPerTransaction) {
+ waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+ List<Integer> v4List =
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+ List<Integer> v5List =
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1));
+
+ // batches with incomplete transactions must be 0
+ assertEquals(0, (int) v4List.get(13));
+ assertEquals(0, (int) v5List.get(13));
+
+ int batchesDistributed = v4List.get(4) + v5List.get(4);
+ checkOnlyCompleteTransactionsAreReplicated(regionName,
eventsPerTransaction,
+ batchesDistributed);
+ }
+
+ private void
checkOnlyCompleteTransactionsAreReplicatedAfterSenderRestarted(String
regionName,
+ int eventsPerTransaction) {
+ // Wait for sender queues to be empty
+ List<Integer> v4List =
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ List<Integer> v5List =
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0));
+
+ // batches with incomplete transactions must be 0
+ assertEquals(0, (int) v4List.get(13));
+ assertEquals(0, (int) v5List.get(13));
+
+ waitForBatchesToBeAppliedInTheReceiver(regionName, eventsPerTransaction);
+
+ int batchesDistributed = v4List.get(4) + v5List.get(4);
+ checkOnlyCompleteTransactionsAreReplicated(regionName,
eventsPerTransaction,
+ batchesDistributed);
+ }
+
+ private void checkOnlyCompleteTransactionsAreReplicated(String regionName,
+ int eventsPerTransaction, int batchesDistributed) {
+ int regionSize = vm2.invoke(() -> getRegionSize(regionName));
+
+ // The number of entries must be divisible by the number of events per
transaction
+ assertEquals(0, regionSize % eventsPerTransaction);
+
+ // Check the entries replicated against the number of batches distributed
+ vm2.invoke(() -> WANTestBase.validateRegionSize(regionName,
+ batchesDistributed * eventsPerTransaction));
+ }
+
+ private AsyncInvocation<Void> asyncExecuteTransactions(String regionName,
+ int eventsPerTransaction, int entries) {
+ final Map<Object, Object> keyValues = new LinkedHashMap<>();
+ for (int i = 0; i < entries; i++) {
+ keyValues.put(i, i + "_Value");
+ }
+
+ return vm4.invokeAsync(
+ () -> WANTestBase.doPutsInsideTransactions(regionName, keyValues,
+ eventsPerTransaction));
+ }
+
+ private void waitForBatchesToBeAppliedInTheReceiver(String regionName, int
eventsPerTransaction) {
+ int batchesSentTotal = vm4.invoke(() -> WANTestBase.getSenderStats("ln",
-1)).get(4) +
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", -1)).get(4);
+
+ // Wait for all batches to be received by the sender
Review comment:
I think that this should probably be "received by the receiver"
##########
File path:
geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImplTest.java
##########
@@ -88,5 +97,36 @@ public void testStartWithCleanQueue() {
assertTrue(((ConcurrentParallelGatewaySenderQueue)
queue).getCleanQueues());
}
+ @Test
+ public void
whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() {
+ gatewaysender.start();
+
+ long start = System.currentTimeMillis();
+
+ Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime);
+ Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime);
+ t1.start();
+ t2.start();
+ try {
+ t1.join();
+ t2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long finish = System.currentTimeMillis();
+ long timeElapsed = finish - start;
+ // Each call to preStop waits for 1 second but they are not serialized
Review comment:
I think this comment would be clearer if it read "Each call to preStop
waits for 1 second but these waits execute in parallel"
##########
File path:
geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImplTest.java
##########
@@ -102,6 +102,8 @@ private SerialGatewaySenderImpl
createSerialGatewaySenderImplSpy() {
doReturn(null).when(spySerialGatewaySender).getQueues();
+ doReturn(true).when(spySerialGatewaySender).mustGroupTransactionEvents();
Review comment:
The value returned here should only be `true` in tests that specifically
want to test the behaviour of group transaction events, since the default value
is `false`. Maybe the desired value could be passed as an argument to this
`createSerialGatewaySenderImplSpy()` method
##########
File path:
geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImplTest.java
##########
@@ -51,11 +54,17 @@ public void setUp() {
attrs = new GatewaySenderAttributes();
attrs.isParallel = true;
attrs.id = "sender";
+ attrs.groupTransactionEvents = true;
Review comment:
I think it would be best not to set this value in the `setUp()` method
for this class, as it's not a default value, and could lead to unexpected
behaviour if someone else tries to add tests to this class in the future. Only
the test cases that specifically rely on `groupTransactionEvents` being `true`
should have it set.
##########
File path:
geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImplTest.java
##########
@@ -132,4 +134,37 @@ public void whenStoppedShouldResetTheEventProcessor() {
assertThat(serialGatewaySender.getEventProcessor()).isNull();
}
+ @Test
+ public void
whenStoppedTwiceCloseInTimeWithGroupTransactionEventsPreStopWaitsTwice() {
+ serialGatewaySender = createSerialGatewaySenderImplSpy();
+
+ long start = System.currentTimeMillis();
+
+ Thread t1 = new Thread(this::stopGatewaySenderAndCheckTime);
+ Thread t2 = new Thread(this::stopGatewaySenderAndCheckTime);
+ t1.start();
+ t2.start();
+ try {
+ t1.join();
+ t2.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ long finish = System.currentTimeMillis();
+ long timeElapsed = finish - start;
+
+ // Each call to preStop waits for 1 second but they are not serialized
Review comment:
I think this comment would be clearer if it read "Each call to preStop
waits for 1 second but these waits execute in parallel"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]