GEODE-3055: The old primary's shadow bucket is not initialized when
rebalance removes it, so the new primary candidate can never initialize
from it. The fix is to wait until a new primary exists before removing
the old primary's bucket during rebalance.
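
The guard added to PartitionedRegionDataStore.removeBucket (full hunk in the diff
below) boils down to the check sketched here. This is an illustrative sketch, not
the patch verbatim; the helper name hasRemotePrimary is hypothetical, while
BucketAdvisor, InternalDistributedMember, and the getPrimary() call are the same
ones the patch uses:

    // Sketch only: refuse to remove a bucket while this member is still its
    // primary, because the new primary's shadow bucket would have nothing to
    // initialize from. Rebalance can retry once a remote primary has taken over.
    private boolean hasRemotePrimary(BucketAdvisor bucketAdvisor,
        InternalDistributedMember myId) {
      InternalDistributedMember primary = bucketAdvisor.getPrimary();
      return primary != null && !myId.equals(primary);
    }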

There is another issue: CreateMissingBucketsTask did not wait until the
region's buckets had finished recovery, so its check usually did nothing.
If a shadow region bucket failed to initialize because of the race
condition, there was no way to create the missing bucket for the shadow
region.
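
The scheduling change in PRHARedundancyProvider (first hunk below) amounts to
polling the recovery flag before handing CreateMissingBucketsTask to the resource
manager's executor. A minimal sketch, using only calls that appear in the patch
itself; the method name scheduleAfterRecovery is hypothetical:

    // Sketch only: wait for persistent recovery to finish, then schedule the task.
    void scheduleAfterRecovery(PRHARedundancyProvider provider, PartitionedRegion pr) {
      while (!provider.isPersistentRecoveryComplete()) {
        pr.getCancelCriterion().checkCancelInProgress(null); // bail out on cache close
        try {
          Thread.sleep(100); // back off briefly before re-checking recovery state
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return; // skip scheduling if interrupted; a later call can retry
        }
      }
      pr.getGemFireCache().getInternalResourceManager().getExecutor()
          .execute(new CreateMissingBucketsTask(provider));
    }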


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/24c004d8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/24c004d8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/24c004d8

Branch: refs/heads/feature/GEM-1483
Commit: 24c004d8a4ccf212fd3ea89adab027fc52522326
Parents: 0ea489e
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Jun 8 14:52:56 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Fri Jul 21 14:00:02 2017 -0700

----------------------------------------------------------------------
 .../internal/cache/PRHARedundancyProvider.java  | 28 ++++++++--
 .../cache/PartitionedRegionDataStore.java       | 58 +++++++++++++++++++-
 2 files changed, 80 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/24c004d8/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index f8e2108..2c1905f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -1798,10 +1798,30 @@ public class PRHARedundancyProvider {
   public void scheduleCreateMissingBuckets() {
     if (this.prRegion.getColocatedWith() != null
         && ColocationHelper.isColocationComplete(this.prRegion)) {
-      Runnable task = new CreateMissingBucketsTask(this);
-      final InternalResourceManager resourceManager =
-          this.prRegion.getGemFireCache().getInternalResourceManager();
-      resourceManager.getExecutor().execute(task);
+      Runnable waitThread = new Runnable() {
+        @Override
+        public void run() {
+          boolean interrupted = false;
+          while (!PRHARedundancyProvider.this.isPersistentRecoveryComplete()) {
+            try {
+              prRegion.getLogger().info("GGG7:waiting for recovery:" + prRegion.getFullPath());
+              prRegion.getCancelCriterion().checkCancelInProgress(null);
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              interrupted = true;
+            }
+          }
+
+          if (!interrupted) {
+            prRegion.getLogger().info("GGG7:waited for recovery:" + prRegion.getFullPath());
+            Runnable task = new CreateMissingBucketsTask(PRHARedundancyProvider.this);
+            final InternalResourceManager resourceManager =
+                prRegion.getGemFireCache().getInternalResourceManager();
+            resourceManager.getExecutor().execute(task);
+          }
+        }
+      };
+      waitThread.run();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/24c004d8/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 0318c75..bf708a9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -1417,6 +1417,39 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     }
   }
 
+  public boolean isRemotePrimaryReadyForColocatedChildren(int bucketId) {
+    boolean isRemotePrimaryReady = true;
+    InternalDistributedMember myId =
+        this.partitionedRegion.getDistributionManager().getDistributionManagerId();
+
+    List<PartitionedRegion> colocatedChildPRs =
+        ColocationHelper.getColocatedChildRegions(this.partitionedRegion);
+    if (colocatedChildPRs != null) {
+      for (PartitionedRegion pr : colocatedChildPRs) {
+        InternalDistributedMember primaryChild = pr.getBucketPrimary(bucketId);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Checking colocated child bucket " + pr + ", bucketId=" 
+ bucketId
+              + ", primary is " + primaryChild);
+        }
+        if (primaryChild == null || myId.equals(primaryChild)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Colocated bucket region " + pr + " " + bucketId
+                + " does not have a remote primary yet. Not to remove.");
+          }
+          return false;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger
+                .debug(pr + " bucketId=" + bucketId + " has remote primary, checking its children");
+          }
+          isRemotePrimaryReady = isRemotePrimaryReady
+              && pr.getDataStore().isRemotePrimaryReadyForColocatedChildren(bucketId);
+        }
+      }
+    }
+    return isRemotePrimaryReady;
+  }
+
   /**
   * Removes a redundant bucket hosted by this data store. The rebalancer invokes this method
    * directly or sends this member a message to invoke it.
@@ -1471,7 +1504,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
 
       }
 
+      PartitionedRegion leader = ColocationHelper.getLeaderRegion(this.partitionedRegion);
+      boolean isLeader = leader.equals(this.partitionedRegion);
       BucketAdvisor bucketAdvisor = bucketRegion.getBucketAdvisor();
+      InternalDistributedMember myId =
+          this.partitionedRegion.getDistributionManager().getDistributionManagerId();
       Lock writeLock = bucketAdvisor.getActiveWriteLock();
 
       // Fix for 43613 - don't remove the bucket
@@ -1484,6 +1521,25 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
           return false;
         }
 
+        if (isLeader) {
+          if (!isRemotePrimaryReadyForColocatedChildren(bucketId)) {
+            return false;
+          }
+
+          InternalDistributedMember primary = bucketAdvisor.getPrimary();
+          if (primary == null || myId.equals(primary)) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Bucket region " + bucketRegion
+                  + " does not have a remote primary yet. Not to remove.");
+            }
+            return false;
+          }
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("Bucket region " + bucketRegion + " has primary at " 
+ primary);
+          }
+        }
+
         // recurse down to each tier of children to remove first
         removeBucketForColocatedChildren(bucketId, forceRemovePrimary);
 
@@ -1513,8 +1569,6 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
       // because it won't block write operations while we're trying to acquire
       // the activePrimaryMoveLock
       InternalDistributedMember primary = bucketAdvisor.getPrimary();
-      InternalDistributedMember myId =
-          this.partitionedRegion.getDistributionManager().getDistributionManagerId();
       if (!myId.equals(primary)) {
         StateFlushOperation flush = new StateFlushOperation(bucketRegion);
         int executor = DistributionManager.WAITING_POOL_EXECUTOR;
