This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new bbe9a3a  GEODE-9853: get all members hosting bucket (#7144)
bbe9a3a is described below

commit bbe9a3acf2f0812ef733dfe74f07fb9412c886e3
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Thu Jan 27 11:23:29 2022 +0100

    GEODE-9853: get all members hosting bucket (#7144)
    
    * GEODE-9853: get all members hosting bucket
---
 .../apache/geode/internal/cache/BucketAdvisor.java |   2 +-
 ...currentParallelGatewaySenderEventProcessor.java |   2 +-
 .../wan/parallel/ParallelGatewaySenderQueue.java   |  74 +++++++++----
 .../wan/parallel/ParallelQueueRemovalMessage.java  |   8 +-
 .../geode/internal/cache/BucketAdvisorTest.java    |  89 ++++++++++++++++
 .../ParallelQueueRemovalMessageJUnitTest.java      |   8 +-
 .../geode/internal/cache/wan/WANTestBase.java      | 114 ++++++++++----------
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 116 +++++++++++++++++++++
 8 files changed, 328 insertions(+), 85 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 29287c9..2b70f86 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -1768,7 +1768,7 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     return redundancyTracker.getCurrentRedundancy();
   }
 
-  Set<InternalDistributedMember> adviseInitialized() {
+  public Set<InternalDistributedMember> adviseInitialized() {
     return adviseFilter(profile -> {
       assert profile instanceof BucketProfile;
       BucketProfile bucketProfile = (BucketProfile) profile;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index bcd70b7..84205a0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -167,7 +167,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
     ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) 
cpgsq.getQueueByBucket(bucketId);
     boolean isPrimary = 
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
     if (isPrimary) {
-      pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+      pgsq.sendQueueRemovalMessageForDroppedEvent(prQ, bucketId, shadowKey);
       sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
       if (logger.isDebugEnabled()) {
         logger.debug("register dropped event for primary queue. BucketId is " 
+ bucketId
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 235947e..54715b7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -105,7 +105,8 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR;
 
   // <PartitionedRegion, Map<Integer, List<Object>>>
-  private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
+  private final Map<String, Map<Integer, List<Object>>> 
regionToDispatchedKeysMap =
+      new ConcurrentHashMap<String, Map<Integer, List<Object>>>();
 
   protected final StoppableReentrantLock buckToDispatchLock;
   private final StoppableCondition regionToDispatchedKeysMapEmpty;
@@ -1172,9 +1173,10 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       lock.lock();
       boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
       try {
-        Map bucketIdToDispatchedKeys = (Map) 
regionToDispatchedKeysMap.get(prQ.getFullPath());
+        Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+            regionToDispatchedKeysMap.get(prQ.getFullPath());
         if (bucketIdToDispatchedKeys == null) {
-          bucketIdToDispatchedKeys = new ConcurrentHashMap();
+          bucketIdToDispatchedKeys = new ConcurrentHashMap<Integer, 
List<Object>>();
           regionToDispatchedKeysMap.put(prQ.getFullPath(), 
bucketIdToDispatchedKeys);
         }
         addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
@@ -1187,23 +1189,26 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     }
   }
 
-  public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, 
int bucketId,
+  public void sendQueueRemovalMessageForDroppedEvent(PartitionedRegion prQ, 
int bucketId,
       Object key) {
-    final HashMap<String, Map<Integer, List>> temp = new HashMap<>();
-    Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
-    temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
-    addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
+
     Set<InternalDistributedMember> recipients =
-        removalThread.getAllRecipients(sender.getCache(), temp);
+        removalThread.getAllRecipientsForEvent(sender.getCache(), 
prQ.getFullPath(), bucketId);
+
     if (!recipients.isEmpty()) {
+      final Map<String, Map<Integer, List<Object>>> temp = new HashMap<>();
+      Map<Integer, List<Object>> bucketIdToDispatchedKeys = new 
ConcurrentHashMap<>();
+      temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+      addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
       ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
       pqrm.setRecipients(recipients);
       
sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm);
     }
   }
 
-  private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int 
bucketId, Object key) {
-    List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
+  private void addRemovedEventToMap(Map<Integer, List<Object>> 
bucketIdToDispatchedKeys,
+      int bucketId, Object key) {
+    List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
     if (dispatchedKeys == null) {
       dispatchedKeys = new ArrayList<>();
       bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
@@ -1215,9 +1220,10 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     buckToDispatchLock.lock();
     boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
     try {
-      Map bucketIdToDispatchedKeys = (Map) 
regionToDispatchedKeysMap.get(prQ.getFullPath());
+      Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+          regionToDispatchedKeysMap.get(prQ.getFullPath());
       if (bucketIdToDispatchedKeys == null) {
-        bucketIdToDispatchedKeys = new ConcurrentHashMap();
+        bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
         regionToDispatchedKeysMap.put(prQ.getFullPath(), 
bucketIdToDispatchedKeys);
       }
       addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys);
@@ -1233,9 +1239,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     buckToDispatchLock.lock();
     boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
     try {
-      Map bucketIdToDispatchedKeys = (Map) 
regionToDispatchedKeysMap.get(prQPath);
+      Map<Integer, List<Object>> bucketIdToDispatchedKeys = 
regionToDispatchedKeysMap.get(prQPath);
       if (bucketIdToDispatchedKeys == null) {
-        bucketIdToDispatchedKeys = new ConcurrentHashMap();
+        bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
         regionToDispatchedKeysMap.put(prQPath, bucketIdToDispatchedKeys);
       }
       addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys);
@@ -1247,8 +1253,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
     }
   }
 
-  private void addRemovedEventsToMap(Map bucketIdToDispatchedKeys, int 
bucketId, List keys) {
-    List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
+  private void addRemovedEventsToMap(Map<Integer, List<Object>> 
bucketIdToDispatchedKeys,
+      int bucketId, List<Object> keys) {
+    List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
     if (dispatchedKeys == null) {
       dispatchedKeys = keys == null ? new ArrayList<>() : keys;
     } else {
@@ -1886,7 +1893,7 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
               }
             }
 
-            final HashMap<String, Map<Integer, List>> temp;
+            final Map<String, Map<Integer, List<Object>>> temp;
             buckToDispatchLock.lock();
             try {
               boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
@@ -1897,7 +1904,9 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
                 continue;
               }
               // TODO: This should be optimized.
+
               temp = new HashMap<>(regionToDispatchedKeysMap);
+
               regionToDispatchedKeysMap.clear();
             } finally {
               buckToDispatchLock.unlock();
@@ -1966,6 +1975,35 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
       return recipients;
     }
 
+    private Set<InternalDistributedMember> 
getAllRecipientsForEvent(InternalCache cache,
+        String partitionedRegionName, int bucketId) {
+      Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>();
+      PartitionedRegion partitionedRegion =
+          (PartitionedRegion) cache.getRegion(partitionedRegionName);
+      if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != 
null) {
+        final String bucketFullPath =
+            SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR
+                + partitionedRegion.getBucketName(bucketId);
+        AbstractBucketRegionQueue bucketRegionQueue =
+            (AbstractBucketRegionQueue) 
cache.getInternalRegionByPath(bucketFullPath);
+        if (bucketRegionQueue != null && bucketRegionQueue.getBucketAdvisor() 
!= null) {
+          Set<InternalDistributedMember> bucketMembers =
+              bucketRegionQueue.getBucketAdvisor().adviseInitialized();
+          if (!bucketMembers.isEmpty()) {
+            recipients.addAll(bucketMembers);
+          } else {
+            
recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+          }
+        } else {
+          
recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+        }
+      }
+
+      return recipients;
+    }
+
+
+
     /**
      * shutdown this thread and the caller thread will join this thread
      */
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index cc801f2..10951e8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -20,7 +20,6 @@ import static 
org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BE
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -59,12 +58,13 @@ public class ParallelQueueRemovalMessage extends 
PooledDistributionMessage {
 
   private static final Logger logger = LogService.getLogger();
 
-  private HashMap regionToDispatchedKeysMap;
+  private Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap;
 
   public ParallelQueueRemovalMessage() {}
 
-  public ParallelQueueRemovalMessage(HashMap rgnToDispatchedKeysMap) {
-    regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
+  public ParallelQueueRemovalMessage(
+      Map<String, Map<Integer, List<Object>>> rgnToDispatchedKeysMap) {
+    this.regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
   }
 
   @Override
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 1c8f1e1..817386c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -228,4 +228,93 @@ public class BucketAdvisorTest {
     // Unknown Shadow Bucket
     assertThat(bucketAdvisor.isShadowBucketDestroyed(SEPARATOR + 
"b5")).isFalse();
   }
+
+  @Test
+  public void 
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
+
+    Bucket bucket = mock(Bucket.class);
+    when(bucket.isHosting()).thenReturn(true);
+    when(bucket.isPrimary()).thenReturn(false);
+    when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new 
PartitionAttributesImpl());
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+    BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, 
regionAdvisor);
+    bucketAdvisor.setInitialized();
+
+    assertThat(bucketAdvisor.adviseInitialized().isEmpty()).isTrue();
+  }
+
+  @Test
+  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
 {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
+
+    when(distributionManager.getId()).thenReturn(memberId);
+
+    Bucket bucket = mock(Bucket.class);
+    when(bucket.isHosting()).thenReturn(true);
+    when(bucket.isPrimary()).thenReturn(false);
+    when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new 
PartitionAttributesImpl());
+    when(partitionedRegion.getRedundancyTracker())
+        .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+    BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, 
regionAdvisor);
+    bucketAdvisor.setInitialized();
+
+    BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId, 
0, bucket);
+
+    assertThat(bucketAdvisor.putProfile(bp, true)).isTrue();
+    assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
+  }
+
+  @Test
+  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
 {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
+    InternalDistributedMember memberId2 = new 
InternalDistributedMember("localhost", 323);
+
+    when(distributionManager.getId()).thenReturn(memberId);
+
+    Bucket bucket = mock(Bucket.class);
+    when(bucket.isHosting()).thenReturn(true);
+    when(bucket.isPrimary()).thenReturn(false);
+    when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new 
PartitionAttributesImpl());
+    when(partitionedRegion.getRedundancyTracker())
+        .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+    BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, 
regionAdvisor);
+
+    BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId, 
0, bucket);
+    BucketAdvisor.BucketProfile bp2 = new 
BucketAdvisor.BucketProfile(memberId2, 0, bucket);
+    bp2.isHosting = false;
+    bp2.isInitializing = true;
+    bp2.isPrimary = false;
+
+    bucketAdvisor.setInitialized();
+    assertThat(bucketAdvisor.putProfile(bp, true)).isTrue();
+    assertThat(bucketAdvisor.putProfile(bp2, true)).isTrue();
+
+    assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 2e58626..84a6048 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -249,10 +249,10 @@ public class ParallelQueueRemovalMessageJUnitTest {
     message.process((ClusterDistributionManager) 
cache.getDistributionManager());
   }
 
