This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6fcb258a65 GEODE-10405: added ignore exceprion for GW queue region 
(#7831)
6fcb258a65 is described below

commit 6fcb258a6515f268b119eacd88207860a8750720
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Tue Sep 6 11:26:15 2022 +0200

    GEODE-10405: added ignore exceprion for GW queue region (#7831)
    
    * GEODE-10405: added ignore exceprion for GW queue region
    
    * GEODE-10405: added test
---
 .../cache/persistence/PersistenceAdvisorImpl.java  |  19 +-
 ...NPersistenceEnabledGatewaySender2DUnitTest.java | 205 +++++++++++++++++++++
 2 files changed, 223 insertions(+), 1 deletion(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
index 174e4e042c..185bb4b486 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/PersistenceAdvisorImpl.java
@@ -45,10 +45,13 @@ import 
org.apache.geode.distributed.internal.ProfileListener;
 import org.apache.geode.distributed.internal.ReplyException;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.CopyOnWriteHashSet;
+import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import 
org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
 import org.apache.geode.internal.cache.DiskRegionStats;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.ProxyBucketRegion;
 import 
org.apache.geode.internal.cache.persistence.PersistentMemberManager.MemberRevocationListener;
 import 
org.apache.geode.internal.cache.persistence.PersistentStateQueryMessage.PersistentStateQueryReplyProcessor;
 import org.apache.geode.internal.logging.log4j.LogMarker;
@@ -542,8 +545,22 @@ public class PersistenceAdvisorImpl implements 
InternalPersistenceAdvisor {
         if (copyOfReplicates == null) {
           copyOfReplicates = new HashSet<>(replicates);
         }
+
+        boolean gwRegion = false;
+
+        if (cacheDistributionAdvisor instanceof BucketAdvisor) {
+          BucketAdvisor ba = (BucketAdvisor) cacheDistributionAdvisor;
+          if (ba.getAdvisee() instanceof ProxyBucketRegion) {
+            ProxyBucketRegion pbr = (ProxyBucketRegion) ba.getAdvisee();
+            PartitionedRegion pr = pbr.getPartitionedRegion();
+            if (pr != null) {
+              gwRegion = pr.isShadowPR();
+            }
+          }
+        }
+
         copyOfReplicates.remove(member);
-        if (copyOfReplicates.isEmpty()) {
+        if (copyOfReplicates.isEmpty() && !gwRegion) {
           throw new ConflictingPersistentDataException(message);
         } else {
           logger.info(message);
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java
new file mode 100644
index 0000000000..c510a58147
--- /dev/null
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySender2DUnitTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.junit.categories.WanTest;
+
+@Category({WanTest.class})
+public class ParallelWANPersistenceEnabledGatewaySender2DUnitTest extends 
WANTestBase {
+
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = LogService.getLogger();
+
+  public ParallelWANPersistenceEnabledGatewaySender2DUnitTest() {
+    super();
+  }
+
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    // The restart tests log this string
+    IgnoredException.addIgnoredException("failed accepting client connection");
+  }
+
+  protected SerializableRunnableIF killSenderRunnable() {
+    return WANTestBase::killSender;
+  }
+
+  protected SerializableRunnableIF createPartitionedRegionRunnable() {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName(), 
"ln", 1, 100,
+        isOffHeap());
+  }
+
+  protected SerializableRunnableIF pauseSenderRunnable() {
+    return () -> WANTestBase.pauseSender("ln");
+  }
+
+  protected SerializableRunnableIF stopSenderRunnable() {
+    return () -> WANTestBase.stopSender("ln");
+  }
+
+  protected SerializableRunnableIF startSenderRunnable() {
+    return () -> WANTestBase.startSender("ln");
+  }
+
+
+  protected SerializableRunnableIF waitForSenderRunnable() {
+    return () -> WANTestBase.waitForSenderRunningState("ln");
+  }
+
+  private SerializableRunnableIF waitForSenderNonRunnable() {
+    return () -> WANTestBase.waitForSenderNonRunningState("ln");
+  }
+
+  @Test
+  public void testPersistentPR_Restart_one_server_while_clean_queue() throws 
InterruptedException {
+    // create locator on local site
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    // create locator on remote site
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    // create cache in remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create senders with disk store
+    String diskStore1 = vm4.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore2 = vm5.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore3 = vm6.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+    String diskStore4 = vm7.invoke(() -> 
WANTestBase.createSenderWithDiskStore("ln", 2,
+        true, 100, 10, false, true, null, null, true));
+
+    logger
+        .info("The DS are: " + diskStore1 + "," + diskStore2 + "," + 
diskStore3 + "," + diskStore4);
+
+    // create PR on remote site
+    vm2.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+    vm3.invoke(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 100, isOffHeap()));
+
+    // create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+
+    // start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    logger.info("All senders are running.");
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 3000));
+    logger.info("Completed puts in the region");
+
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    logger.info("Check that no events are propagated to remote site");
+
+    vm7.invoke(killSenderRunnable());
+
+    logger.info("Killed vm7 sender.");
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // stop the senders
+
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm6.invoke(() -> WANTestBase.stopSender("ln"));
+
+    logger.info("Stopped all the senders.");
+
+    // wait for senders to stop
+    vm4.invoke(waitForSenderNonRunnable());
+    vm5.invoke(waitForSenderNonRunnable());
+    vm6.invoke(waitForSenderNonRunnable());
+
+    // create receiver on remote site
+    createReceiverInVMs(vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+    logger.info("Start all the senders.");
+
+    AsyncInvocation<Void> startSenderwithCleanQueuesInVM4 =
+        vm4.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+    AsyncInvocation<Void> startSenderwithCleanQueuesInVM5 =
+        vm5.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+    AsyncInvocation<Void> startSenderwithCleanQueuesInVM6 =
+        vm6.invokeAsync(() -> startSenderwithCleanQueues("ln"));
+
+
+    startSenderwithCleanQueuesInVM4.await();
+    startSenderwithCleanQueuesInVM5.await();
+    startSenderwithCleanQueuesInVM6.await();
+
+    logger.info("Waiting for senders running.");
+    // wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+
+    logger.info("All the senders are now running...");
+
+    // restart the vm
+    vm7.invoke("Create back the cache", () -> createCache(lnPort));
+
+    // create senders with disk store
+    vm7.invoke("Create sender back from the disk store.",
+        () -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, 
false, true,
+            null, diskStore4, false));
+
+    // create PR on local site
+    vm7.invoke("Create back the partitioned region",
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), "ln", 1,
+            100, isOffHeap()));
+
+    // wait for senders running
+    // 
----------------------------------------------------------------------------------------------------
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 0));
+
+  }
+
+
+}

Reply via email to