GEODE-2750: The lucene index is now destroyed on remote members before the initiating member
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/99e61ffa Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/99e61ffa Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/99e61ffa Branch: refs/heads/feature/GEODE-2632 Commit: 99e61ffa09158ad60164368805b4052614b806c8 Parents: c61cab9 Author: Barry Oglesby <bogle...@pivotal.io> Authored: Mon Apr 3 18:44:25 2017 -0700 Committer: Barry Oglesby <bogle...@pivotal.io> Committed: Tue Apr 4 16:31:35 2017 -0700 ---------------------------------------------------------------------- .../internal/AsyncEventQueueImpl.java | 6 +- .../cache/wan/AbstractGatewaySender.java | 54 ++++--- .../lucene/LuceneIndexDestroyedException.java | 42 +++++ .../lucene/LuceneIndexNotFoundException.java | 41 +++++ .../AbstractPartitionedRepositoryManager.java | 14 +- .../LuceneIndexForPartitionedRegion.java | 16 +- .../cache/lucene/internal/LuceneIndexImpl.java | 6 +- .../distributed/LuceneQueryFunction.java | 9 +- .../lucene/LuceneIndexDestroyDUnitTest.java | 158 ++++++++++++++++--- .../LuceneQueryFunctionJUnitTest.java | 3 +- 10 files changed, 285 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index a44b9e4..0def5d2 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -199,8 +199,12 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { } public void destroy() { + destroy(true); + } + + public void destroy(boolean initiator) { GemFireCacheImpl gfci = (GemFireCacheImpl) ((AbstractGatewaySender) this.sender).getCache(); - this.sender.destroy(); + ((AbstractGatewaySender) this.sender).destroy(initiator); gfci.removeAsyncEventQueue(this); } http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a604dbf..1c94f94 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -514,6 +514,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi */ @Override public void destroy() { + destroy(true); + } + + public void destroy(boolean initiator) { try { this.getLifeCycleLock().writeLock().lock(); // first, check if this sender is attached to any region. If so, throw @@ -542,33 +546,35 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi ((GemFireCacheImpl) this.cache).removeGatewaySender(this); // destroy the region underneath the sender's queue - Set<RegionQueue> regionQueues = getQueues(); - if (regionQueues != null) { - for (RegionQueue regionQueue : regionQueues) { - try { - if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) { - Set<PartitionedRegion> queueRegions = - ((ConcurrentParallelGatewaySenderQueue) regionQueue).getRegions(); - for (PartitionedRegion queueRegion : queueRegions) { - queueRegion.destroyRegion(); + if (initiator) { + Set<RegionQueue> regionQueues = getQueues(); + if (regionQueues != null) { + for (RegionQueue regionQueue : regionQueues) { + try { + if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) { + Set<PartitionedRegion> queueRegions = + ((ConcurrentParallelGatewaySenderQueue) regionQueue).getRegions(); + for (PartitionedRegion queueRegion : queueRegions) { + queueRegion.destroyRegion(); + } + } else {// For SerialGatewaySenderQueue, do local destroy + regionQueue.getRegion().localDestroyRegion(); } - } else {// For SerialGatewaySenderQueue, do local destroy - regionQueue.getRegion().localDestroyRegion(); + } + // Can occur in case of ParallelGatewaySenderQueue, when the region is + // being destroyed + // by several nodes simultaneously + catch (RegionDestroyedException e) { + // the region might have already been destroyed by other node. Just + // log + // the exception. + this.logger.info(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_REGION_0_UNDERLYING_GATEWAYSENDER_1_IS_ALREADY_DESTROYED, + new Object[] {e.getRegionFullPath(), this})); } } - // Can occur in case of ParallelGatewaySenderQueue, when the region is - // being destroyed - // by several nodes simultaneously - catch (RegionDestroyedException e) { - // the region might have already been destroyed by other node. Just - // log - // the exception. - this.logger.info(LocalizedMessage.create( - LocalizedStrings.AbstractGatewaySender_REGION_0_UNDERLYING_GATEWAYSENDER_1_IS_ALREADY_DESTROYED, - new Object[] {e.getRegionFullPath(), this})); - } - } - } // END if (regionQueues != null) + } // END if (regionQueues != null) + } } finally { this.getLifeCycleLock().writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java new file mode 100644 index 0000000..7b38f24 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene; + +import org.apache.geode.GemFireException; + +/** + * A LuceneIndexDestroyedException is thrown if a Lucene index is attempted to be used while it is + * being destroyed or after it has been destroyed. + */ +public class LuceneIndexDestroyedException extends GemFireException { + + private final String indexName; + + private final String regionPath; + + public LuceneIndexDestroyedException(String indexName, String regionPath) { + super(); + this.indexName = indexName; + this.regionPath = regionPath; + } + + public String getIndexName() { + return this.indexName; + } + + public String getRegionPath() { + return this.regionPath; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java new file mode 100644 index 0000000..375837f --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene; + +import org.apache.geode.GemFireException; + +/** + * A LuceneIndexNotFoundException is thrown if a Lucene index is requested but not found. + */ +public class LuceneIndexNotFoundException extends GemFireException { + + private final String indexName; + + private final String regionPath; + + public LuceneIndexNotFoundException(String indexName, String regionPath) { + super(); + this.indexName = indexName; + this.regionPath = regionPath; + } + + public String getIndexName() { + return this.indexName; + } + + public String getRegionPath() { + return this.regionPath; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java index 26179c7..26bb488 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.RegionFunctionContext; +import org.apache.geode.cache.lucene.LuceneIndexDestroyedException; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer; @@ -96,9 +97,11 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository protected IndexRepository computeRepository(Integer bucketId) { IndexRepository repo = indexRepositories.compute(bucketId, (key, oldRepository) -> { try { - if (closed && oldRepository != null) { - oldRepository.cleanup(); - return null; + if (closed) { + if (oldRepository != null) { + oldRepository.cleanup(); + } + throw new LuceneIndexDestroyedException(index.getName(), index.getRegionPath()); } return computeRepository(bucketId, serializer, index, userRegion, oldRepository); } catch (IOException e) { @@ -130,7 +133,10 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository public void close() { this.closed = true; for (Integer bucketId : indexRepositories.keySet()) { - computeRepository(bucketId); + try { + computeRepository(bucketId); + } catch (LuceneIndexDestroyedException e) { + /* expected exception */} } } } http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 80e0c44..fe85efe 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -188,24 +188,24 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { + "; initiator=" + initiator); } - // Invoke super destroy to remove the extension + // Invoke super destroy to remove the extension and async event queue super.destroy(initiator); - // Destroy the file region (colocated with the application region) + // Destroy index on remote members if necessary + if (initiator) { + destroyOnRemoteMembers(); + } + + // Destroy the file region (colocated with the application region) if necessary // localDestroyRegion can't be used because locally destroying regions is not supported on // colocated regions - if (!fileAndChunkRegion.isDestroyed()) { + if (initiator) { fileAndChunkRegion.destroyRegion(); if (logger.isDebugEnabled()) { logger.debug("Destroyed fileAndChunkRegion=" + fileAndChunkRegion.getName()); } } - // Destroy index on remote members if necessary - if (initiator) { - destroyOnRemoteMembers(); - } - if (logger.isDebugEnabled()) { logger.debug("Destroyed index regionPath=" + regionPath + "; indexName=" + indexName + "; initiator=" + initiator); http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index 93fb25b..d58f856 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -213,7 +213,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { } // Destroy the async event queue - destroyAsyncEventQueue(); + destroyAsyncEventQueue(initiator); // Close the repository manager repositoryManager.close(); @@ -238,7 +238,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { } } - private void destroyAsyncEventQueue() { + private void destroyAsyncEventQueue(boolean initiator) { String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); // Get the AsyncEventQueue @@ -260,7 +260,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { // Destroy the aeq (this also removes it from the GemFireCacheImpl) // The AsyncEventQueue can be null in an accessor member if (aeq != null) { - aeq.destroy(); + aeq.destroy(initiator); } if (logger.isDebugEnabled()) { logger.debug("Destroyed aeqId=" + aeqId); http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java index 428301f..0bd9046 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java @@ -21,6 +21,8 @@ import java.util.Collection; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.lucene.LuceneIndexDestroyedException; +import org.apache.geode.cache.lucene.LuceneIndexNotFoundException; import org.apache.geode.cache.lucene.internal.LuceneIndexImpl; import org.apache.geode.cache.lucene.internal.LuceneIndexStats; import org.apache.geode.internal.cache.PrimaryBucketException; @@ -75,6 +77,9 @@ public class LuceneQueryFunction implements Function, InternalEntity { } LuceneIndexImpl index = getLuceneIndex(region, searchContext); + if (index == null) { + throw new LuceneIndexNotFoundException(searchContext.getIndexName(), region.getFullPath()); + } RepositoryManager repoManager = index.getRepositoryManager(); LuceneIndexStats stats = index.getIndexStats(); @@ -131,10 +136,6 @@ public class LuceneQueryFunction implements Function, InternalEntity { throw new InternalFunctionInvocationTargetException( "Cache is closed when attempting to retrieve index:" + region.getFullPath(), e); } - if (index == null) { - throw new InternalFunctionInvocationTargetException( - "Index for Region:" + region.getFullPath() + " was not found"); - } return index; } http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java index 694d3f1..16ac116 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java @@ -32,7 +32,6 @@ import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.junit.categories.DistributedTest; import org.awaitility.Awaitility; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -54,10 +53,18 @@ import static org.junit.Assert.assertTrue; @RunWith(JUnitParamsRunner.class) public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { + private static final String INDEX1_NAME = INDEX_NAME + "1"; + + private static final String INDEX2_NAME = INDEX_NAME + "2"; + private static volatile boolean STOP_PUTS = false; + private static volatile boolean STOP_QUERIES = false; + private static int NUM_PUTS_COMPLETED = 0; + private static int NUM_QUERIES_COMPLETED = 0; + protected VM accessor; @Override @@ -202,7 +209,7 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { // End puts dataStore1.invoke(() -> stopPuts()); - // Wait for the putter to complete and verify no exception has occurred + // Wait for the putter to complete and verify no unexpected exception has occurred ThreadUtils.join(putter, 60 * 1000); if (putter.exceptionOccurred()) { fail(putter.getException()); @@ -214,6 +221,95 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { @Test @Parameters(method = "getListOfRegionTestTypes") + public void verifyDestroySingleIndexWhileDoingQueries(RegionTestableType regionType) + throws Exception { + // Create index and region + SerializableRunnableIF createIndex = createIndex(); + dataStore1.invoke(() -> initDataStore(createIndex, regionType)); + dataStore2.invoke(() -> initDataStore(createIndex, regionType)); + accessor.invoke(() -> initAccessor(createIndex, regionType)); + + // Verify index created + dataStore1.invoke(() -> verifyIndexCreated()); + dataStore2.invoke(() -> verifyIndexCreated()); + accessor.invoke(() -> verifyIndexCreated()); + + // Do puts + int numPuts = 100; + accessor.invoke(() -> doPuts(numPuts)); + + // Wait until queue is flushed + accessor.invoke(() -> waitUntilFlushed(INDEX_NAME)); + + // Start queries + AsyncInvocation querier = accessor + .invokeAsync(() -> doQueriesUntilException(INDEX_NAME, "field1Value", "field1", numPuts)); + + // Wait until queries have started + accessor.invoke(() -> waitUntilQueriesHaveStarted()); + + // Destroy index (only needs to be done on one member) + accessor.invoke(() -> destroyIndex()); + + // Verify index destroyed + dataStore1.invoke(() -> verifyIndexDestroyed()); + dataStore2.invoke(() -> verifyIndexDestroyed()); + accessor.invoke(() -> verifyIndexDestroyed()); + + // Wait for the querier to complete and verify no exception has occurred + ThreadUtils.join(querier, 60 * 1000); + if (querier.exceptionOccurred()) { + fail(querier.getException()); + } + } + + @Test + @Parameters(method = "getListOfRegionTestTypes") + public void verifyDestroyAllIndexesWhileDoingQueries(RegionTestableType regionType) + throws Exception { + // Create indexes and region + SerializableRunnableIF createIndexes = createIndexes(); + dataStore1.invoke(() -> initDataStore(createIndexes, regionType)); + dataStore2.invoke(() -> initDataStore(createIndexes, regionType)); + accessor.invoke(() -> initAccessor(createIndexes, regionType)); + + // Verify indexes created + dataStore1.invoke(() -> verifyIndexesCreated()); + dataStore2.invoke(() -> verifyIndexesCreated()); + accessor.invoke(() -> verifyIndexesCreated()); + + // Do puts + int numPuts = 100; + accessor.invoke(() -> doPuts(numPuts)); + + // Wait until queues are flushed + accessor.invoke(() -> waitUntilFlushed(INDEX1_NAME)); + accessor.invoke(() -> waitUntilFlushed(INDEX2_NAME)); + + // Start queries + AsyncInvocation querier = accessor + .invokeAsync(() -> doQueriesUntilException(INDEX1_NAME, "field1Value", "field1", numPuts)); + + // Wait until queries have started + accessor.invoke(() -> waitUntilQueriesHaveStarted()); + + // Destroy indexes (only needs to be done on one member) + accessor.invoke(() -> destroyIndexes()); + + // Verify indexes destroyed + dataStore1.invoke(() -> verifyIndexesDestroyed()); + dataStore2.invoke(() -> verifyIndexesDestroyed()); + accessor.invoke(() -> verifyIndexesDestroyed()); + + // Wait for the querier to complete and verify no unexpected exception has occurred + ThreadUtils.join(querier, 60 * 1000); + if (querier.exceptionOccurred()) { + fail(querier.getException()); + } + } + + @Test + @Parameters(method = "getListOfRegionTestTypes") public void verifyDestroyRecreateIndexSameName(RegionTestableType regionType) { // Create index and region SerializableRunnableIF createIndex = createIndex(); @@ -227,12 +323,11 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { accessor.invoke(() -> verifyIndexCreated()); // Do puts to cause IndexRepositories to be created - int numPuts = 10; + int numPuts = 100; accessor.invoke(() -> doPuts(numPuts)); // Wait until queue is flushed - dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME)); - dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME)); + accessor.invoke(() -> waitUntilFlushed(INDEX_NAME)); // Execute query and verify results accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts)); @@ -282,12 +377,11 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { accessor.invoke(() -> verifyIndexCreated()); // Do puts to cause IndexRepositories to be created - int numPuts = 10; + int numPuts = 100; accessor.invoke(() -> doPuts(numPuts)); // Wait until queue is flushed - dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME)); - dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME)); + accessor.invoke(() -> waitUntilFlushed(INDEX_NAME)); // Execute query and verify results accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts)); @@ -338,12 +432,11 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { accessor.invoke(() -> verifyIndexCreated()); // Do puts to cause IndexRepositories to be created - int numPuts = 10; + int numPuts = 1000; accessor.invoke(() -> doPuts(numPuts)); // Wait until queue is flushed - dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME)); - dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME)); + accessor.invoke(() -> waitUntilFlushed(INDEX_NAME)); // Execute query and verify results accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts)); @@ -393,8 +486,8 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { private SerializableRunnableIF createIndexes() { return () -> { LuceneService luceneService = LuceneServiceProvider.get(getCache()); - luceneService.createIndexFactory().setFields("text").create(INDEX_NAME + "0", REGION_NAME); - luceneService.createIndexFactory().setFields("text").create(INDEX_NAME + "1", REGION_NAME); + luceneService.createIndexFactory().setFields("field1").create(INDEX1_NAME, REGION_NAME); + luceneService.createIndexFactory().setFields("field2").create(INDEX2_NAME, REGION_NAME); }; } @@ -405,8 +498,8 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { private void verifyIndexesCreated() { LuceneService luceneService = LuceneServiceProvider.get(getCache()); - assertNotNull(luceneService.getIndex(INDEX_NAME + "0", REGION_NAME)); - assertNotNull(luceneService.getIndex(INDEX_NAME + "1", REGION_NAME)); + assertNotNull(luceneService.getIndex(INDEX1_NAME, REGION_NAME)); + assertNotNull(luceneService.getIndex(INDEX2_NAME, REGION_NAME)); } private void waitUntilFlushed(String indexName) throws Exception { @@ -432,6 +525,21 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { } } + private void doQueriesUntilException(String indexName, String queryString, String field, + int expectedResultsSize) throws Exception { + allowQueries(); + int i = 0; + while (!STOP_QUERIES) { + try { + executeQuery(indexName, queryString, field, expectedResultsSize); + NUM_QUERIES_COMPLETED += 1; + } catch (LuceneIndexDestroyedException + | LuceneIndexNotFoundException e /* expected exceptions */) { + stopQueries(); + } + } + } + private static void stopPuts() { STOP_PUTS = true; } @@ -440,6 +548,14 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { STOP_PUTS = false; } + private static void stopQueries() { + STOP_QUERIES = true; + } + + private static void allowQueries() { + STOP_QUERIES = false; + } + private void verifyRegionSize() { assertEquals(NUM_PUTS_COMPLETED, getCache().getRegion(REGION_NAME).size()); } @@ -449,11 +565,15 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { .until(() -> getCache().getRegion(REGION_NAME).size() > 0); } + private void waitUntilQueriesHaveStarted() { + Awaitility.waitAtMost(30, TimeUnit.SECONDS).until(() -> NUM_QUERIES_COMPLETED > 0); + } + private void executeQuery(String indexName, String queryString, String field, int expectedResultsSize) throws LuceneQueryException { LuceneService luceneService = LuceneServiceProvider.get(getCache()); - LuceneQuery query = - luceneService.createLuceneQueryFactory().create(indexName, REGION_NAME, queryString, field); + LuceneQuery query = luceneService.createLuceneQueryFactory().setResultLimit(expectedResultsSize) + .create(indexName, REGION_NAME, queryString, field); Collection results = query.findValues(); assertEquals(expectedResultsSize, results.size()); } @@ -488,8 +608,8 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest { } private void verifyIndexesDestroyed() { - verifyIndexDestroyed(INDEX_NAME + "0"); - verifyIndexDestroyed(INDEX_NAME + "1"); + verifyIndexDestroyed(INDEX1_NAME); + verifyIndexDestroyed(INDEX2_NAME); } private void verifyIndexDestroyed(String indexName) { http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java index 18621d5..0283ffa 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java @@ -28,6 +28,7 @@ import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.ResultSender; +import org.apache.geode.cache.lucene.LuceneIndexNotFoundException; import org.apache.geode.cache.lucene.LuceneQueryException; import org.apache.geode.cache.lucene.LuceneQueryFactory; import org.apache.geode.cache.lucene.LuceneQueryProvider; @@ -210,7 +211,7 @@ public class LuceneQueryFunctionJUnitTest { function.execute(mockContext); } - @Test(expected = InternalFunctionInvocationTargetException.class) + @Test(expected = LuceneIndexNotFoundException.class) public void whenServiceReturnsNullIndexDuringQueryExecutionFunctionExceptionShouldBeThrown() throws Exception { when(mockContext.getDataSet()).thenReturn(mockRegion);