-  private HashMap<String, Map<Integer, List<Long>>> 
createRegionToDispatchedKeysMap() {
-    HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new 
HashMap<>();
-    Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>();
-    List<Long> dispatchedKeys = new ArrayList<>();
+  private HashMap<String, Map<Integer, List<Object>>> 
createRegionToDispatchedKeysMap() {
+    HashMap<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new 
HashMap<>();
+    Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+    List<Object> dispatchedKeys = new ArrayList<>();
     dispatchedKeys.add(KEY);
     bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
     
regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID),
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 4c5bba9..d272baa 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -243,9 +243,9 @@ public class WANTestBase extends DistributedTestCase {
   public void setUpWANTestBase() throws Exception {
     shuffleNumDispatcherThreads();
     Invoke.invokeInEveryVM(() -> 
setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
-    IgnoredException.addIgnoredException("Connection refused");
-    IgnoredException.addIgnoredException("Software caused connection abort");
-    IgnoredException.addIgnoredException("Connection reset");
+    addIgnoredException("Connection refused");
+    addIgnoredException("Software caused connection abort");
+    addIgnoredException("Connection reset");
     postSetUpWANTestBase();
   }
 
@@ -417,11 +417,11 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void createReplicatedRegion(String regionName, String 
senderIds, Boolean offHeap) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
       if (senderIds != null) {
@@ -444,11 +444,11 @@ public class WANTestBase extends DistributedTestCase {
   public static void createReplicatedProxyRegion(String regionName, String 
senderIds,
       Boolean offHeap) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       RegionFactory fact = 
cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY);
       if (senderIds != null) {
@@ -499,7 +499,7 @@ public class WANTestBase extends DistributedTestCase {
   public static void createReplicatedRegionWithAsyncEventQueue(String 
regionName,
       String asyncQueueIds, Boolean offHeap) {
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
       if (asyncQueueIds != null) {
@@ -520,7 +520,7 @@ public class WANTestBase extends DistributedTestCase {
   public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String 
regionName,
       String senderIds, String asyncChannelId, Boolean offHeap) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
       if (senderIds != null) {
@@ -609,9 +609,9 @@ public class WANTestBase extends DistributedTestCase {
       Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, 
RegionShortcut shortcut,
       boolean statisticsEnabled, boolean concurrencyChecksEnabled) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+        addIgnoredException(PartitionOfflineException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(shortcut);
       if (senderIds != null) {
@@ -640,9 +640,9 @@ public class WANTestBase extends DistributedTestCase {
   public static void createPartitionedRegionWithPersistence(String regionName, 
String senderIds,
       Integer redundantCopies, Integer totalNumBuckets) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+        addIgnoredException(PartitionOfflineException.class.getName());
     try {
       RegionFactory fact = 
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
       if (senderIds != null) {
@@ -667,9 +667,9 @@ public class WANTestBase extends DistributedTestCase {
   public static void createColocatedPartitionedRegion(String regionName, 
String senderIds,
       Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+        addIgnoredException(PartitionOfflineException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION);
       if (senderIds != null) {
@@ -750,9 +750,9 @@ public class WANTestBase extends DistributedTestCase {
       Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
 
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+        addIgnoredException(PartitionOfflineException.class.getName());
     try {
 
       RegionFactory fact = 
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
@@ -778,7 +778,7 @@ public class WANTestBase extends DistributedTestCase {
   public static void createCustomerOrderShipmentPartitionedRegion(String 
senderIds,
       Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION);
       if (senderIds != null) {
@@ -1100,12 +1100,12 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void startSender(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
 
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       sender.start();
@@ -1118,12 +1118,12 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void startSenderwithCleanQueues(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
 
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       sender.startWithCleanQueue();
@@ -1431,7 +1431,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void waitForSenderRunningState(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     try {
       Set<GatewaySender> senders = cache.getGatewaySenders();
       final GatewaySender sender = getGatewaySenderById(senders, senderId);
@@ -1641,9 +1641,9 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void pauseSender(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       sender.pause();
@@ -1656,9 +1656,9 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void resumeSender(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       sender.resume();
@@ -1683,9 +1683,9 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void stopSender(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       AbstractGatewaySenderEventProcessor eventProcessor = null;
@@ -1767,7 +1767,7 @@ public class WANTestBase extends DistributedTestCase {
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
       GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     try {
       File persistentDirectory =
           new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + 
VM.getCurrentVMNum());
@@ -1803,7 +1803,7 @@ public class WANTestBase extends DistributedTestCase {
       boolean isParallel, Integer maxMemory, Integer batchSize, boolean 
isConflation,
       boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, 
int numDispatchers,
       OrderPolicy orderPolicy, int socketBufferSize) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     try {
       File persistentDirectory =
           new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + 
VM.getCurrentVMNum());
@@ -1870,7 +1870,7 @@ public class WANTestBase extends DistributedTestCase {
       List<GatewayEventFilter> eventFilters, List<GatewayTransportFilter> 
transportFilters,
       boolean isManualStart, boolean isDiskSync) {
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(RegionDestroyedException.class.getName());
+        addIgnoredException(RegionDestroyedException.class.getName());
     try {
       File persistentDirectory =
           new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + 
VM.getCurrentVMNum());
@@ -2286,9 +2286,9 @@ public class WANTestBase extends DistributedTestCase {
     txMgr.setDistributed(true);
 
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 1; i <= numPuts; i++) {
@@ -2305,9 +2305,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doPuts(String regionName, int numPuts, Object value) {
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
@@ -2321,9 +2321,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doPuts(String regionName, int numPuts) {
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
@@ -2337,9 +2337,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doTxPuts(String regionName, int numPuts) {
     try (
-        IgnoredException ignored = 
IgnoredException.addIgnoredException(InterruptedException.class);
+        IgnoredException ignored = 
addIgnoredException(InterruptedException.class);
         IgnoredException ignored1 =
-            
IgnoredException.addIgnoredException(GatewaySenderException.class)) {
+            addIgnoredException(GatewaySenderException.class)) {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
         cache.getCacheTransactionManager().begin();
@@ -2351,9 +2351,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doPutsSameKey(String regionName, int numPuts, String key) 
{
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = 0; i < numPuts; i++) {
@@ -2473,7 +2473,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void localDestroyRegion(String regionName) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(PRLocallyDestroyedException.class.getName());
+        addIgnoredException(PRLocallyDestroyedException.class.getName());
     try {
       Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
       r.localDestroyRegion();
@@ -2774,7 +2774,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void doNextPuts(String regionName, int start, int numPuts) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+        addIgnoredException(CacheClosedException.class.getName());
     try {
       Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
       for (long i = start; i < numPuts; i++) {
@@ -2900,9 +2900,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateRegionSize(String regionName, final int 
regionSize) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+        addIgnoredException(CacheClosedException.class.getName());
     try {
       final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
       if (regionSize != r.keySet().size()) {
@@ -3169,11 +3169,11 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Boolean killSender(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
+    final IgnoredException exln = addIgnoredException("Could not connect");
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+        addIgnoredException(CacheClosedException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       AbstractGatewaySender sender = (AbstractGatewaySender) 
getGatewaySender(senderId);
       if (sender.isPrimary()) {
@@ -3278,9 +3278,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateQueueContents(final String senderId, final int 
regionSize) {
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+        addIgnoredException(InterruptedException.class.getName());
     IgnoredException exp2 =
-        
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+        addIgnoredException(GatewaySenderException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
 
@@ -3430,9 +3430,9 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateParallelSenderQueueAllBucketsDrained(final String 
senderId) {
     IgnoredException exp =
-        
IgnoredException.addIgnoredException(RegionDestroyedException.class.getName());
+        addIgnoredException(RegionDestroyedException.class.getName());
     IgnoredException exp1 =
-        
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+        addIgnoredException(ForceReattemptException.class.getName());
     try {
       GatewaySender sender = getGatewaySender(senderId);
       final AbstractGatewaySender abstractSender = (AbstractGatewaySender) 
sender;
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 7cd8f61..9a75d6b 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,10 +40,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -55,6 +58,9 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegion;
@@ -69,6 +75,7 @@ import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.RMIException;
+import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.MemberVM;
@@ -100,6 +107,13 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     addIgnoredException("could not get remote locator information");
   }
 
+  @After
+  public void tearDown() {
+    for (VM vm : asList(vm4, vm5, vm6, vm7)) {
+      vm.invoke(() -> DistributionMessageObserver.setInstance(null));
+    }
+  }
+
   @Test(timeout = 300_000)
   public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception {
     Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
@@ -1182,6 +1196,84 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
 
   }
 
+  /**
+   * Put entries in region after gateway sender is stopped. Count number of 
PQRM messages sent.
+   */
+  @Test
+  public void 
testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
+
+    // make sure all the senders are running before doing any puts
+    waitForSendersRunning();
+
+    // FIRST RUN: now, the senders are started. So, start the puts
+    vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100));
+
+    vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 
100));
+
+    stopSenders();
+
+    waitForAllSendersNotRunning();
+
+    vm4.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm5.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm6.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+    vm7.invoke(() -> {
+      DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+    });
+
+    // SECOND RUN: keep one thread doing puts to the region
+    vm4.invoke(() -> doPutsFrom(getUniqueName() + "_PR", 100, 200));
+
+    vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 
100));
+
+    int parallelQueueRemovalMessageCountInVm4 = vm4.invoke(() -> {
+      CountSentPQRMObserver observer =
+          (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+      return observer.getNumberOfSentPQRM();
+    });
+
+    int parallelQueueRemovalMessageCountInVm5 = vm5.invoke(() -> {
+      CountSentPQRMObserver observer =
+          (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+      return observer.getNumberOfSentPQRM();
+    });
+
+    int parallelQueueRemovalMessageCountInVm6 = vm6.invoke(() -> {
+      CountSentPQRMObserver observer =
+          (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+      return observer.getNumberOfSentPQRM();
+    });
+
+    int parallelQueueRemovalMessageCountInVm7 = vm7.invoke(() -> {
+      CountSentPQRMObserver observer =
+          (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+      return observer.getNumberOfSentPQRM();
+    });
+
+    assertThat(parallelQueueRemovalMessageCountInVm4 + 
parallelQueueRemovalMessageCountInVm5
+        + parallelQueueRemovalMessageCountInVm6 + 
parallelQueueRemovalMessageCountInVm7)
+            .isEqualTo(100);
+
+    await().untilAsserted(() -> {
+      int vm4SecondarySize = vm4.invoke(() -> 
getSecondaryQueueSizeInStats("ln"));
+      int vm5SecondarySize = vm5.invoke(() -> 
getSecondaryQueueSizeInStats("ln"));
+      int vm6SecondarySize = vm6.invoke(() -> 
getSecondaryQueueSizeInStats("ln"));
+      int vm7SecondarySize = vm7.invoke(() -> 
getSecondaryQueueSizeInStats("ln"));
+      assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + 
vm7SecondarySize)
+          .isEqualTo(0);
+    });
+
+  }
 
   private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
     PartitionedRegionDataStore.BucketVisitor bucketVisitor =
@@ -1506,10 +1598,34 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     vm7.invoke(() -> waitForSenderRunningState("ln"));
   }
 
+  private void waitForAllSendersNotRunning() {
+    vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+    vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+  }
+
   private void validateParallelSenderQueueAllBucketsDrained() {
     vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
     vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
   }
+
+  private static class CountSentPQRMObserver extends 
DistributionMessageObserver
+      implements Serializable {
+    private final AtomicInteger numberOfSentPQRM = new AtomicInteger(0);
+
+    @Override
+    public void beforeSendMessage(ClusterDistributionManager dm, 
DistributionMessage message) {
+      if (message instanceof ParallelQueueRemovalMessage) {
+        numberOfSentPQRM.addAndGet(message.getRecipients().size());
+      }
+    }
+
+    public int getNumberOfSentPQRM() {
+      return numberOfSentPQRM.get();
+    }
+  }
+
 }

Reply via email to