GEODE-3055: waitUntilFlushed should use the data region's bucket id list. Some of
the buckets may not be initialized yet, in which case wait for their tempQueue to be empty.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/32c74ece
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/32c74ece
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/32c74ece

Branch: refs/heads/feature/GEM-1483
Commit: 32c74ecec4cd88ea42221d1f7234a07cb7134186
Parents: 7bc3111
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Jun 8 14:52:56 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Wed Jun 14 15:01:32 2017 -0700

----------------------------------------------------------------------
 .../asyncqueue/internal/AsyncEventQueueImpl.java    |  5 +++--
 .../internal/cache/wan/AbstractGatewaySender.java   |  6 ++++--
 ...ntilParallelGatewaySenderFlushedCoordinator.java | 16 +++++++++++++++-
 ...lelGatewaySenderFlushedCoordinatorJUnitTest.java | 15 ++++++++++-----
 .../distributed/WaitUntilFlushedFunction.java       | 12 +++++++++++-
 .../WaitUntilFlushedFunctionJUnitTest.java          |  5 ++++-
 6 files changed, 47 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index bf7e874..a420dab 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -217,7 +217,8 @@ public class AsyncEventQueueImpl implements AsyncEventQueue 
{
     return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy();
   }
 
-  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
-    return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, 
unit);
+  public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, 
TimeUnit unit)
+      throws InterruptedException {
+    return ((AbstractGatewaySender) this.sender).waitUntilFlushed(bucketIds, 
timeout, unit);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index c38d547..b6a23d6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1254,12 +1254,14 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return lifeCycleLock;
   }
 
-  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
+  public boolean waitUntilFlushed(Set<Integer> bucketIds, long timeout, 
TimeUnit unit)
+      throws InterruptedException {
     boolean result = false;
     if (isParallel()) {
       try {
         WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-            new WaitUntilParallelGatewaySenderFlushedCoordinator(this, 
timeout, unit, true);
+            new WaitUntilParallelGatewaySenderFlushedCoordinator(this, 
bucketIds, timeout, unit,
+                true);
         result = coordinator.waitUntilFlushed();
       } catch (BucketMovedException | CancelException | 
RegionDestroyedException e) {
         logger.warn(

http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
index 42ce68c..7856324 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -19,6 +19,7 @@ import org.apache.geode.distributed.internal.*;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.*;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator;
 
 import java.util.ArrayList;
@@ -33,9 +34,12 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
     extends WaitUntilGatewaySenderFlushedCoordinator {
   final static private int CALLABLES_CHUNK_SIZE = 10;
 
+  protected Set<Integer> bucketIds;
+
   public 
WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender,
-      long timeout, TimeUnit unit, boolean initiator) {
+      Set<Integer> bucketIds, long timeout, TimeUnit unit, boolean initiator) {
     super(sender, timeout, unit, initiator);
+    this.bucketIds = bucketIds;
   }
 
   public boolean waitUntilFlushed() throws Throwable {
@@ -53,6 +57,16 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
     int callableCount = 0;
     long nanosRemaining = unit.toNanos(timeout);
     long endTime = System.nanoTime() + nanosRemaining;
+    for (int bucketId : bucketIds) {
+      BlockingQueue<GatewaySenderEventImpl> tempQueue = 
prq.getBucketTmpQueue(bucketId);
+      while (tempQueue != null && !tempQueue.isEmpty()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator wait 
for tempQeue "
+              + bucketId + " to be empty");
+        }
+        Thread.sleep(20);
+      }
+    }
     Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr);
     for (BucketRegion br : localBucketRegions) {
       // timeout exceeded, do not submit more callables, return localResult 
false

http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
index 5e12ed5..eb505b1 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -67,7 +67,8 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
+            timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(true)).when(coordinatorSpy)
@@ -81,7 +82,8 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 500;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
+            timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
 
     Set<BucketRegion> bucketRegions = new HashSet<>();
@@ -109,7 +111,8 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
+            timeout, unit, false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(false)).when(coordinatorSpy)
@@ -123,7 +126,8 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
+            timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(true)).when(coordinatorSpy)
@@ -137,7 +141,8 @@ public class 
WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     long timeout = 5000;
     TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
timeout, unit, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, new 
HashSet<Integer>(),
+            timeout, unit, true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
     
doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
     doReturn(getCallableResult(false)).when(coordinatorSpy)

http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
index e11384c..222bb5e 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
@@ -18,6 +18,8 @@ package org.apache.geode.cache.lucene.internal.distributed;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Cache;
@@ -44,6 +46,7 @@ import 
org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import org.apache.geode.internal.InternalEntity;
 import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -80,7 +83,14 @@ public class WaitUntilFlushedFunction implements Function, 
InternalEntity {
     AsyncEventQueueImpl queue = (AsyncEventQueueImpl) 
cache.getAsyncEventQueue(aeqId);
     if (queue != null) {
       try {
-        result = queue.waitUntilFlushed(timeout, unit);
+        Set<Integer> bucketIds;
+        if (region instanceof PartitionedRegion) {
+          PartitionedRegion pr = (PartitionedRegion) region;
+          bucketIds = pr.getDataStore().getAllLocalBucketIds();
+        } else {
+          bucketIds = new HashSet<Integer>();
+        }
+        result = queue.waitUntilFlushed(bucketIds, timeout, unit);
       } catch (InterruptedException e) {
       }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/32c74ece/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
index f92a296..902eb5f 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunctionJUnitTest.java
@@ -20,6 +20,8 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Region;
@@ -66,7 +68,8 @@ public class WaitUntilFlushedFunctionJUnitTest {
     when(mockContext.getArguments()).thenReturn(waitArgs);
     when(mockContext.getResultSender()).thenReturn(mockResultSender);
     when(mockCache.getAsyncEventQueue(any())).thenReturn(mockAEQ);
-    when(mockAEQ.waitUntilFlushed(10000, 
TimeUnit.MILLISECONDS)).thenReturn(true);
+    when(mockAEQ.waitUntilFlushed(new HashSet<Integer>(), 10000, 
TimeUnit.MILLISECONDS))
+        .thenReturn(true);
 
     WaitUntilFlushedFunction function = new WaitUntilFlushedFunction();
     function.execute(mockContext);

Reply via email to