This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 6fcb258a65 GEODE-10405: added ignore exceprion for GW queue region (#7831) 6fcb258a65 is described below commit 6fcb258a6515f268b119eacd88207860a8750720 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Tue Sep 6 11:26:15 2022 +0200 GEODE-10405: added ignore exceprion for GW queue region (#7831) * GEODE-10405: added ignore exceprion for GW queue region * GEODE-10405: added test --- .../cache/persistence/PersistenceAdvisorImpl.java | 19 +- ...NPersistenceEnabledGatewaySender2DUnitTest.java | 205 +++++++++++++++++++++ 2 files changed, 223 insertions(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java index 174e4e042c..185bb4b486 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java @@ -45,10 +45,13 @@ import org.apache.geode.distributed.internal.ProfileListener; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.CopyOnWriteHashSet; +import org.apache.geode.internal.cache.BucketAdvisor; import org.apache.geode.internal.cache.CacheDistributionAdvisor; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; import org.apache.geode.internal.cache.DiskRegionStats; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.ProxyBucketRegion; import org.apache.geode.internal.cache.persistence.PersistentMemberManager.MemberRevocationListener; import org.apache.geode.internal.cache.persistence.PersistentStateQueryMessage.PersistentStateQueryReplyProcessor; import org.apache.geode.internal.logging.log4j.LogMarker; @@ -542,8 +545,22 @@ public class PersistenceAdvisorImpl implements InternalPersistenceAdvisor { if (copyOfReplicates == null) { copyOfReplicates = new HashSet<>(replicates); } + + boolean gwRegion = false; + + if (cacheDistributionAdvisor instanceof BucketAdvisor) { + BucketAdvisor ba = (BucketAdvisor) cacheDistributionAdvisor; + if (ba.getAdvisee() instanceof ProxyBucketRegion) { + ProxyBucketRegion pbr = (ProxyBucketRegion) ba.getAdvisee(); + PartitionedRegion pr = pbr.getPartitionedRegion(); + if (pr != null) { + gwRegion = pr.isShadowPR(); + } + } + } + copyOfReplicates.remove(member); - if (copyOfReplicates.isEmpty()) { + if (copyOfReplicates.isEmpty() && !gwRegion) { throw new ConflictingPersistentDataException(message); } else { logger.info(message); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java new file mode 100644 index 0000000000..c510a58147 --- /dev/null +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + + + +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.junit.categories.WanTest; + +@Category({WanTest.class}) +public class ParallelWANPersistenceEnabledGatewaySender2DUnitTest extends WANTestBase { + + + private static final long serialVersionUID = 1L; + private static final Logger logger = LogService.getLogger(); + + public ParallelWANPersistenceEnabledGatewaySender2DUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + // The restart tests log this string + IgnoredException.addIgnoredException("failed accepting client connection"); + } + + protected SerializableRunnableIF killSenderRunnable() { + return WANTestBase::killSender; + } + + protected SerializableRunnableIF createPartitionedRegionRunnable() { + return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, 100, + isOffHeap()); + } + + protected SerializableRunnableIF pauseSenderRunnable() { + return () -> WANTestBase.pauseSender("ln"); + } + + protected SerializableRunnableIF stopSenderRunnable() { + return () -> WANTestBase.stopSender("ln"); + } + + protected SerializableRunnableIF startSenderRunnable() { + return () -> WANTestBase.startSender("ln"); + } + + + protected SerializableRunnableIF waitForSenderRunnable() { + return () -> WANTestBase.waitForSenderRunningState("ln"); + } + + private SerializableRunnableIF waitForSenderNonRunnable() { + return () -> WANTestBase.waitForSenderNonRunningState("ln"); + } + + @Test + public void testPersistentPR_Restart_one_server_while_clean_queue() throws InterruptedException { + // create locator on local site + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + // create locator on remote site + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + // create cache in remote site + createCacheInVMs(nyPort, vm2, vm3); + + // create cache in local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // create senders with disk store + String diskStore1 = vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + String diskStore2 = vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + String diskStore3 = vm6.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + String diskStore4 = vm7.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, + true, 100, 10, false, true, null, null, true)); + + logger + .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); + + // create PR on remote site + vm2.invoke( + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap())); + vm3.invoke( + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap())); + + // create PR on local site + vm4.invoke(createPartitionedRegionRunnable()); + vm5.invoke(createPartitionedRegionRunnable()); + vm6.invoke(createPartitionedRegionRunnable()); + vm7.invoke(createPartitionedRegionRunnable()); + + + // start the senders on local site + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + // wait for senders to become running + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + logger.info("All senders are running."); + + // start puts in region on local site + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000)); + logger.info("Completed puts in the region"); + + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + logger.info("Check that no events are propagated to remote site"); + + vm7.invoke(killSenderRunnable()); + + logger.info("Killed vm7 sender."); + // --------------------close and rebuild local site + // ------------------------------------------------- + // stop the senders + + vm4.invoke(() -> WANTestBase.stopSender("ln")); + vm5.invoke(() -> WANTestBase.stopSender("ln")); + vm6.invoke(() -> WANTestBase.stopSender("ln")); + + logger.info("Stopped all the senders."); + + // wait for senders to stop + vm4.invoke(waitForSenderNonRunnable()); + vm5.invoke(waitForSenderNonRunnable()); + vm6.invoke(waitForSenderNonRunnable()); + + // create receiver on remote site + createReceiverInVMs(vm2, vm3); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + + logger.info("Start all the senders."); + + AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 = + vm4.invokeAsync(() -> startSenderwithCleanQueues("ln")); + + AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 = + vm5.invokeAsync(() -> startSenderwithCleanQueues("ln")); + AsyncInvocation<Void> startSenderwithCleanQueuesInVM6 = + vm6.invokeAsync(() -> startSenderwithCleanQueues("ln")); + + + startSenderwithCleanQueuesInVM4.await(); + startSenderwithCleanQueuesInVM5.await(); + startSenderwithCleanQueuesInVM6.await(); + + logger.info("Waiting for senders running."); + // wait for senders running + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + + logger.info("All the senders are now running..."); + + // restart the vm + vm7.invoke("Create back the cache", () -> createCache(lnPort)); + + // create senders with disk store + vm7.invoke("Create sender back from the disk store.", + () -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true, + null, diskStore4, false)); + + // create PR on local site + vm7.invoke("Create back the partitioned region", + () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1, + 100, isOffHeap())); + + // wait for senders running + // ---------------------------------------------------------------------------------------------------- + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0)); + + } + + +}