This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4d5f84c27a GEODE-10310: Add disable reatempt on CacheClose (#7690) 4d5f84c27a is described below commit 4d5f84c27aae4b268731e7afce6aa078f27c4e94 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Thu May 19 22:22:07 2022 +0200 GEODE-10310: Add disable reatempt on CacheClose (#7690) * GEODE-10310: Add disable reatempt on CacheClose --- ...onedRegionCacheCloseNoRetryDistributedTest.java | 288 +++++++++++++++++++++ .../cache/partitioned/PartitionMessage.java | 14 + 2 files changed, 302 insertions(+) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java new file mode 100644 index 0000000000..e5e7f3ae4c --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionCacheCloseNoRetryDistributedTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.partitioned; + + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; +import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.InternalGemFireException; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; +import org.apache.geode.util.internal.GeodeGlossary; + +public class PartitionedRegionCacheCloseNoRetryDistributedTest implements Serializable { + + private String partitionedRegionName; + + private VM vm0; + private VM vm1; + private VM vm2; + private VM vm3; + + private static final long TIMEOUT_MILLIS = GeodeAwaitility.getTimeout().toMillis(); + + @Rule + public CacheRule cacheRule = + CacheRule.builder().addConfig(getDistributedSystemProperties()).build(); + + @Rule + public SerializableTestName testName = new
SerializableTestName(); + + + @Before + public void setUp() { + vm0 = getVM(0); + vm1 = getVM(1); + vm2 = getVM(2); + vm3 = getVM(3); + + invokeInEveryVM(() -> { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE", + "true"); + }); + String uniqueName = getClass().getSimpleName() + "-" + testName.getMethodName(); + partitionedRegionName = uniqueName + "-partitionedRegion"; + } + + @After + public void tearDown() { + invokeInEveryVM(() -> { + System.clearProperty( + GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE"); + InternalResourceManager.setResourceObserver(null); + DistributionMessageObserver.setInstance(null); + }); + } + + private Properties getDistributedSystemProperties() { + Properties config = new Properties(); + config.setProperty(SERIALIZABLE_OBJECT_FILTER, TestFunction.class.getName()); + return config; + } + + + @Test + public void testCacheCloseDuringWrite() + throws InterruptedException { + int redundantCopies = 1; + int recoveryDelay = -1; + int numBuckets = 100; + boolean diskSynchronous = true; + + vm0.invoke(() -> { + createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous); + createData(0, numBuckets, "a"); + }); + + vm1.invoke(() -> { + createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous); + }); + + // vm0 keeps writing while vm1 closes its cache; with DISABLE_REATTEMPT_ON_CACHE_CLOSE + // set, the puts are expected to fail with InternalGemFireException (timing-dependent).
+ + int endData = 10000; + + AsyncInvocation<Void> createRegionDataAsync = vm0.invokeAsync( + () -> { + Exception exc = null; + try { + createData(numBuckets, endData, "b"); + } catch (Exception e) { + exc = e; + } + + assertThat(exc).isNotNull(); + assertThat(exc).isInstanceOf(InternalGemFireException.class); + + }); + + AsyncInvocation<Void> closeCacheAsync = vm1.invokeAsync( + () -> { + getCache().close(); + }); + + closeCacheAsync.get(); + createRegionDataAsync.get(); + + } + + + @Test + public void testCacheCloseDuringInvalidate() + throws InterruptedException { + int redundantCopies = 1; + int recoveryDelay = -1; + int numBuckets = 100; + boolean diskSynchronous = true; + int endData = 10000; + + vm0.invoke(() -> { + createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous); + createData(0, endData, "a"); + }); + + vm1.invoke(() -> { + createPartitionedRegion(redundantCopies, recoveryDelay, numBuckets, diskSynchronous); + }); + + // vm0 keeps invalidating while vm1 closes its cache; with DISABLE_REATTEMPT_ON_CACHE_CLOSE + // set, invalidates are expected to fail with InternalGemFireException (timing-dependent).
+ + AsyncInvocation<Void> invalidateRegionDataAsync = vm0.invokeAsync( + () -> { + Exception exc = null; + try { + invalidateData(0, endData); + } catch (Exception e) { + exc = e; + } + + assertThat(exc).isNotNull(); + assertThat(exc).isInstanceOf(InternalGemFireException.class); + + }); + + AsyncInvocation<Void> closeCacheAsync = vm1.invokeAsync( + () -> { + getCache().close(); + }); + + closeCacheAsync.get(); + invalidateRegionDataAsync.get(); + + } + + + private void createPartitionedRegion(final int redundancy, final int recoveryDelay, + final int numBuckets, final boolean synchronous) throws InterruptedException { + CountDownLatch recoveryDone = new CountDownLatch(1); + + if (redundancy > 0) { + InternalResourceManager.ResourceObserver observer = + new InternalResourceManager.ResourceObserverAdapter() { + @Override + public void recoveryFinished(Region region) { + recoveryDone.countDown(); + } + }; + + InternalResourceManager.setResourceObserver(observer); + } else { + recoveryDone.countDown(); + } + + PartitionAttributesFactory<?, ?> partitionAttributesFactory = new PartitionAttributesFactory(); + partitionAttributesFactory.setRedundantCopies(redundancy); + partitionAttributesFactory.setRecoveryDelay(recoveryDelay); + partitionAttributesFactory.setTotalNumBuckets(numBuckets); + partitionAttributesFactory.setLocalMaxMemory(500); + + RegionFactory<?, ?> regionFactory = + getCache().createRegionFactory(PARTITION_PERSISTENT); + regionFactory.setDiskSynchronous(synchronous); + regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); + + regionFactory.create(partitionedRegionName); + + recoveryDone.await(TIMEOUT_MILLIS, MILLISECONDS); + } + + + private InternalCache getCache() { + return cacheRule.getOrCreateCache(); + } + + private void createData(final int startKey, final int endKey, final String value) { + createDataFor(startKey, endKey, value, partitionedRegionName); + } + + private void createDataFor(final int startKey, final int endKey, final String
value, + final String regionName) { + Region<Integer, String> region = getCache().getRegion(regionName); + for (int i = startKey; i < endKey; i++) { + region.put(i, value); + } + } + + private void invalidateData(final int startKey, final int endKey) { + invalidateDataFor(startKey, endKey, partitionedRegionName); + } + + private void invalidateDataFor(final int startKey, final int endKey, + final String regionName) { + Region<?, ?> region = getCache().getRegion(regionName); + for (int i = startKey; i < endKey; i++) { + region.invalidate(i); + } + } + + + private static class TestFunction implements Function, Serializable { + + @Override + public void execute(final FunctionContext context) { + context.getResultSender().lastResult(null); + } + + @Override + public String getId() { + return TestFunction.class.getSimpleName(); + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return false; + } + } + + +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java index 49e5f51e0a..8ebc4da469 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; +import org.apache.geode.InternalGemFireException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; import org.apache.geode.cache.CacheClosedException; @@ -67,6 +68,7 @@ import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.serialization.SerializationContext; import
org.apache.geode.internal.serialization.StaticSerialization; import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.util.internal.GeodeGlossary; /** * The base PartitionedRegion message type upon which other messages should be based. @@ -84,6 +86,10 @@ public abstract class PartitionMessage extends DistributionMessage "Unknown exception") .fillInStackTrace(); + // Read once at class init; when true, a CancelException reply is not reattempted. + private static final boolean disableReattemptOnCacheClose = + Boolean.getBoolean( + GeodeGlossary.GEMFIRE_PREFIX + "PartitionMessage.DISABLE_REATTEMPT_ON_CACHE_CLOSE"); int regionId; int processorId; @@ -838,6 +844,14 @@ public abstract class PartitionMessage extends DistributionMessage throw new PrimaryBucketException( "Peer failed primary test", t); } else if (t instanceof CancelException) { + if (disableReattemptOnCacheClose) { + logger.debug( + "PartitionResponse got CacheClosedException from {}, throwing InternalGemFireException", + e.getSender(), t); + throw new InternalGemFireException( + "No retry after CacheClosedException from " + e.getSender(), t); + } + logger.debug( "PartitionResponse got CacheClosedException from {}, throwing ForceReattemptException", e.getSender(), t);