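GEODE-3055: The real root cause is that the old primary's shadow bucket is not
yet initialized when rebalance removes it, so the new primary candidate can
never initialize from it. The fix is to wait until a new primary exists before
removing the old primary's bucket during rebalance. This also reverts the
earlier workaround that passed bucket IDs to waitUntilFlushed() and polled
each bucket's temporary queue until it was empty.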


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56a3fa75
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56a3fa75
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56a3fa75

Branch: refs/heads/feature/GEM-1483
Commit: 56a3fa75ad35943b1e447af00bdba37d3d225297
Parents: 32c74ec
Author: zhouxh <gz...@pivotal.io>
Authored: Wed Jun 14 14:31:30 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Wed Jun 14 15:01:32 2017 -0700

----------------------------------------------------------------------
 .../asyncqueue/internal/AsyncEventQueueImpl.java   |  5 ++---
 .../internal/cache/PartitionedRegionDataStore.java | 17 ++++++++++++++---
 .../internal/cache/wan/AbstractGatewaySender.java  |  6 ++----
 ...tilParallelGatewaySenderFlushedCoordinator.java | 16 +---------------
 ...elGatewaySenderFlushedCoordinatorJUnitTest.java | 15 +++++----------
 .../distributed/WaitUntilFlushedFunction.java      | 12 +-----------
 .../WaitUntilFlushedFunctionJUnitTest.java         |  5 +----
 7 files changed, 26 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index a420dab..bf7e874 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -217,8 +217,7 @@ public class AsyncEventQueueImpl implements AsyncEventQueue 
{
     return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy();
   }
 
-  public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, 
TimeUnit unit)
-      throws InterruptedException {
-    return ((AbstractGatewaySender) this.sender).waitUntilFlushed(bucketIds, 
timeout, unit);
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, 
unit);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 7cef0a5..74b8e6d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -1472,6 +1472,19 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
       }
 
       BucketAdvisor bucketAdvisor = bucketRegion.getBucketAdvisor();
+      InternalDistributedMember primary = bucketAdvisor.getPrimary();
+      InternalDistributedMember myId =
+          
this.partitionedRegion.getDistributionManager().getDistributionManagerId();
+      if (primary == null || myId.equals(primary)) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Bucket region " + bucketRegion
+              + " does not have a remote primary yet. Not to remove.");
+        }
+        return false;
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Bucket region " + bucketRegion + " has primary at " + 
primary);
+      }
       Lock writeLock = bucketAdvisor.getActiveWriteLock();
 
       // Fix for 43613 - don't remove the bucket
@@ -1512,9 +1525,7 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
       // wait for in progress writes. I choose to use the StateFlushOperation
       // because it won't block write operations while we're trying to acquire
       // the activePrimaryMoveLock
-      InternalDistributedMember primary = bucketAdvisor.getPrimary();
-      InternalDistributedMember myId =
-          
this.partitionedRegion.getDistributionManager().getDistributionManagerId();
+      primary = bucketAdvisor.getPrimary();
       if (!myId.equals(primary)) {
         StateFlushOperation flush = new StateFlushOperation(bucketRegion);
         int executor = DistributionManager.WAITING_POOL_EXECUTOR;

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index b6a23d6..c38d547 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1254,14 +1254,12 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return lifeCycleLock;
   }
 
-  public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, 
TimeUnit unit)
-      throws InterruptedException {
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
     boolean result = false;
     if (isParallel()) {
       try {
         WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-            new WaitUntilParallelGatewaySenderFlushedCoordinator(this, 
bucketIds, timeout, unit,
-                true);
+            new WaitUntilParallelGatewaySenderFlushedCoordinator(this, 
timeout, unit, true);
         result = coordinator.waitUntilFlushed();
       } catch (BucketMovedException | CancelException | 
RegionDestroyedException e) {
         logger.warn(

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
index 7856324..42ce68c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -19,7 +19,6 @@ import org.apache.geode.distributed.internal.*;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.*;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
-import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator;
 
 import java.util.ArrayList;
@@ -34,12 +33,9 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
     extends WaitUntilGatewaySenderFlushedCoordinator {
   final static private int CALLABLES_CHUNK_SIZE = 10;
 
-  protected Set<Integer> bucketIds;
-
   public 
WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender,
-      Set<Integer> bucketIds, long timeout, TimeUnit unit, boolean initiator) {
+      long timeout, TimeUnit unit, boolean initiator) {
     super(sender, timeout, unit, initiator);
-    this.bucketIds = bucketIds;
   }
 
   public boolean waitUntilFlushed() throws Throwable {
@@ -57,16 +53,6 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
     int callableCount = 0;
     long nanosRemaining = unit.toNanos(timeout);
     long endTime = System.nanoTime() + nanosRemaining;
-    for (int bucketId : bucketIds) {
-      BlockingQueue<GatewaySenderEventImpl> tempQueue = 
prq.getBucketTmpQueue(bucketId);
-      while (tempQueue != null && !tempQueue.isEmpty()) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator wait 
for tempQeue "
-              + bucketId + " to be empty");
-        }
-        Thread.sleep(20);
-      }
-    }
     Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr);
     for (BucketRegion br : localBucketRegions) {
       // timeout exceeded, do not submit more callables, return localResult 
false

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
index eb505b1..5e12ed5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -67,8 +67,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
-            timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(true)).when(coordinatorSpy)
@@ -82,8 +81,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 500;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
-            timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
 
     Set<BucketRegion> bucketRegions = new HashSet<>();
@@ -111,8 +109,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
-            timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(false)).when(coordinatorSpy)
@@ -126,8 +123,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
-            timeout, unit, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(true)).when(coordinatorSpy)
@@ -141,8 +137,7 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
-            timeout, unit, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(false)).when(coordinatorSpy)

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
index 222bb5e..e11384c 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
@@ -18,8 +18,6 @@ package org.apache.geode.cache.lucene.internal.distributed;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Cache;
@@ -46,7 +44,6 @@ import 
org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import org.apache.geode.internal.InternalEntity;
 import org.apache.geode.internal.cache.BucketNotFoundException;
-import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -83,14 +80,7 @@ public class WaitUntilFlushedFunction implements Function, 
InternalEntity {
     AsyncEventQueueImpl queue = (AsyncEventQueueImpl) 
cache.getAsyncEventQueue(aeqId);
     if (queue != null) {
       try {
-        Set<Integer> bucketIds;
-        if (region instanceof PartitionedRegion) {
-          PartitionedRegion pr = (PartitionedRegion) region;
-          bucketIds = pr.getDataStore().getAllLocalBucketIds();
-        } else {
-          bucketIds = new HashSet<Integer>();
-        }
-        result = queue.waitUntilFlushed(bucketIds, timeout, unit);
+        result = queue.waitUntilFlushed(timeout, unit);
       } catch (InterruptedException e) {
       }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
index 902eb5f..f92a296 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
@@ -20,8 +20,6 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Region;
@@ -68,8 +66,7 @@ public class WaitUntilFlushedFunctionJUnitTest {
     when(mockContext.getArguments()).thenReturn(waitArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
     when(mockCache.getAsyncEventQueue(any())).thenReturn(mockAEQ);
-    when(mockAEQ.waitUntilFlushed(new HashSet<Integer>(), 10000, 
TimeUnit.MILLISECONDS))
-        .thenReturn(true);
+    when(mockAEQ.waitUntilFlushed(10000, 
TimeUnit.MILLISECONDS)).thenReturn(true);
 
     WaitUntilFlushedFunction function = new WaitUntilFlushedFunction();
     function.execute(mockContext);

Reply via email to