This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-10279 in repository https://gitbox.apache.org/repos/asf/geode.git
commit b617db87a97516972508504ffa5b662a83bd78a8 Author: zhouxh <gz...@pivotal.io> AuthorDate: Wed May 4 17:03:49 2022 -0700 GEODE-10279: Need to lock RVV and flush before backup --- .../apache/geode/internal/cache/DiskStoreImpl.java | 52 ++++++++++++++++++++++ .../geode/internal/cache/backup/BackupTask.java | 13 ++++-- .../geode/internal/cache/backup/FlushToDisk.java | 4 ++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index 9dee1c1c77..44be93e885 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -87,6 +87,7 @@ import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.persistence.PersistentID; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.backup.BackupService; @@ -1509,6 +1510,57 @@ public class DiskStoreImpl implements DiskStore { return cache; } + HashMap<LocalRegion, RegionVersionVector> lockedRVVs = new HashMap<>(); + + public void unlockRVVForAllDiskRegions() { + InternalDistributedMember myId = cache.getInternalDistributedSystem().getDistributedMember(); + Iterator<Map.Entry<LocalRegion, RegionVersionVector>> iterator = + lockedRVVs.entrySet().iterator(); + boolean unlocked = false; + while (iterator.hasNext()) { + Map.Entry<LocalRegion, RegionVersionVector> entry = iterator.next(); + LocalRegion lr = entry.getKey(); + RegionVersionVector rvv = entry.getValue(); + rvv.unlockForClear(myId); + iterator.remove(); + if (logger.isDebugEnabled()) { + logger.debug("Unlocked " + lr.getFullPath() + "'s RVV:" + rvv); + } + unlocked = true; + } + if (unlocked && logger.isDebugEnabled()) { + logger.debug("Unlocked RVVs for all disk regions of diskstore " + getName()); + } + } + + public void lockRVVForAllDiskRegions() { + Collection<DiskRegion> diskRegions = getDiskRegions(); + DistributionManager dm = cache.getDistributionManager(); + InternalDistributedMember myId = cache.getInternalDistributedSystem().getDistributedMember(); + boolean locked = false; + try { + for (DiskRegion dr : diskRegions) { + LocalRegion region = (LocalRegion) getCache().getRegion(dr.getName()); + if (region != null && region.getVersionVector() != null) { + RegionVersionVector rvv = region.getVersionVector(); + lockedRVVs.put(region, rvv); + rvv.lockForClear(dr.getName(), dm, myId); + if (logger.isDebugEnabled()) { + logger.debug("Locked " + dr.getName() + "'s RegionVersionVector"); + } + locked = true; + } + } + if (locked && logger.isDebugEnabled()) { + logger.info("Locked RVVs for all disk regions of " + this.getName()); + } + } catch (Exception e) { + unlockRVVForAllDiskRegions(); + logger.info("lockRVVForAllDiskRegionsAndFlush failed due to ", e); + throw e; + } + } + @Override public void flush() { forceFlush(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java index 8124dee40d..0a42625cfb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java @@ -104,8 +104,8 @@ class BackupTask { return new HashSet<>(); } + Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned(); try { - Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned(); temporaryFiles = TemporaryBackupFiles.create(); fileCopier = new BackupFileCopier(cache, ClassPathLoader.getLatest().getJarDeploymentService(), temporaryFiles); @@ -124,6 +124,9 @@ class BackupTask { } return persistentIds; } finally { + for (DiskStore ds : diskStores) { + ((DiskStoreImpl) ds).unlockRVVForAllDiskRegions(); + } cleanup(); } } @@ -272,8 +275,12 @@ class BackupTask { backup = new DiskStoreBackup(allOplogs); backupByDiskStore.put(diskStore, backup); - fileCopier.copyDiskInitFile(diskStore); - diskStore.getPersistentOplogSet().forceRoll(null); + try { + fileCopier.copyDiskInitFile(diskStore); + diskStore.getPersistentOplogSet().forceRoll(null); + } finally { + diskStore.unlockRVVForAllDiskRegions(); + } if (logger.isDebugEnabled()) { logger.debug("Finished backup of disk store {}", diskStore.getName()); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java index 8149d3406b..b5a02c62e3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.cache.backup; import org.apache.geode.cache.DiskStore; +import org.apache.geode.internal.cache.DiskStoreImpl; import org.apache.geode.internal.cache.InternalCache; class FlushToDisk { @@ -27,6 +28,9 @@ class FlushToDisk { void run() { if (cache != null) { + for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) { + ((DiskStoreImpl) diskStore).lockRVVForAllDiskRegions(); + } cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush); } }