Repository: geode
Updated Branches:
  refs/heads/feature/close_cache_aeq_enqueue [created] 070494d37


Starting a test of closing the cache while the AEQ is enqueuing


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/070494d3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/070494d3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/070494d3

Branch: refs/heads/feature/close_cache_aeq_enqueue
Commit: 070494d37d5aff1c3ac8e7fe3a483157dea25927
Parents: 456ee15
Author: Dan Smith <upthewatersp...@apache.org>
Authored: Wed May 24 10:18:57 2017 -0700
Committer: Dan Smith <upthewatersp...@apache.org>
Committed: Wed May 24 10:18:57 2017 -0700

----------------------------------------------------------------------
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 81 ++++++++++++++++++++
 1 file changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/070494d3/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 3dd0550..ac89b48 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -1674,6 +1681,80 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> 
getBucketMoved(vm2, "ln"));
   }
 
+  @Test
+  public void testCacheClosedBeforeAEQWrite() { // closes vm1's cache from inside the AEQ's beforeEnqueue filter; only checks that the cache ends up closed
+    Integer lnPort =
+        (Integer) vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+    final DistributedMember member1 =
+        vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+    // NOTE(review): member1 is not referenced again in this test. Below, vm1 creates AEQ "ln" with a filter that closes the cache on a removeAll event.
+    vm1.invoke(() -> {
+        cache.createAsyncEventQueueFactory().addGatewayEventFilter(new 
GatewayEventFilter() {
+          @Override
+          public boolean beforeEnqueue(final GatewayQueueEvent event) {
+//            if (event.getOperation().isDestroy()) {
+            if (event.getOperation().isRemoveAll()) {
+              new Thread(() -> cache.close()).start();
+              try {
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                // ignored; NOTE(review): the 1s sleep presumably gives the closer thread time to run before the throw below - confirm
+              }
+              throw new CacheClosedException();
+            }
+            return true;
+          };
+
+          @Override
+          public boolean beforeTransmit(final GatewayQueueEvent event) {
+            return false;
+          }
+
+          @Override
+          public void afterAcknowledgement(final GatewayQueueEvent event) {
+            // no-op
+          }
+
+          @Override
+          public void close() {
+            // no-op
+          }
+        }).create("ln",  new MyAsyncEventListener());
+    });
+    // vm1 creates the partitioned region "<testMethodName>_PR" attached to AEQ "ln".
+    vm1.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
+        getTestMethodName() + "_PR", "ln", isOffHeap()));
+    // vm2 creates the same region name with 16 total buckets and local max memory 0, also referencing AEQ id "ln".
+    vm2.invoke(() -> {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setLocalMaxMemory(0);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(isOffHeap());
+      Region r = 
cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+          .create(getTestMethodName() + "_PR");
+
+    });
+    // Pause the queue in vm1, then drive operations from vm2; the removeAll is intended to hit the filter's isRemoveAll() branch.
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
+//    vm2.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + 
"_PR", 3));
+    vm2.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + 
"_PR");
+      r.put(1, 1);
+      r.put(2, 2);
+      r.removeAll(Collections.singleton(1));
+      r.remove(1);
+    });
+    // Wait up to one minute for vm1's cache to report closed.
+    vm1.invoke(() -> Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() 
-> cache.isClosed()));
+//    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
+  }
+
   private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
       final BucketMovingAsyncEventListener listener =

Reply via email to