This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new bbe9a3a GEODE-9853: get all members hosting bucket (#7144) bbe9a3a is described below commit bbe9a3acf2f0812ef733dfe74f07fb9412c886e3 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Thu Jan 27 11:23:29 2022 +0100 GEODE-9853: get all members hosting bucket (#7144) * GEODE-9853: get all members hosting bucket --- .../apache/geode/internal/cache/BucketAdvisor.java | 2 +- ...currentParallelGatewaySenderEventProcessor.java | 2 +- .../wan/parallel/ParallelGatewaySenderQueue.java | 74 +++++++++---- .../wan/parallel/ParallelQueueRemovalMessage.java | 8 +- .../geode/internal/cache/BucketAdvisorTest.java | 89 ++++++++++++++++ .../ParallelQueueRemovalMessageJUnitTest.java | 8 +- .../geode/internal/cache/wan/WANTestBase.java | 114 ++++++++++---------- .../ParallelGatewaySenderOperationsDUnitTest.java | 116 +++++++++++++++++++++ 8 files changed, 328 insertions(+), 85 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index 29287c9..2b70f86 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -1768,7 +1768,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { return redundancyTracker.getCurrentRedundancy(); } - Set<InternalDistributedMember> adviseInitialized() { + public Set<InternalDistributedMember> adviseInitialized() { return adviseFilter(profile -> { assert profile instanceof BucketProfile; BucketProfile bucketProfile = (BucketProfile) profile; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index bcd70b7..84205a0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -167,7 +167,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); if (isPrimary) { - pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey); + pgsq.sendQueueRemovalMessageForDroppedEvent(prQ, bucketId, shadowKey); sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning(); if (logger.isDebugEnabled()) { logger.debug("register dropped event for primary queue. BucketId is " + bucketId diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 235947e..54715b7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -105,7 +105,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue { SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR; // <PartitionedRegion, Map<Integer, List<Object>>> - private final Map regionToDispatchedKeysMap = new ConcurrentHashMap(); + private final Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap = + new ConcurrentHashMap<String, Map<Integer, List<Object>>>(); protected final StoppableReentrantLock buckToDispatchLock; private final StoppableCondition regionToDispatchedKeysMapEmpty; @@ -1172,9 +1173,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { lock.lock(); boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); try { - Map bucketIdToDispatchedKeys = (Map) regionToDispatchedKeysMap.get(prQ.getFullPath()); + Map<Integer, List<Object>> bucketIdToDispatchedKeys = + regionToDispatchedKeysMap.get(prQ.getFullPath()); if (bucketIdToDispatchedKeys == null) { - bucketIdToDispatchedKeys = new ConcurrentHashMap(); + bucketIdToDispatchedKeys = new ConcurrentHashMap<Integer, List<Object>>(); regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys); } addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key); @@ -1187,23 +1189,26 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ, int bucketId, + public void sendQueueRemovalMessageForDroppedEvent(PartitionedRegion prQ, int bucketId, Object key) { - final HashMap<String, Map<Integer, List>> temp = new HashMap<>(); - Map bucketIdToDispatchedKeys = new ConcurrentHashMap(); - temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys); - addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key); + Set<InternalDistributedMember> recipients = - removalThread.getAllRecipients(sender.getCache(), temp); + removalThread.getAllRecipientsForEvent(sender.getCache(), prQ.getFullPath(), bucketId); + if (!recipients.isEmpty()) { + final Map<String, Map<Integer, List<Object>>> temp = new HashMap<>(); + Map<Integer, List<Object>> bucketIdToDispatchedKeys = new ConcurrentHashMap<>(); + temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys); + addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key); ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp); pqrm.setRecipients(recipients); sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm); } } - private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int bucketId, Object key) { - List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId); + private void addRemovedEventToMap(Map<Integer, List<Object>> bucketIdToDispatchedKeys, + int bucketId, Object key) { + List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId); if (dispatchedKeys == null) { dispatchedKeys = new ArrayList<>(); bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys); @@ -1215,9 +1220,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { buckToDispatchLock.lock(); boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); try { - Map bucketIdToDispatchedKeys = (Map) regionToDispatchedKeysMap.get(prQ.getFullPath()); + Map<Integer, List<Object>> bucketIdToDispatchedKeys = + regionToDispatchedKeysMap.get(prQ.getFullPath()); if (bucketIdToDispatchedKeys == null) { - bucketIdToDispatchedKeys = new ConcurrentHashMap(); + bucketIdToDispatchedKeys = new ConcurrentHashMap<>(); regionToDispatchedKeysMap.put(prQ.getFullPath(), bucketIdToDispatchedKeys); } addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys); @@ -1233,9 +1239,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { buckToDispatchLock.lock(); boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); try { - Map bucketIdToDispatchedKeys = (Map) regionToDispatchedKeysMap.get(prQPath); + Map<Integer, List<Object>> bucketIdToDispatchedKeys = regionToDispatchedKeysMap.get(prQPath); if (bucketIdToDispatchedKeys == null) { - bucketIdToDispatchedKeys = new ConcurrentHashMap(); + bucketIdToDispatchedKeys = new ConcurrentHashMap<>(); regionToDispatchedKeysMap.put(prQPath, bucketIdToDispatchedKeys); } addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys); @@ -1247,8 +1253,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - private void addRemovedEventsToMap(Map bucketIdToDispatchedKeys, int bucketId, List keys) { - List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId); + private void addRemovedEventsToMap(Map<Integer, List<Object>> bucketIdToDispatchedKeys, + int bucketId, List<Object> keys) { + List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId); if (dispatchedKeys == null) { dispatchedKeys = keys == null ? new ArrayList<>() : keys; } else { @@ -1886,7 +1893,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } } - final HashMap<String, Map<Integer, List>> temp; + final Map<String, Map<Integer, List<Object>>> temp; buckToDispatchLock.lock(); try { boolean wasEmpty = regionToDispatchedKeysMap.isEmpty(); @@ -1897,7 +1904,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { continue; } // TODO: This should be optimized. + temp = new HashMap<>(regionToDispatchedKeysMap); + regionToDispatchedKeysMap.clear(); } finally { buckToDispatchLock.unlock(); @@ -1966,6 +1975,35 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return recipients; } + private Set<InternalDistributedMember> getAllRecipientsForEvent(InternalCache cache, + String partitionedRegionName, int bucketId) { + Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>(); + PartitionedRegion partitionedRegion = + (PartitionedRegion) cache.getRegion(partitionedRegionName); + if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() != null) { + final String bucketFullPath = + SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR + + partitionedRegion.getBucketName(bucketId); + AbstractBucketRegionQueue bucketRegionQueue = + (AbstractBucketRegionQueue) cache.getInternalRegionByPath(bucketFullPath); + if (bucketRegionQueue != null && bucketRegionQueue.getBucketAdvisor() != null) { + Set<InternalDistributedMember> bucketMembers = + bucketRegionQueue.getBucketAdvisor().adviseInitialized(); + if (!bucketMembers.isEmpty()) { + recipients.addAll(bucketMembers); + } else { + recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore()); + } + } else { + recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore()); + } + } + + return recipients; + } + + + /** * shutdown this thread and the caller thread will join this thread */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index cc801f2..10951e8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -20,7 +20,6 @@ import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BE import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -59,12 +58,13 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { private static final Logger logger = LogService.getLogger(); - private HashMap regionToDispatchedKeysMap; + private Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap; public ParallelQueueRemovalMessage() {} - public ParallelQueueRemovalMessage(HashMap rgnToDispatchedKeysMap) { - regionToDispatchedKeysMap = rgnToDispatchedKeysMap; + public ParallelQueueRemovalMessage( + Map<String, Map<Integer, List<Object>>> rgnToDispatchedKeysMap) { + this.regionToDispatchedKeysMap = rgnToDispatchedKeysMap; } @Override diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java index 1c8f1e1..817386c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java @@ -228,4 +228,93 @@ public class BucketAdvisorTest { // Unknown Shadow Bucket assertThat(bucketAdvisor.isShadowBucketDestroyed(SEPARATOR + "b5")).isFalse(); } + + @Test + public void testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() { + DistributionManager distributionManager = mock(DistributionManager.class); + when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321)); + + Bucket bucket = mock(Bucket.class); + when(bucket.isHosting()).thenReturn(true); + when(bucket.isPrimary()).thenReturn(false); + when(bucket.getDistributionManager()).thenReturn(distributionManager); + + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + when(partitionedRegion.getRedundantCopies()).thenReturn(0); + when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl()); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion); + + BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor); + bucketAdvisor.setInitialized(); + + assertThat(bucketAdvisor.adviseInitialized().isEmpty()).isTrue(); + } + + @Test + public void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket() { + DistributionManager distributionManager = mock(DistributionManager.class); + InternalDistributedMember memberId = new InternalDistributedMember("localhost", 321); + + when(distributionManager.getId()).thenReturn(memberId); + + Bucket bucket = mock(Bucket.class); + when(bucket.isHosting()).thenReturn(true); + when(bucket.isPrimary()).thenReturn(false); + when(bucket.getDistributionManager()).thenReturn(distributionManager); + + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + when(partitionedRegion.getRedundantCopies()).thenReturn(0); + when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl()); + when(partitionedRegion.getRedundancyTracker()) + .thenReturn(mock(PartitionedRegionRedundancyTracker.class)); + + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion); + + BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor); + bucketAdvisor.setInitialized(); + + BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId, 0, bucket); + + assertThat(bucketAdvisor.putProfile(bp, true)).isTrue(); + assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1); + } + + @Test + public void testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket() { + DistributionManager distributionManager = mock(DistributionManager.class); + InternalDistributedMember memberId = new InternalDistributedMember("localhost", 321); + InternalDistributedMember memberId2 = new InternalDistributedMember("localhost", 323); + + when(distributionManager.getId()).thenReturn(memberId); + + Bucket bucket = mock(Bucket.class); + when(bucket.isHosting()).thenReturn(true); + when(bucket.isPrimary()).thenReturn(false); + when(bucket.getDistributionManager()).thenReturn(distributionManager); + + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + when(partitionedRegion.getRedundantCopies()).thenReturn(0); + when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl()); + when(partitionedRegion.getRedundancyTracker()) + .thenReturn(mock(PartitionedRegionRedundancyTracker.class)); + + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion); + + BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor); + + BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId, 0, bucket); + BucketAdvisor.BucketProfile bp2 = new BucketAdvisor.BucketProfile(memberId2, 0, bucket); + bp2.isHosting = false; + bp2.isInitializing = true; + bp2.isPrimary = false; + + bucketAdvisor.setInitialized(); + assertThat(bucketAdvisor.putProfile(bp, true)).isTrue(); + assertThat(bucketAdvisor.putProfile(bp2, true)).isTrue(); + + assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index 2e58626..84a6048 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -249,10 +249,10 @@ public class ParallelQueueRemovalMessageJUnitTest { message.process((ClusterDistributionManager) cache.getDistributionManager()); } - private HashMap<String, Map<Integer, List<Long>>> createRegionToDispatchedKeysMap() { - HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new HashMap<>(); - Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>(); - List<Long> dispatchedKeys = new ArrayList<>(); + private HashMap<String, Map<Integer, List<Object>>> createRegionToDispatchedKeysMap() { + HashMap<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new HashMap<>(); + Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>(); + List<Object> dispatchedKeys = new ArrayList<>(); dispatchedKeys.add(KEY); bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys); regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID), diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 4c5bba9..d272baa 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -243,9 +243,9 @@ public class WANTestBase extends DistributedTestCase { public void setUpWANTestBase() throws Exception { shuffleNumDispatcherThreads(); Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0))); - IgnoredException.addIgnoredException("Connection refused"); - IgnoredException.addIgnoredException("Software caused connection abort"); - IgnoredException.addIgnoredException("Connection reset"); + addIgnoredException("Connection refused"); + addIgnoredException("Software caused connection abort"); + addIgnoredException("Connection reset"); postSetUpWANTestBase(); } @@ -417,11 +417,11 @@ public class WANTestBase extends DistributedTestCase { public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (senderIds != null) { @@ -444,11 +444,11 @@ public class WANTestBase extends DistributedTestCase { public static void createReplicatedProxyRegion(String regionName, String senderIds, Boolean offHeap) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY); if (senderIds != null) { @@ -499,7 +499,7 @@ public class WANTestBase extends DistributedTestCase { public static void createReplicatedRegionWithAsyncEventQueue(String regionName, String asyncQueueIds, Boolean offHeap) { IgnoredException exp1 = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (asyncQueueIds != null) { @@ -520,7 +520,7 @@ public class WANTestBase extends DistributedTestCase { public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String regionName, String senderIds, String asyncChannelId, Boolean offHeap) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (senderIds != null) { @@ -609,9 +609,9 @@ public class WANTestBase extends DistributedTestCase { Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap, RegionShortcut shortcut, boolean statisticsEnabled, boolean concurrencyChecksEnabled) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(PartitionOfflineException.class.getName()); + addIgnoredException(PartitionOfflineException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(shortcut); if (senderIds != null) { @@ -640,9 +640,9 @@ public class WANTestBase extends DistributedTestCase { public static void createPartitionedRegionWithPersistence(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(PartitionOfflineException.class.getName()); + addIgnoredException(PartitionOfflineException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); if (senderIds != null) { @@ -667,9 +667,9 @@ public class WANTestBase extends DistributedTestCase { public static void createColocatedPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(PartitionOfflineException.class.getName()); + addIgnoredException(PartitionOfflineException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { @@ -750,9 +750,9 @@ public class WANTestBase extends DistributedTestCase { Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(PartitionOfflineException.class.getName()); + addIgnoredException(PartitionOfflineException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); @@ -778,7 +778,7 @@ public class WANTestBase extends DistributedTestCase { public static void createCustomerOrderShipmentPartitionedRegion(String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { @@ -1100,12 +1100,12 @@ public class WANTestBase extends DistributedTestCase { } public static void startSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); sender.start(); @@ -1118,12 +1118,12 @@ public class WANTestBase extends DistributedTestCase { } public static void startSenderwithCleanQueues(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); sender.startWithCleanQueue(); @@ -1431,7 +1431,7 @@ public class WANTestBase extends DistributedTestCase { } public static void waitForSenderRunningState(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); try { Set<GatewaySender> senders = cache.getGatewaySenders(); final GatewaySender sender = getGatewaySenderById(senders, senderId); @@ -1641,9 +1641,9 @@ public class WANTestBase extends DistributedTestCase { } public static void pauseSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); sender.pause(); @@ -1656,9 +1656,9 @@ public class WANTestBase extends DistributedTestCase { } public static void resumeSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); sender.resume(); @@ -1683,9 +1683,9 @@ public class WANTestBase extends DistributedTestCase { } public static void stopSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); AbstractGatewaySenderEventProcessor eventProcessor = null; @@ -1767,7 +1767,7 @@ public class WANTestBase extends DistributedTestCase { public static void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, boolean groupTransactionEvents) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); try { File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -1803,7 +1803,7 @@ public class WANTestBase extends DistributedTestCase { boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy, int socketBufferSize) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); try { File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -1870,7 +1870,7 @@ public class WANTestBase extends DistributedTestCase { List<GatewayEventFilter> eventFilters, List<GatewayTransportFilter> transportFilters, boolean isManualStart, boolean isDiskSync) { IgnoredException exp1 = - IgnoredException.addIgnoredException(RegionDestroyedException.class.getName()); + addIgnoredException(RegionDestroyedException.class.getName()); try { File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -2286,9 +2286,9 @@ public class WANTestBase extends DistributedTestCase { txMgr.setDistributed(true); IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = 1; i <= numPuts; i++) { @@ -2305,9 +2305,9 @@ public class WANTestBase extends DistributedTestCase { public static void doPuts(String regionName, int numPuts, Object value) { IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = 0; i < numPuts; i++) { @@ -2321,9 +2321,9 @@ public class WANTestBase extends DistributedTestCase { public static void doPuts(String regionName, int numPuts) { IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = 0; i < numPuts; i++) { @@ -2337,9 +2337,9 @@ public class WANTestBase extends DistributedTestCase { public static void doTxPuts(String regionName, int numPuts) { try ( - IgnoredException ignored = IgnoredException.addIgnoredException(InterruptedException.class); + IgnoredException ignored = addIgnoredException(InterruptedException.class); IgnoredException ignored1 = - IgnoredException.addIgnoredException(GatewaySenderException.class)) { + addIgnoredException(GatewaySenderException.class)) { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = 0; i < numPuts; i++) { cache.getCacheTransactionManager().begin(); @@ -2351,9 +2351,9 @@ public class WANTestBase extends DistributedTestCase { public static void doPutsSameKey(String regionName, int numPuts, String key) { IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = 0; i < numPuts; i++) { @@ -2473,7 +2473,7 @@ public class WANTestBase extends DistributedTestCase { public static void localDestroyRegion(String regionName) { IgnoredException exp = - IgnoredException.addIgnoredException(PRLocallyDestroyedException.class.getName()); + addIgnoredException(PRLocallyDestroyedException.class.getName()); try { Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); r.localDestroyRegion(); @@ -2774,7 +2774,7 @@ public class WANTestBase extends DistributedTestCase { public static void doNextPuts(String regionName, int start, int numPuts) { IgnoredException exp = - IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + addIgnoredException(CacheClosedException.class.getName()); try { Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); for (long i = start; i < numPuts; i++) { @@ -2900,9 +2900,9 @@ public class WANTestBase extends DistributedTestCase { public static void validateRegionSize(String regionName, final int regionSize) { IgnoredException exp = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + addIgnoredException(CacheClosedException.class.getName()); try { final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); if (regionSize != r.keySet().size()) { @@ -3169,11 +3169,11 @@ public class WANTestBase extends DistributedTestCase { } public static Boolean killSender(String senderId) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + final IgnoredException exln = addIgnoredException("Could not connect"); IgnoredException exp = - IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + addIgnoredException(CacheClosedException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId); if (sender.isPrimary()) { @@ -3278,9 +3278,9 @@ public class WANTestBase extends DistributedTestCase { public static void validateQueueContents(final String senderId, final int regionSize) { IgnoredException exp1 = - IgnoredException.addIgnoredException(InterruptedException.class.getName()); + addIgnoredException(InterruptedException.class.getName()); IgnoredException exp2 = - IgnoredException.addIgnoredException(GatewaySenderException.class.getName()); + addIgnoredException(GatewaySenderException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); @@ -3430,9 +3430,9 @@ public class WANTestBase extends DistributedTestCase { public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) { IgnoredException exp = - IgnoredException.addIgnoredException(RegionDestroyedException.class.getName()); + addIgnoredException(RegionDestroyedException.class.getName()); IgnoredException exp1 = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + addIgnoredException(ForceReattemptException.class.getName()); try { GatewaySender sender = getGatewaySender(senderId); final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index 7cd8f61..9a75d6b 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -39,10 +40,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -55,6 +58,9 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; @@ -69,6 +75,7 @@ import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.RMIException; +import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; import org.apache.geode.test.dunit.rules.MemberVM; @@ -100,6 +107,13 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { addIgnoredException("could not get remote locator information"); } + @After + public void tearDown() { + for (VM vm : asList(vm4, vm5, vm6, vm7)) { + vm.invoke(() -> DistributionMessageObserver.setInstance(null)); + } + } + @Test(timeout = 300_000) public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception { Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); @@ -1182,6 +1196,84 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { } + /** + * Put entries in region after gateway sender is stopped. Count number of PQRM messages sent. + */ + @Test + public void testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() { + int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); + int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + + createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); + + // make sure all the senders are running before doing any puts + waitForSendersRunning(); + + // FIRST RUN: now, the senders are started. So, start the puts + vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100)); + + vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); + + stopSenders(); + + waitForAllSendersNotRunning(); + + vm4.invoke(() -> { + DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); + }); + vm5.invoke(() -> { + DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); + }); + vm6.invoke(() -> { + DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); + }); + vm7.invoke(() -> { + DistributionMessageObserver.setInstance(new CountSentPQRMObserver()); + }); + + // SECOND RUN: keep one thread doing puts to the region + vm4.invoke(() -> doPutsFrom(getUniqueName() + "_PR", 100, 200)); + + vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100)); + + int parallelQueueRemovalMessageCountInVm4 = vm4.invoke(() -> { + CountSentPQRMObserver observer = + (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); + return observer.getNumberOfSentPQRM(); + }); + + int parallelQueueRemovalMessageCountInVm5 = vm5.invoke(() -> { + CountSentPQRMObserver observer = + (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); + return observer.getNumberOfSentPQRM(); + }); + + int parallelQueueRemovalMessageCountInVm6 = vm6.invoke(() -> { + CountSentPQRMObserver observer = + (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); + return observer.getNumberOfSentPQRM(); + }); + + int parallelQueueRemovalMessageCountInVm7 = vm7.invoke(() -> { + CountSentPQRMObserver observer = + (CountSentPQRMObserver) DistributionMessageObserver.getInstance(); + return observer.getNumberOfSentPQRM(); + }); + + assertThat(parallelQueueRemovalMessageCountInVm4 + parallelQueueRemovalMessageCountInVm5 + + parallelQueueRemovalMessageCountInVm6 + parallelQueueRemovalMessageCountInVm7) + .isEqualTo(100); + + await().untilAsserted(() -> { + int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln")); + int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln")); + int vm6SecondarySize = vm6.invoke(() -> getSecondaryQueueSizeInStats("ln")); + int vm7SecondarySize = vm7.invoke(() -> getSecondaryQueueSizeInStats("ln")); + assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize + vm7SecondarySize) + .isEqualTo(0); + }); + + } private void clearShadowBucketRegions(PartitionedRegion shadowRegion) { PartitionedRegionDataStore.BucketVisitor bucketVisitor = @@ -1506,10 +1598,34 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm7.invoke(() -> waitForSenderRunningState("ln")); } + private void waitForAllSendersNotRunning() { + vm4.invoke(() -> waitForSenderNonRunningState("ln")); + vm5.invoke(() -> waitForSenderNonRunningState("ln")); + vm6.invoke(() -> waitForSenderNonRunningState("ln")); + vm7.invoke(() -> waitForSenderNonRunningState("ln")); + } + private void validateParallelSenderQueueAllBucketsDrained() { vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); } + + private static class CountSentPQRMObserver extends DistributionMessageObserver + implements Serializable { + private final AtomicInteger numberOfSentPQRM = new AtomicInteger(0); + + @Override + public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { + if (message instanceof ParallelQueueRemovalMessage) { + numberOfSentPQRM.addAndGet(message.getRecipients().size()); + } + } + + public int getNumberOfSentPQRM() { + return numberOfSentPQRM.get(); + } + } + }