GEODE-3055: The real root cause is that the old primary's shadow bucket is not yet initialized when rebalance removes it, so the new primary candidate can never initialize from it. The fix is to wait until a new primary exists before removing the old primary's bucket during rebalance.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56a3fa75 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56a3fa75 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56a3fa75 Branch: refs/heads/feature/GEM-1483 Commit: 56a3fa75ad35943b1e447af00bdba37d3d225297 Parents: 32c74ec Author: zhouxh <gz...@pivotal.io> Authored: Wed Jun 14 14:31:30 2017 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Wed Jun 14 15:01:32 2017 -0700 ---------------------------------------------------------------------- .../asyncqueue/internal/AsyncEventQueueImpl.java | 5 ++--- .../internal/cache/PartitionedRegionDataStore.java | 17 ++++++++++++++--- .../internal/cache/wan/AbstractGatewaySender.java | 6 ++---- ...tilParallelGatewaySenderFlushedCoordinator.java | 16 +--------------- ...elGatewaySenderFlushedCoordinatorJUnitTest.java | 15 +++++---------- .../distributed/WaitUntilFlushedFunction.java | 12 +----------- .../WaitUntilFlushedFunctionJUnitTest.java | 5 +---- 7 files changed, 26 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index a420dab..bf7e874 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -217,8 +217,7 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy(); } - public boolean 
waitUntilFlushed(Set<Integer> bucketIds, long timeout, TimeUnit unit) - throws InterruptedException { - return ((AbstractGatewaySender) this.sender).waitUntilFlushed(bucketIds, timeout, unit); + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { + return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, unit); } } http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index 7cef0a5..74b8e6d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -1472,6 +1472,19 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { } BucketAdvisor bucketAdvisor = bucketRegion.getBucketAdvisor(); + InternalDistributedMember primary = bucketAdvisor.getPrimary(); + InternalDistributedMember myId = + this.partitionedRegion.getDistributionManager().getDistributionManagerId(); + if (primary == null || myId.equals(primary)) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket region " + bucketRegion + + " does not have a remote primary yet. Not to remove."); + } + return false; + } + if (logger.isDebugEnabled()) { + logger.debug("Bucket region " + bucketRegion + " has primary at " + primary); + } Lock writeLock = bucketAdvisor.getActiveWriteLock(); // Fix for 43613 - don't remove the bucket @@ -1512,9 +1525,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { // wait for in progress writes. 
I choose to use the StateFlushOperation // because it won't block write operations while we're trying to acquire // the activePrimaryMoveLock - InternalDistributedMember primary = bucketAdvisor.getPrimary(); - InternalDistributedMember myId = - this.partitionedRegion.getDistributionManager().getDistributionManagerId(); + primary = bucketAdvisor.getPrimary(); if (!myId.equals(primary)) { StateFlushOperation flush = new StateFlushOperation(bucketRegion); int executor = DistributionManager.WAITING_POOL_EXECUTOR; http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index b6a23d6..c38d547 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1254,14 +1254,12 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return lifeCycleLock; } - public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, TimeUnit unit) - throws InterruptedException { + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { boolean result = false; if (isParallel()) { try { WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this, bucketIds, timeout, unit, - true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this, timeout, unit, true); result = coordinator.waitUntilFlushed(); } catch (BucketMovedException | CancelException | RegionDestroyedException e) { logger.warn( 
http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index 7856324..42ce68c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -19,7 +19,6 @@ import org.apache.geode.distributed.internal.*; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; -import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator; import java.util.ArrayList; @@ -34,12 +33,9 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator extends WaitUntilGatewaySenderFlushedCoordinator { final static private int CALLABLES_CHUNK_SIZE = 10; - protected Set<Integer> bucketIds; - public WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender, - Set<Integer> bucketIds, long timeout, TimeUnit unit, boolean initiator) { + long timeout, TimeUnit unit, boolean initiator) { super(sender, timeout, unit, initiator); - this.bucketIds = bucketIds; } public boolean waitUntilFlushed() throws Throwable { @@ -57,16 +53,6 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator int callableCount = 0; long nanosRemaining = unit.toNanos(timeout); long endTime = System.nanoTime() + 
nanosRemaining; - for (int bucketId : bucketIds) { - BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(bucketId); - while (tempQueue != null && !tempQueue.isEmpty()) { - if (logger.isDebugEnabled()) { - logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator wait for tempQeue " - + bucketId + " to be empty"); - } - Thread.sleep(20); - } - } Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr); for (BucketRegion br : localBucketRegions) { // timeout exceeded, do not submit more callables, return localResult false http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java index eb505b1..5e12ed5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java @@ -67,8 +67,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), - timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); 
doReturn(getCallableResult(true)).when(coordinatorSpy) @@ -82,8 +81,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 500; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), - timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); Set<BucketRegion> bucketRegions = new HashSet<>(); @@ -111,8 +109,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), - timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(false)).when(coordinatorSpy) @@ -126,8 +123,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), - timeout, unit, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(true)).when(coordinatorSpy) @@ -141,8 +137,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long 
timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), - timeout, unit, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(false)).when(coordinatorSpy) http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java index 222bb5e..e11384c 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java @@ -18,8 +18,6 @@ package org.apache.geode.cache.lucene.internal.distributed; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.Cache; @@ -46,7 +44,6 @@ import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.internal.InternalEntity; import org.apache.geode.internal.cache.BucketNotFoundException; -import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.logging.LogService; /** @@ -83,14 +80,7 @@ public class 
WaitUntilFlushedFunction implements Function, InternalEntity { AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); if (queue != null) { try { - Set<Integer> bucketIds; - if (region instanceof PartitionedRegion) { - PartitionedRegion pr = (PartitionedRegion) region; - bucketIds = pr.getDataStore().getAllLocalBucketIds(); - } else { - bucketIds = new HashSet<Integer>(); - } - result = queue.waitUntilFlushed(bucketIds, timeout, unit); + result = queue.waitUntilFlushed(timeout, unit); } catch (InterruptedException e) { } http://git-wip-us.apache.org/repos/asf/geode/blob/56a3fa75/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java index 902eb5f..f92a296 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java @@ -20,8 +20,6 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.Region; @@ -68,8 +66,7 @@ public class WaitUntilFlushedFunctionJUnitTest { when(mockContext.getArguments()).thenReturn(waitArgs); when(mockContext.getResultSender()).thenReturn(mockResultSender); when(mockCache.getAsyncEventQueue(any())).thenReturn(mockAEQ); - when(mockAEQ.waitUntilFlushed(new HashSet<Integer>(), 10000, TimeUnit.MILLISECONDS)) - .thenReturn(true); + when(mockAEQ.waitUntilFlushed(10000, 
TimeUnit.MILLISECONDS)).thenReturn(true); WaitUntilFlushedFunction function = new WaitUntilFlushedFunction(); function.execute(mockContext);