Repository: geode
Updated Branches:
  refs/heads/feature/close_cache_aeq_enqueue [created] 070494d37

Starting a test of closing the cache while the AEQ is enqueuing

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/070494d3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/070494d3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/070494d3

Branch: refs/heads/feature/close_cache_aeq_enqueue
Commit: 070494d37d5aff1c3ac8e7fe3a483157dea25927
Parents: 456ee15
Author: Dan Smith <upthewatersp...@apache.org>
Authored: Wed May 24 10:18:57 2017 -0700
Committer: Dan Smith <upthewatersp...@apache.org>
Committed: Wed May 24 10:18:57 2017 -0700

----------------------------------------------------------------------
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 81 ++++++++++++++++++++
 1 file changed, 81 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/geode/blob/070494d3/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 3dd0550..ac89b48 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -1674,6 +1681,80 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm2, "ln"));
   }
 
+  @Test
+  public void testCacheClosedBeforeAEQWrite() {
+    Integer lnPort =
+        (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+    final DistributedMember member1 =
+        vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+
+    vm1.invoke(() -> {
+      cache.createAsyncEventQueueFactory().addGatewayEventFilter(new GatewayEventFilter() {
+        @Override
+        public boolean beforeEnqueue(final GatewayQueueEvent event) {
+//          if (event.getOperation().isDestroy()) {
+          if (event.getOperation().isRemoveAll()) {
+            new Thread(() -> cache.close()).start();
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              //ignore
+            }
+            throw new CacheClosedException();
+          }
+          return true;
+        };
+
+        @Override
+        public boolean beforeTransmit(final GatewayQueueEvent event) {
+          return false;
+        }
+
+        @Override
+        public void afterAcknowledgement(final GatewayQueueEvent event) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+      }).create("ln", new MyAsyncEventListener());
+    });
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
+        getTestMethodName() + "_PR", "ln", isOffHeap()));
+
+    vm2.invoke(() -> {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setLocalMaxMemory(0);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(isOffHeap());
+      Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+          .create(getTestMethodName() + "_PR");
+
+    });
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
+//    vm2.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 3));
+    vm2.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
+      r.put(1, 1);
+      r.put(2, 2);
+      r.removeAll(Collections.singleton(1));
+      r.remove(1);
+    });
+
+    vm1.invoke(() -> Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> cache.isClosed()));
+//    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
+  }
+
   private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
       final BucketMovingAsyncEventListener listener =