This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new dd485b2d11 GEODE-10417: Fix NullPointerException in WAN replication 
(#7845)
dd485b2d11 is described below

commit dd485b2d118f12adf7e1629748de1439fa3d2e0d
Author: Alberto Gomez <alberto.go...@est.tech>
AuthorDate: Wed Sep 7 13:32:43 2022 +0200

    GEODE-10417: Fix NullPointerException in WAN replication (#7845)
    
    * GEODE-10417: Fix NullPointerException in WAN replication
    
    When the WAN group-transaction-events feature is enabled in
    a parallel gateway sender, it is possible to get a NullPointerException
    when retrieving events from the queue to complete a transaction if
    the event in the queue is null.
    
    If this situation is reached then the gateway sender dispatcher will
    not dispatch queue events anymore and therefore the WAN replication
    will not progress.
    
    This happens because the predicates that check if elements
    in the queue contain a transactionId are not protected
    against the event being null.
    
    A null check has been added before the predicates are invoked
    so that in case of a null event, the predicate is not invoked
    and the event is skipped from the checking.
    
    * GEODE-10417: Change assertEquals to assertThat
---
 .../geode/internal/cache/BucketRegionQueue.java    |  4 ++
 .../internal/cache/BucketRegionQueueJUnitTest.java | 56 +++++++++++++++++++---
 .../serial/SerialGatewaySenderQueueJUnitTest.java  | 28 +++++------
 3 files changed, 66 insertions(+), 22 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 8fb572c236..f77422eef3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -484,6 +484,10 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       List<Object> elementsMatching = new ArrayList<>();
       for (final Object key : eventSeqNumDeque) {
         Object object = optimalGet(key);
+        if (object == null) {
+          continue;
+        }
+
         if (matchingPredicate.test(object)) {
           elementsMatching.add(object);
           eventSeqNumDeque.remove(key);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
index 6643695ea0..1dcbdef6c5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.cache;
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static 
org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -175,22 +174,65 @@ public class BucketRegionQueueJUnitTest {
     List<Object> objects = 
bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
 
-    assertEquals(2, objects.size());
-    assertEquals(objects, Arrays.asList(event1, event3));
+    assertThat(objects.size()).isEqualTo(2);
+    assertThat(objects).isEqualTo(Arrays.asList(event1, event3));
 
     objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
-    assertEquals(1, objects.size());
-    assertEquals(objects, Arrays.asList(event7));
+    assertThat(objects.size()).isEqualTo(1);
+    assertThat(objects).isEqualTo(Arrays.asList(event7));
 
     hasTransactionIdPredicate =
         ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
     objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
         isLastEventInTransactionPredicate);
-    assertEquals(2, objects.size());
-    assertEquals(objects, Arrays.asList(event2, event4));
+    assertThat(objects.size()).isEqualTo(2);
+    assertThat(objects).isEqualTo(Arrays.asList(event2, event4));
   }
 
+  @Test
+  public void 
testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesObjectReadNullDoesNotThrowException()
+      throws ForceReattemptException {
+    
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
+
+    TransactionId tx1 = new TXId(null, 1);
+    TransactionId tx2 = new TXId(null, 2);
+    TransactionId tx3 = new TXId(null, 3);
+
+    GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, 
false);
+    GatewaySenderEventImpl eventNotInTransaction1 = 
createMockGatewaySenderEvent(2, null, false);
+    GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, 
false);
+    GatewaySenderEventImpl event3 = null; // createMockGatewaySenderEvent(4, 
tx1, true);
+    GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
+    GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, 
false);
+    GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, 
false);
+    GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);
+
+    this.bucketRegionQueue
+        
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
+
+    this.bucketRegionQueue.addToQueue(1L, event1);
+    this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
+    this.bucketRegionQueue.addToQueue(3L, event2);
+    this.bucketRegionQueue.addToQueue(4L, event3);
+    this.bucketRegionQueue.addToQueue(5L, event4);
+    this.bucketRegionQueue.addToQueue(6L, event5);
+    this.bucketRegionQueue.addToQueue(7L, event6);
+    this.bucketRegionQueue.addToQueue(8L, event7);
+
+    Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
+        ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
+    Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
+        ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
+    
when(bucketRegionQueue.getValueInVMOrDiskWithoutFaultIn(4L)).thenReturn(null);
+    List<Object> objects = 
this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
+        isLastEventInTransactionPredicate);
+
+    assertThat(objects.size()).isEqualTo(2);
+    assertThat(objects).isEqualTo(Arrays.asList(new Object[] {event1, 
event7}));
+  }
+
+
   @Test
   public void testPeekedElementsArePossibleDuplicate()
       throws Exception {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
index 88ec275fd8..46f390ac87 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.cache.wan.serial;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -111,9 +110,9 @@ public class SerialGatewaySenderQueueJUnitTest {
     queue.setGroupTransactionEvents(true);
 
     List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
-    assertEquals(4, peeked.size());
+    assertThat(peeked.size()).isEqualTo(4);
     List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
-    assertEquals(3, peekedAfter.size());
+    assertThat(peekedAfter.size()).isEqualTo(3);
   }
 
   @Test
@@ -146,7 +145,7 @@ public class SerialGatewaySenderQueueJUnitTest {
             .when(queue).getElementsMatching(any(), any(), anyLong());
 
     List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
-    assertEquals(4, peeked.size());
+    assertThat(peeked.size()).isEqualTo(4);
   }
 
   @Test
@@ -155,11 +154,11 @@ public class SerialGatewaySenderQueueJUnitTest {
         QUEUE_REGION, metaRegionFactory);
 
     List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
-    assertEquals(3, peeked.size());
+    assertThat(peeked.size()).isEqualTo(3);
     List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
-    assertEquals(3, peekedAfter.size());
+    assertThat(peekedAfter.size()).isEqualTo(3);
     peekedAfter = queue.peek(1, 100);
-    assertEquals(1, peekedAfter.size());
+    assertThat(peekedAfter.size()).isEqualTo(1);
   }
 
   @Test
@@ -192,7 +191,7 @@ public class SerialGatewaySenderQueueJUnitTest {
             .when(queue).getElementsMatching(any(), any(), anyLong());
 
     List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
-    assertEquals(3, peeked.size());
+    assertThat(peeked.size()).isEqualTo(3);
   }
 
   @Test
@@ -216,24 +215,23 @@ public class SerialGatewaySenderQueueJUnitTest {
         QUEUE_REGION, metaRegionFactory);
     queue.setGroupTransactionEvents(true);
     List<AsyncEvent<?, ?>> peeked = queue.peek(3, -1);
-    assertEquals(4, peeked.size());
+    assertThat(peeked.size()).isEqualTo(4);
     assertThat(queue.getLastPeekedId()).isEqualTo(2);
-    assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
-
+    assertThat(queue.getExtraPeekedIds()).contains(5L);
 
     for (Object ignored : peeked) {
       queue.remove();
     }
-    assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
+    assertThat(queue.getExtraPeekedIds()).contains(5L);
 
     peeked = queue.peek(3, -1);
-    assertEquals(3, peeked.size());
-    assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
+    assertThat(peeked.size()).isEqualTo(3);
+    assertThat(queue.getExtraPeekedIds()).contains(5L);
 
     for (Object ignored : peeked) {
       queue.remove();
     }
-    assertThat(queue.getExtraPeekedIds().contains(5L)).isFalse();
+    assertThat(queue.getExtraPeekedIds()).doesNotContain(5L);
   }
 
   private GatewaySenderEventImpl createMockGatewaySenderEventImpl(int 
transactionId,

Reply via email to