This is an automated email from the ASF dual-hosted git repository. jmelchior pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push: new dede61d GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7379) dede61d is described below commit dede61d4813ecba9a51c7417214ea69092a88172 Author: Joris Melchior <joris.melch...@gmail.com> AuthorDate: Wed Feb 23 13:30:48 2022 -0500 GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7379) * GEODE-9990: turn DiskAccessException into CacheClosedException - when DiskInitFile is in closed state and DiskStoreImpl is closed or closing - catch DiskAccessException in PRHARedundancyProvider and turn into CacheClosedException if cache closing is in progress - change CreateBucketMessage to handle DiskAccessException as cause of ReplyException (cherry picked from commit a98197b5d0a3a2547e0581e475dcabaa82e6e92f) --- .../internal/cache/DiskInitFileJUnitTest.java | 72 ++++++++++++++++++++++ .../apache/geode/internal/cache/DiskInitFile.java | 27 +++++--- .../internal/cache/PRHARedundancyProvider.java | 10 +++ .../cache/partitioned/CreateBucketMessage.java | 5 +- .../internal/cache/PRHARedundancyProviderTest.java | 72 ++++++++++++++++++++++ 5 files changed, 177 insertions(+), 9 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java index 927616d..56b3e55 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java @@ -15,10 +15,15 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -31,8 +36,11 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.apache.geode.CancelCriterion; import org.apache.geode.Statistics; import org.apache.geode.StatisticsFactory; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.internal.cache.persistence.DiskRegionView; import org.apache.geode.internal.cache.persistence.DiskStoreID; @@ -134,4 +142,68 @@ public class DiskInitFileJUnitTest { assertThat(dif.hasKrf(2)).isFalse(); dif.destroy(); } + + @Test + public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() { + markInitializedTestSetup(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf( + DiskAccessException.class); + } + + @Test + public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() { + markInitializedTestSetup(); + when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf( + CacheClosedException.class); + } + + @Test + public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() { + CancelCriterion cancelCriterion = markInitializedTestSetup(); + CacheClosedException cacheClosedException = new CacheClosedException("boom"); + doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo( + cacheClosedException); + } + + @Test + public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() { + markInitializedTestSetup(); + + DiskInitFile diskInitFile = + new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet()); + diskInitFile.close(); + + assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)) + .isInstanceOf(DiskAccessException.class); + verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class)); + } + + private CancelCriterion markInitializedTestSetup() { + InternalCache internalCache = mock(InternalCache.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + DiskRegion diskRegion = mock(DiskRegion.class); + + when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache); + when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion); + when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion); + + return cancelCriterion; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java index dd7b197..d3139ec 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java @@ -55,6 +55,7 @@ import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.Instantiator; +import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAlgorithm; @@ -1391,11 +1392,24 @@ public class DiskInitFile implements DiskInitFileInterpreter { writeIFRecord(bb, true); } + private void checkClosed() { + if (closed) { + parent.getCache().getCancelCriterion().checkCancelInProgress(); + + if (parent.isClosed() || parent.isClosing()) { + throw new CacheClosedException("The disk store is closed or closing"); + } + + DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent); + parent.handleDiskAccessException(dae); + + throw dae; + } + } + private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException { assert lock.isHeldByCurrentThread(); - if (this.closed) { - throw new DiskAccessException("The disk store is closed", parent); - } + checkClosed(); this.ifRAF.write(bb.array(), 0, bb.position()); if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) { @@ -1411,10 +1425,9 @@ public class DiskInitFile implements DiskInitFileInterpreter { private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException { assert lock.isHeldByCurrentThread(); - if (this.closed) { - throw new DiskAccessException("The disk store is closed", parent); - } - hdos.sendTo(this.ifRAF); + checkClosed(); + + hdos.sendTo(ifRAF); if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) { logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS"); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java index e07ab55..0e24e0f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java @@ -43,11 +43,13 @@ import java.util.function.BiFunction; import org.apache.logging.log4j.Logger; +import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.annotations.internal.MakeNotStatic; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; @@ -726,6 +728,14 @@ public class PRHARedundancyProvider { return bucketPrimary; } } + } catch (DiskAccessException dae) { + CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion(); + if (cancelCriterion.isCancelInProgress()) { + needToElectPrimary = false; + cancelCriterion.checkCancelInProgress(dae); + } + + throw dae; } catch (CancelException | RegionDestroyedException e) { // We don't need to elect a primary if the cache was closed. The other members will // take care of it. This ensures we don't compromise redundancy. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java index e71715d..ae1caaf 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionedRegionStorageException; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionManager; @@ -339,9 +340,9 @@ public class CreateBucketMessage extends PartitionMessage { waitForRepliesUninterruptibly(); } catch (ReplyException e) { Throwable t = e.getCause(); - if (t instanceof CancelException) { + if (t instanceof DiskAccessException || t instanceof CancelException) { logger.debug( - "NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}", + "NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}", t.getMessage(), t); return null; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java index 0744a35..48352a7 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java @@ -16,16 +16,21 @@ package org.apache.geode.internal.cache; import static org.apache.geode.cache.Region.SEPARATOR; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -45,10 +50,13 @@ import org.mockito.stubbing.Answer; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.partitioned.InternalPRInfo; import org.apache.geode.internal.cache.partitioned.LoadProbe; import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp; @@ -248,6 +256,70 @@ public class PRHARedundancyProviderTest { } @Test + public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() { + String partitionName = "partitionName"; + DiskAccessException diskAccessException = new DiskAccessException("boom"); + CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember); + InternalCache internalCache = mock(InternalCache.class); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + Bucket bucket = mock(Bucket.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager); + + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + when(partitionedRegion.getCache()).thenReturn(internalCache); + when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion); + when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket); + when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor); + when(internalCache.isCacheAtShutdownAll()) + .thenReturn(Boolean.FALSE) + .thenThrow(diskAccessException); + when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE); + doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException); + when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName)) + .thenReturn(memberSet); + + assertThatThrownBy( + () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName)) + .isEqualTo(cacheClosedException); + } + + @Test + public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() { + String partitionName = "partitionName"; + DiskAccessException diskAccessException = new DiskAccessException("boom"); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember); + InternalCache internalCache = mock(InternalCache.class); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + Bucket bucket = mock(Bucket.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + CancelCriterion cancelCriterion = mock(CancelCriterion.class); + + prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager); + + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + when(partitionedRegion.getCache()).thenReturn(internalCache); + when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion); + when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket); + when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor); + when(internalCache.isCacheAtShutdownAll()) + .thenReturn(Boolean.FALSE) + .thenThrow(diskAccessException); + when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE); + when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName)) + .thenReturn(memberSet); + + assertThatThrownBy( + () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName)) + .isEqualTo(diskAccessException); + } + + @Test @Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"}) @TestCaseName("{method}[{index}]: {params}") public void startTaskCompletesExceptionallyIfExceptionIsThrown(