This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-10279
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b617db87a97516972508504ffa5b662a83bd78a8
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed May 4 17:03:49 2022 -0700

    GEODE-10279: Need to lock RVV and flush before backup
---
 .../apache/geode/internal/cache/DiskStoreImpl.java | 52 ++++++++++++++++++++++
 .../geode/internal/cache/backup/BackupTask.java    | 13 ++++--
 .../geode/internal/cache/backup/FlushToDisk.java   |  4 ++
 3 files changed, 66 insertions(+), 3 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 9dee1c1c77..44be93e885 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -87,6 +87,7 @@ import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.backup.BackupService;
@@ -1509,6 +1510,57 @@ public class DiskStoreImpl implements DiskStore {
     return cache;
   }
 
+  HashMap<LocalRegion, RegionVersionVector> lockedRVVs = new HashMap<>();
+
+  public void unlockRVVForAllDiskRegions() {
+    InternalDistributedMember myId = 
cache.getInternalDistributedSystem().getDistributedMember();
+    Iterator<Map.Entry<LocalRegion, RegionVersionVector>> iterator =
+        lockedRVVs.entrySet().iterator();
+    boolean unlocked = false;
+    while (iterator.hasNext()) {
+      Map.Entry<LocalRegion, RegionVersionVector> entry = iterator.next();
+      LocalRegion lr = entry.getKey();
+      RegionVersionVector rvv = entry.getValue();
+      rvv.unlockForClear(myId);
+      iterator.remove();
+      if (logger.isDebugEnabled()) {
+        logger.debug("Unlocked " + lr.getFullPath() + "'s RVV:" + rvv);
+      }
+      unlocked = true;
+    }
+    if (unlocked && logger.isDebugEnabled()) {
+      logger.debug("Unlocked RVVs for all disk regions of diskstore " + 
getName());
+    }
+  }
+
+  public void lockRVVForAllDiskRegions() {
+    Collection<DiskRegion> diskRegions = getDiskRegions();
+    DistributionManager dm = cache.getDistributionManager();
+    InternalDistributedMember myId = 
cache.getInternalDistributedSystem().getDistributedMember();
+    boolean locked = false;
+    try {
+      for (DiskRegion dr : diskRegions) {
+        LocalRegion region = (LocalRegion) getCache().getRegion(dr.getName());
+        if (region != null && region.getVersionVector() != null) {
+          RegionVersionVector rvv = region.getVersionVector();
+          lockedRVVs.put(region, rvv);
+          rvv.lockForClear(dr.getName(), dm, myId);
+          if (logger.isDebugEnabled()) {
+            logger.debug("Locked " + dr.getName() + "'s RegionVersionVector");
+          }
+          locked = true;
+        }
+      }
+      if (locked && logger.isDebugEnabled()) {
+        logger.info("Locked RVVs for all disk regions of " + this.getName());
+      }
+    } catch (Exception e) {
+      unlockRVVForAllDiskRegions();
+      logger.info("lockRVVForAllDiskRegionsAndFlush failed due to ", e);
+      throw e;
+    }
+  }
+
   @Override
   public void flush() {
     forceFlush();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
index 8124dee40d..0a42625cfb 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
@@ -104,8 +104,8 @@ class BackupTask {
       return new HashSet<>();
     }
 
+    Collection<DiskStore> diskStores = 
cache.listDiskStoresIncludingRegionOwned();
     try {
-      Collection<DiskStore> diskStores = 
cache.listDiskStoresIncludingRegionOwned();
       temporaryFiles = TemporaryBackupFiles.create();
       fileCopier = new BackupFileCopier(cache,
           ClassPathLoader.getLatest().getJarDeploymentService(), 
temporaryFiles);
@@ -124,6 +124,9 @@ class BackupTask {
       }
       return persistentIds;
     } finally {
+      for (DiskStore ds : diskStores) {
+        ((DiskStoreImpl) ds).unlockRVVForAllDiskRegions();
+      }
       cleanup();
     }
   }
@@ -272,8 +275,12 @@ class BackupTask {
           backup = new DiskStoreBackup(allOplogs);
           backupByDiskStore.put(diskStore, backup);
 
-          fileCopier.copyDiskInitFile(diskStore);
-          diskStore.getPersistentOplogSet().forceRoll(null);
+          try {
+            fileCopier.copyDiskInitFile(diskStore);
+            diskStore.getPersistentOplogSet().forceRoll(null);
+          } finally {
+            diskStore.unlockRVVForAllDiskRegions();
+          }
 
           if (logger.isDebugEnabled()) {
             logger.debug("Finished backup of disk store {}", 
diskStore.getName());
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
index 8149d3406b..b5a02c62e3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/FlushToDisk.java
@@ -15,6 +15,7 @@
 package org.apache.geode.internal.cache.backup;
 
 import org.apache.geode.cache.DiskStore;
+import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.InternalCache;
 
 class FlushToDisk {
@@ -27,6 +28,9 @@ class FlushToDisk {
 
   void run() {
     if (cache != null) {
+      for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
+        ((DiskStoreImpl) diskStore).lockRVVForAllDiskRegions();
+      }
       cache.listDiskStoresIncludingRegionOwned().forEach(DiskStore::flush);
     }
   }

Reply via email to