This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4d5f84c27a GEODE-10310: Add disable reattempt on CacheClose (#7690)
4d5f84c27a is described below

commit 4d5f84c27aae4b268731e7afce6aa078f27c4e94
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Thu May 19 22:22:07 2022 +0200

    GEODE-10310: Add disable reattempt on CacheClose (#7690)
    
    * GEODE-10310: Add disable reattempt on CacheClose
---
 ...onedRegionCacheCloseNoRetryDistributedTest.java | 288 +++++++++++++++++++++
 .../cache/partitioned/PartitionMessage.java        |  14 +
 2 files changed, 302 insertions(+)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
new file mode 100644
index 0000000000..e5e7f3ae4c
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT;
+import static 
org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+import org.apache.geode.util.internal.GeodeGlossary;
+
+public class PartitionedRegionCacheCloseNoRetryDistributedTest implements 
Serializable {
+
+  private String partitionedRegionName;
+
+  private VM vm0;
+  private VM vm1;
+  private VM vm2;
+  private VM vm3;
+
+  private static final long TIMEOUT_MILLIS = 
GeodeAwaitility.getTimeout().toMillis();
+
+  @Rule
+  public CacheRule cacheRule =
+      CacheRule.builder().addConfig(getDistributedSystemProperties()).build();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+
+  /**
+   * Looks up the four DUnit VMs, sets the
+   * {@code PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE} system property to {@code "true"} in
+   * every VM, and derives a region name from the test class and the running test method.
+   */
+  @Before
+  public void setUp() {
+    final String noRetryProperty =
+        GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE";
+
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+    vm3 = getVM(3);
+
+    invokeInEveryVM(() -> {
+      System.setProperty(noRetryProperty, "true");
+    });
+
+    partitionedRegionName =
+        getClass().getSimpleName() + "-" + testName.getMethodName() + "-partitionedRegion";
+  }
+
+  /**
+   * Resets per-VM static state after each test: clears the distribution-message observer and the
+   * resource observer, and removes the
+   * {@code PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE} system property in every VM.
+   */
+  @After
+  public void tearDown() {
+    final String noRetryProperty =
+        GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE";
+
+    invokeInEveryVM(() -> {
+      DistributionMessageObserver.setInstance(null);
+      InternalResourceManager.setResourceObserver(null);
+      System.clearProperty(noRetryProperty);
+    });
+  }
+
+  /**
+   * Builds the configuration passed to the {@link CacheRule} builder. Its single entry sets the
+   * serializable-object filter to the fully qualified class name of {@link TestFunction}.
+   *
+   * @return a new {@link Properties} instance holding that one entry
+   */
+  private Properties getDistributedSystemProperties() {
+    final String allowedClassName = TestFunction.class.getName();
+    final Properties systemProperties = new Properties();
+    systemProperties.setProperty(SERIALIZABLE_OBJECT_FILTER, allowedClassName);
+    return systemProperties;
+  }
+
+
+  /**
+   * Creates the partitioned region (one redundant copy) in vm0 and vm1, then closes the cache in
+   * vm1 while vm0 is putting keys {@code numBuckets..endData-1}. The put loop in vm0 is expected
+   * to end with an {@link InternalGemFireException}.
+   *
+   * <p>
+   * NOTE(review): the writer and the cache close are started as two unsynchronized async
+   * invocations, so the test appears to rely on puts still being in flight when vm1 closes its
+   * cache. Confirm this timing is reliable.
+   *
+   * @throws InterruptedException if interrupted while waiting for an async invocation
+   */
+  @Test
+  public void testCacheCloseDuringWrite() throws InterruptedException {
+    int redundantCopies = 1;
+    int recoveryDelay = -1;
+    int numBuckets = 100;
+    boolean diskSynchronous = true;
+    int endData = 10000;
+
+    vm0.invoke(() -> {
+      createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous);
+      createData(0, numBuckets, "a");
+    });
+
+    vm1.invoke(() -> {
+      createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous);
+    });
+
+    // Start the writer asynchronously so the cache close in vm1 can run while it is putting.
+    AsyncInvocation<Void> createRegionDataAsync = vm0.invokeAsync(() -> {
+      Exception exc = null;
+      try {
+        createData(numBuckets, endData, "b");
+      } catch (Exception e) {
+        exc = e;
+      }
+
+      // A null exception means every put succeeded, which this test treats as a failure.
+      assertThat(exc)
+          .isNotNull()
+          .isInstanceOf(InternalGemFireException.class);
+    });
+
+    AsyncInvocation<Void> closeCacheAsync = vm1.invokeAsync(() -> {
+      getCache().close();
+    });
+
+    closeCacheAsync.get();
+    createRegionDataAsync.get();
+  }
+
+
+  /**
+   * Creates the partitioned region (one redundant copy) in vm0 and vm1 with keys
+   * {@code 0..endData-1} populated from vm0, then closes the cache in vm1 while vm0 is
+   * invalidating those keys. The invalidate loop in vm0 is expected to end with an
+   * {@link InternalGemFireException}.
+   *
+   * <p>
+   * NOTE(review): the invalidator and the cache close are started as two unsynchronized async
+   * invocations, so the test appears to rely on invalidates still being in flight when vm1 closes
+   * its cache. Confirm this timing is reliable.
+   *
+   * @throws InterruptedException if interrupted while waiting for an async invocation
+   */
+  @Test
+  public void testCacheCloseDuringInvalidate() throws InterruptedException {
+    int redundantCopies = 1;
+    int recoveryDelay = -1;
+    int numBuckets = 100;
+    boolean diskSynchronous = true;
+    int endData = 10000;
+
+    vm0.invoke(() -> {
+      createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous);
+      createData(0, endData, "a");
+    });
+
+    vm1.invoke(() -> {
+      createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous);
+    });
+
+    // Start the invalidator asynchronously so the cache close in vm1 can run alongside it.
+    AsyncInvocation<Void> invalidateRegionDataAsync = vm0.invokeAsync(() -> {
+      Exception exc = null;
+      try {
+        invalidateData(0, endData);
+      } catch (Exception e) {
+        exc = e;
+      }
+
+      // A null exception means every invalidate succeeded, which this test treats as a failure.
+      assertThat(exc)
+          .isNotNull()
+          .isInstanceOf(InternalGemFireException.class);
+    });
+
+    AsyncInvocation<Void> closeCacheAsync = vm1.invokeAsync(() -> {
+      getCache().close();
+    });
+
+    closeCacheAsync.get();
+    invalidateRegionDataAsync.get();
+  }
+
+
+  /**
+   * Creates the {@code PARTITION_PERSISTENT} region named {@code partitionedRegionName} in the
+   * calling VM and waits for recovery to be reported.
+   *
+   * <p>
+   * When {@code redundancy > 0} a resource observer is installed whose {@code recoveryFinished}
+   * callback releases a latch; otherwise the latch is released immediately. After the region is
+   * created the method waits up to {@code TIMEOUT_MILLIS} for the latch and fails the test if it
+   * was not released in time, rather than silently continuing.
+   *
+   * @param redundancy number of redundant copies
+   * @param recoveryDelay recovery delay passed to the partition attributes
+   * @param numBuckets total number of buckets
+   * @param synchronous value passed to {@code setDiskSynchronous}
+   * @throws InterruptedException if interrupted while waiting for recovery
+   */
+  private void createPartitionedRegion(final int redundancy, final int recoveryDelay,
+      final int numBuckets, final boolean synchronous) throws InterruptedException {
+    CountDownLatch recoveryDone = new CountDownLatch(1);
+
+    if (redundancy > 0) {
+      InternalResourceManager.ResourceObserver observer =
+          new InternalResourceManager.ResourceObserverAdapter() {
+            @Override
+            public void recoveryFinished(Region region) {
+              recoveryDone.countDown();
+            }
+          };
+
+      InternalResourceManager.setResourceObserver(observer);
+    } else {
+      // Nothing to recover without redundancy, so do not wait below.
+      recoveryDone.countDown();
+    }
+
+    PartitionAttributesFactory<?, ?> partitionAttributesFactory =
+        new PartitionAttributesFactory<>();
+    partitionAttributesFactory.setRedundantCopies(redundancy);
+    partitionAttributesFactory.setRecoveryDelay(recoveryDelay);
+    partitionAttributesFactory.setTotalNumBuckets(numBuckets);
+    partitionAttributesFactory.setLocalMaxMemory(500);
+
+    RegionFactory<?, ?> regionFactory =
+        getCache().createRegionFactory(PARTITION_PERSISTENT);
+    regionFactory.setDiskSynchronous(synchronous);
+    regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
+
+    regionFactory.create(partitionedRegionName);
+
+    // await() returns false on timeout; surface that instead of discarding it.
+    boolean recovered = recoveryDone.await(TIMEOUT_MILLIS, MILLISECONDS);
+    assertThat(recovered)
+        .as("recovery of region %s finished within %d ms", partitionedRegionName, TIMEOUT_MILLIS)
+        .isTrue();
+  }
+
+
+  /** Returns the cache obtained from {@code cacheRule.getOrCreateCache()} in the calling VM. */
+  private InternalCache getCache() {
+    final InternalCache cache = cacheRule.getOrCreateCache();
+    return cache;
+  }
+
+  /**
+   * Puts {@code value} under every key in {@code [startKey, endKey)} of the test's partitioned
+   * region by delegating to {@code createDataFor} with {@code partitionedRegionName}.
+   */
+  private void createData(final int startKey, final int endKey, final String value) {
+    final String targetRegionName = partitionedRegionName;
+    createDataFor(startKey, endKey, value, targetRegionName);
+  }
+
+  /**
+   * Puts {@code value} under every integer key in {@code [startKey, endKey)} of the named region,
+   * in ascending key order.
+   *
+   * @param startKey first key to write (inclusive)
+   * @param endKey key at which to stop (exclusive)
+   * @param value value stored under each key
+   * @param regionName name of the region looked up in this VM's cache
+   */
+  private void createDataFor(final int startKey, final int endKey, final String value,
+      final String regionName) {
+    final Region<Integer, String> target = getCache().getRegion(regionName);
+    int key = startKey;
+    while (key < endKey) {
+      target.put(key, value);
+      key++;
+    }
+  }
+
+  /**
+   * Invalidates every key in {@code [startKey, endKey)} of the test's partitioned region by
+   * delegating to {@code invalidateDataFor} with {@code partitionedRegionName}.
+   */
+  private void invalidateData(final int startKey, final int endKey) {
+    final String targetRegionName = partitionedRegionName;
+    invalidateDataFor(startKey, endKey, targetRegionName);
+  }
+
+  /**
+   * Calls {@code invalidate} for every integer key in {@code [startKey, endKey)} of the named
+   * region, in ascending key order.
+   *
+   * @param startKey first key to invalidate (inclusive)
+   * @param endKey key at which to stop (exclusive)
+   * @param regionName name of the region looked up in this VM's cache
+   */
+  private void invalidateDataFor(final int startKey, final int endKey,
+      final String regionName) {
+    final Region<?, ?> target = getCache().getRegion(regionName);
+    int key = startKey;
+    while (key < endKey) {
+      target.invalidate(key);
+      key++;
+    }
+  }
+
+
+  /**
+   * Minimal function whose class name is placed in the serializable-object filter by
+   * {@code getDistributedSystemProperties()}. Executing it only sends {@code null} as the last
+   * result.
+   */
+  private static class TestFunction implements Function<Object>, Serializable {
+
+    /** Sends a single {@code null} as the last (and only) result. */
+    @Override
+    public void execute(final FunctionContext<Object> context) {
+      context.getResultSender().lastResult(null);
+    }
+
+    /** Uses the simple class name as the function id. */
+    @Override
+    public String getId() {
+      return TestFunction.class.getSimpleName();
+    }
+
+    @Override
+    public boolean hasResult() {
+      return true;
+    }
+
+    @Override
+    public boolean optimizeForWrite() {
+      return false;
+    }
+
+    @Override
+    public boolean isHA() {
+      return false;
+    }
+  }
+
+
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 49e5f51e0a..8ebc4da469 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.InternalGemFireError;
+import org.apache.geode.InternalGemFireException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.cache.CacheClosedException;
@@ -67,6 +68,7 @@ import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.internal.serialization.StaticSerialization;
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
 
 /**
  * The base PartitionedRegion message type upon which other messages should be 
based.
@@ -84,6 +86,10 @@ public abstract class PartitionMessage extends 
DistributionMessage
           "Unknown exception")
               .fillInStackTrace();
 
+
+  /**
+   * When {@code true}, a {@link CancelException} received in a partition response is rethrown as
+   * an {@link InternalGemFireException} instead of taking the ForceReattemptException path.
+   *
+   * <p>
+   * The value is read once, in this static initializer, from the boolean system property
+   * {@code GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE"};
+   * changing the property after this class has been initialized has no effect.
+   */
+  private static final boolean disableReatemptOnCacheClose =
+      Boolean.getBoolean(
+          GeodeGlossary.GEMFIRE_PREFIX + 
"PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE");
   int regionId;
 
   int processorId;
@@ -838,6 +844,14 @@ public abstract class PartitionMessage extends 
DistributionMessage
           throw new PrimaryBucketException(
               "Peer failed primary test", t);
         } else if (t instanceof CancelException) {
+          if (disableReatemptOnCacheClose) {
+            logger.debug(
+                "PartitionResponse got CacheClosedException from {}, throwing 
InternalGemFireException",
+                e.getSender(), t);
+            throw new InternalGemFireException(
+                "No retry after CacheClosedException from " + e.getSender());
+          }
+
           logger.debug(
               "PartitionResponse got CacheClosedException from {}, throwing 
ForceReattemptException",
               e.getSender(), t);

Reply via email to