This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new dd485b2d11 GEODE-10417: Fix NullPointerException in WAN replication (#7845) dd485b2d11 is described below commit dd485b2d118f12adf7e1629748de1439fa3d2e0d Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Wed Sep 7 13:32:43 2022 +0200 GEODE-10417: Fix NullPointerException in WAN replication (#7845) * GEODE-10417: Fix NullPointerException in WAN replication When the WAN group-transaction-events feature is enabled in a parallel gateway sender, it is possible to get a NullPointerException when retrieving events from the queue to complete a transaction if the event in the queue is null. If this situation is reached then the gateway sender dispatcher will not dispatch queue events anymore and therefore the WAN replication will not progress. This happens because the predicates that check if elements in the queue contain a transactionId are not protected against the event being null. A null check has been added before the predicates are invoked so that in case of a null event, the predicate is not invoked and the event is skipped from the checking. 
* GEODE-10417: Change assertEquals to assertThat --- .../geode/internal/cache/BucketRegionQueue.java | 4 ++ .../internal/cache/BucketRegionQueueJUnitTest.java | 56 +++++++++++++++++++--- .../serial/SerialGatewaySenderQueueJUnitTest.java | 28 +++++------ 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 8fb572c236..f77422eef3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -484,6 +484,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { List<Object> elementsMatching = new ArrayList<>(); for (final Object key : eventSeqNumDeque) { Object object = optimalGet(key); + if (object == null) { + continue; + } + if (matchingPredicate.test(object)) { elementsMatching.add(object); eventSeqNumDeque.remove(key); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java index 6643695ea0..1dcbdef6c5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.cache; import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -175,22 +174,65 @@ public class BucketRegionQueueJUnitTest { List<Object> objects = 
bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); - assertEquals(2, objects.size()); - assertEquals(objects, Arrays.asList(event1, event3)); + assertThat(objects.size()).isEqualTo(2); + assertThat(objects).isEqualTo(Arrays.asList(event1, event3)); objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); - assertEquals(1, objects.size()); - assertEquals(objects, Arrays.asList(event7)); + assertThat(objects.size()).isEqualTo(1); + assertThat(objects).isEqualTo(Arrays.asList(event7)); hasTransactionIdPredicate = ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2); objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate); - assertEquals(2, objects.size()); - assertEquals(objects, Arrays.asList(event2, event4)); + assertThat(objects.size()).isEqualTo(2); + assertThat(objects).isEqualTo(Arrays.asList(event2, event4)); } + @Test + public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesObjectReadNullDoesNotThrowException() + throws ForceReattemptException { + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); + + TransactionId tx1 = new TXId(null, 1); + TransactionId tx2 = new TXId(null, 2); + TransactionId tx3 = new TXId(null, 3); + + GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false); + GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false); + GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false); + GatewaySenderEventImpl event3 = null; // createMockGatewaySenderEvent(4, tx1, true); + GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true); + GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false); + GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false); + GatewaySenderEventImpl event7 = 
createMockGatewaySenderEvent(8, tx1, true); + + this.bucketRegionQueue + .cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII); + + this.bucketRegionQueue.addToQueue(1L, event1); + this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1); + this.bucketRegionQueue.addToQueue(3L, event2); + this.bucketRegionQueue.addToQueue(4L, event3); + this.bucketRegionQueue.addToQueue(5L, event4); + this.bucketRegionQueue.addToQueue(6L, event5); + this.bucketRegionQueue.addToQueue(7L, event6); + this.bucketRegionQueue.addToQueue(8L, event7); + + Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate = + ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1); + Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate = + ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate(); + when(bucketRegionQueue.getValueInVMOrDiskWithoutFaultIn(4L)).thenReturn(null); + List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate, + isLastEventInTransactionPredicate); + + assertThat(objects.size()).isEqualTo(2); + assertThat(objects).isEqualTo(Arrays.asList(new Object[] {event1, event7})); + } + + @Test public void testPeekedElementsArePossibleDuplicate() throws Exception { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java index 88ec275fd8..46f390ac87 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java @@ -15,7 +15,6 @@ package org.apache.geode.internal.cache.wan.serial; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyBoolean; import static 
org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -111,9 +110,9 @@ public class SerialGatewaySenderQueueJUnitTest { queue.setGroupTransactionEvents(true); List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100); - assertEquals(4, peeked.size()); + assertThat(peeked.size()).isEqualTo(4); List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100); - assertEquals(3, peekedAfter.size()); + assertThat(peekedAfter.size()).isEqualTo(3); } @Test @@ -146,7 +145,7 @@ public class SerialGatewaySenderQueueJUnitTest { .when(queue).getElementsMatching(any(), any(), anyLong()); List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1); - assertEquals(4, peeked.size()); + assertThat(peeked.size()).isEqualTo(4); } @Test @@ -155,11 +154,11 @@ public class SerialGatewaySenderQueueJUnitTest { QUEUE_REGION, metaRegionFactory); List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100); - assertEquals(3, peeked.size()); + assertThat(peeked.size()).isEqualTo(3); List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100); - assertEquals(3, peekedAfter.size()); + assertThat(peekedAfter.size()).isEqualTo(3); peekedAfter = queue.peek(1, 100); - assertEquals(1, peekedAfter.size()); + assertThat(peekedAfter.size()).isEqualTo(1); } @Test @@ -192,7 +191,7 @@ public class SerialGatewaySenderQueueJUnitTest { .when(queue).getElementsMatching(any(), any(), anyLong()); List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1); - assertEquals(3, peeked.size()); + assertThat(peeked.size()).isEqualTo(3); } @Test @@ -216,24 +215,23 @@ public class SerialGatewaySenderQueueJUnitTest { QUEUE_REGION, metaRegionFactory); queue.setGroupTransactionEvents(true); List<AsyncEvent<?, ?>> peeked = queue.peek(3, -1); - assertEquals(4, peeked.size()); + assertThat(peeked.size()).isEqualTo(4); assertThat(queue.getLastPeekedId()).isEqualTo(2); - assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue(); - + assertThat(queue.getExtraPeekedIds()).contains(5L); for (Object ignored : peeked) { queue.remove(); } 
- assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue(); + assertThat(queue.getExtraPeekedIds()).contains(5L); peeked = queue.peek(3, -1); - assertEquals(3, peeked.size()); - assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue(); + assertThat(peeked.size()).isEqualTo(3); + assertThat(queue.getExtraPeekedIds()).contains(5L); for (Object ignored : peeked) { queue.remove(); } - assertThat(queue.getExtraPeekedIds().contains(5L)).isFalse(); + assertThat(queue.getExtraPeekedIds()).doesNotContain(5L); } private GatewaySenderEventImpl createMockGatewaySenderEventImpl(int transactionId,