GEODE-3055: waitUntilFlushed should use the data region's bucket ID list. Some of the buckets may not be initialized yet, so first wait for each bucket's tempQueue to be empty.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/32c74ece Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/32c74ece Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/32c74ece Branch: refs/heads/feature/GEM-1483 Commit: 32c74ecec4cd88ea42221d1f7234a07cb7134186 Parents: 7bc3111 Author: zhouxh <gz...@pivotal.io> Authored: Thu Jun 8 14:52:56 2017 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Wed Jun 14 15:01:32 2017 -0700 ---------------------------------------------------------------------- .../asyncqueue/internal/AsyncEventQueueImpl.java | 5 +++-- .../internal/cache/wan/AbstractGatewaySender.java | 6 ++++-- ...ntilParallelGatewaySenderFlushedCoordinator.java | 16 +++++++++++++++- ...lelGatewaySenderFlushedCoordinatorJUnitTest.java | 15 ++++++++++----- .../distributed/WaitUntilFlushedFunction.java | 12 +++++++++++- .../WaitUntilFlushedFunctionJUnitTest.java | 5 ++++- 6 files changed, 47 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index bf7e874..a420dab 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -217,7 +217,8 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy(); } - public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { - 
return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, unit); + public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, TimeUnit unit) + throws InterruptedException { + return ((AbstractGatewaySender) this.sender).waitUntilFlushed(bucketIds, timeout, unit); } } http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index c38d547..b6a23d6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1254,12 +1254,14 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return lifeCycleLock; } - public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { + public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, TimeUnit unit) + throws InterruptedException { boolean result = false; if (isParallel()) { try { WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this, timeout, unit, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this, bucketIds, timeout, unit, + true); result = coordinator.waitUntilFlushed(); } catch (BucketMovedException | CancelException | RegionDestroyedException e) { logger.warn( http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java index 42ce68c..7856324 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -19,6 +19,7 @@ import org.apache.geode.distributed.internal.*; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator; import java.util.ArrayList; @@ -33,9 +34,12 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator extends WaitUntilGatewaySenderFlushedCoordinator { final static private int CALLABLES_CHUNK_SIZE = 10; + protected Set<Integer> bucketIds; + public WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender, - long timeout, TimeUnit unit, boolean initiator) { + Set<Integer> bucketIds, long timeout, TimeUnit unit, boolean initiator) { super(sender, timeout, unit, initiator); + this.bucketIds = bucketIds; } public boolean waitUntilFlushed() throws Throwable { @@ -53,6 +57,16 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator int callableCount = 0; long nanosRemaining = unit.toNanos(timeout); long endTime = System.nanoTime() + nanosRemaining; + for (int bucketId : bucketIds) { + BlockingQueue<GatewaySenderEventImpl> tempQueue = prq.getBucketTmpQueue(bucketId); + while (tempQueue != null && !tempQueue.isEmpty()) { + if (logger.isDebugEnabled()) { + 
logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator wait for tempQeue " + + bucketId + " to be empty"); + } + Thread.sleep(20); + } + } Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr); for (BucketRegion br : localBucketRegions) { // timeout exceeded, do not submit more callables, return localResult false http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java index 5e12ed5..eb505b1 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java @@ -67,7 +67,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), + timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(true)).when(coordinatorSpy) @@ -81,7 +82,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 500; TimeUnit unit = TimeUnit.MILLISECONDS; 
WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), + timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); Set<BucketRegion> bucketRegions = new HashSet<>(); @@ -109,7 +111,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, false); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), + timeout, unit, false); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(false)).when(coordinatorSpy) @@ -123,7 +126,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), + timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(true)).when(coordinatorSpy) @@ -137,7 +141,8 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest long timeout = 5000; TimeUnit unit = TimeUnit.MILLISECONDS; WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = - new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit, 
true); + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new HashSet<Integer>(), + timeout, unit, true); WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any()); doReturn(getCallableResult(false)).when(coordinatorSpy) http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java index e11384c..222bb5e 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java @@ -18,6 +18,8 @@ package org.apache.geode.cache.lucene.internal.distributed; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.Cache; @@ -44,6 +46,7 @@ import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.internal.InternalEntity; import org.apache.geode.internal.cache.BucketNotFoundException; +import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.logging.LogService; /** @@ -80,7 +83,14 @@ public class WaitUntilFlushedFunction implements Function, InternalEntity { AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); if (queue != null) { try { - result = 
queue.waitUntilFlushed(timeout, unit); + Set<Integer> bucketIds; + if (region instanceof PartitionedRegion) { + PartitionedRegion pr = (PartitionedRegion) region; + bucketIds = pr.getDataStore().getAllLocalBucketIds(); + } else { + bucketIds = new HashSet<Integer>(); + } + result = queue.waitUntilFlushed(bucketIds, timeout, unit); } catch (InterruptedException e) { } http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java index f92a296..902eb5f 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.geode.cache.Region; @@ -66,7 +68,8 @@ public class WaitUntilFlushedFunctionJUnitTest { when(mockContext.getArguments()).thenReturn(waitArgs); when(mockContext.getResultSender()).thenReturn(mockResultSender); when(mockCache.getAsyncEventQueue(any())).thenReturn(mockAEQ); - when(mockAEQ.waitUntilFlushed(10000, TimeUnit.MILLISECONDS)).thenReturn(true); + when(mockAEQ.waitUntilFlushed(new HashSet<Integer>(), 10000, TimeUnit.MILLISECONDS)) + .thenReturn(true); WaitUntilFlushedFunction function = new WaitUntilFlushedFunction(); function.execute(mockContext);