GEODE-2750: The lucene index is now destroyed on remote members before the 
initiating member


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/99e61ffa
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/99e61ffa
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/99e61ffa

Branch: refs/heads/feature/GEODE-2632
Commit: 99e61ffa09158ad60164368805b4052614b806c8
Parents: c61cab9
Author: Barry Oglesby <bogle...@pivotal.io>
Authored: Mon Apr 3 18:44:25 2017 -0700
Committer: Barry Oglesby <bogle...@pivotal.io>
Committed: Tue Apr 4 16:31:35 2017 -0700

----------------------------------------------------------------------
 .../internal/AsyncEventQueueImpl.java           |   6 +-
 .../cache/wan/AbstractGatewaySender.java        |  54 ++++---
 .../lucene/LuceneIndexDestroyedException.java   |  42 +++++
 .../lucene/LuceneIndexNotFoundException.java    |  41 +++++
 .../AbstractPartitionedRepositoryManager.java   |  14 +-
 .../LuceneIndexForPartitionedRegion.java        |  16 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |   6 +-
 .../distributed/LuceneQueryFunction.java        |   9 +-
 .../lucene/LuceneIndexDestroyDUnitTest.java     | 158 ++++++++++++++++---
 .../LuceneQueryFunctionJUnitTest.java           |   3 +-
 10 files changed, 285 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index a44b9e4..0def5d2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -199,8 +199,12 @@ public class AsyncEventQueueImpl implements 
AsyncEventQueue {
   }
 
   public void destroy() {
+    destroy(true);
+  }
+
+  public void destroy(boolean initiator) {
     GemFireCacheImpl gfci = (GemFireCacheImpl) ((AbstractGatewaySender) 
this.sender).getCache();
-    this.sender.destroy();
+    ((AbstractGatewaySender) this.sender).destroy(initiator);
     gfci.removeAsyncEventQueue(this);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a604dbf..1c94f94 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -514,6 +514,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
    */
   @Override
   public void destroy() {
+    destroy(true);
+  }
+
+  public void destroy(boolean initiator) {
     try {
       this.getLifeCycleLock().writeLock().lock();
       // first, check if this sender is attached to any region. If so, throw
@@ -542,33 +546,35 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
       ((GemFireCacheImpl) this.cache).removeGatewaySender(this);
 
       // destroy the region underneath the sender's queue
-      Set<RegionQueue> regionQueues = getQueues();
-      if (regionQueues != null) {
-        for (RegionQueue regionQueue : regionQueues) {
-          try {
-            if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) {
-              Set<PartitionedRegion> queueRegions =
-                  ((ConcurrentParallelGatewaySenderQueue) 
regionQueue).getRegions();
-              for (PartitionedRegion queueRegion : queueRegions) {
-                queueRegion.destroyRegion();
+      if (initiator) {
+        Set<RegionQueue> regionQueues = getQueues();
+        if (regionQueues != null) {
+          for (RegionQueue regionQueue : regionQueues) {
+            try {
+              if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) 
{
+                Set<PartitionedRegion> queueRegions =
+                    ((ConcurrentParallelGatewaySenderQueue) 
regionQueue).getRegions();
+                for (PartitionedRegion queueRegion : queueRegions) {
+                  queueRegion.destroyRegion();
+                }
+              } else {// For SerialGatewaySenderQueue, do local destroy
+                regionQueue.getRegion().localDestroyRegion();
               }
-            } else {// For SerialGatewaySenderQueue, do local destroy
-              regionQueue.getRegion().localDestroyRegion();
+            }
+            // Can occur in case of ParallelGatewaySenderQueue, when the 
region is
+            // being destroyed
+            // by several nodes simultaneously
+            catch (RegionDestroyedException e) {
+              // the region might have already been destroyed by other node. 
Just
+              // log
+              // the exception.
+              this.logger.info(LocalizedMessage.create(
+                  
LocalizedStrings.AbstractGatewaySender_REGION_0_UNDERLYING_GATEWAYSENDER_1_IS_ALREADY_DESTROYED,
+                  new Object[] {e.getRegionFullPath(), this}));
             }
           }
-          // Can occur in case of ParallelGatewaySenderQueue, when the region 
is
-          // being destroyed
-          // by several nodes simultaneously
-          catch (RegionDestroyedException e) {
-            // the region might have already been destroyed by other node. Just
-            // log
-            // the exception.
-            this.logger.info(LocalizedMessage.create(
-                
LocalizedStrings.AbstractGatewaySender_REGION_0_UNDERLYING_GATEWAYSENDER_1_IS_ALREADY_DESTROYED,
-                new Object[] {e.getRegionFullPath(), this}));
-          }
-        }
-      } // END if (regionQueues != null)
+        } // END if (regionQueues != null)
+      }
     } finally {
       this.getLifeCycleLock().writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java
new file mode 100644
index 0000000..7b38f24
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexDestroyedException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import org.apache.geode.GemFireException;
+
+/**
+ * A LuceneIndexDestroyedException is thrown if a Lucene index is attempted to 
be used while it is
+ * being destroyed or after it has been destroyed.
+ */
+public class LuceneIndexDestroyedException extends GemFireException {
+
+  private final String indexName;
+
+  private final String regionPath;
+
+  public LuceneIndexDestroyedException(String indexName, String regionPath) {
+    super();
+    this.indexName = indexName;
+    this.regionPath = regionPath;
+  }
+
+  public String getIndexName() {
+    return this.indexName;
+  }
+
+  public String getRegionPath() {
+    return this.regionPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java
new file mode 100644
index 0000000..375837f
--- /dev/null
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndexNotFoundException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import org.apache.geode.GemFireException;
+
+/**
+ * A LuceneIndexNotFoundException is thrown if a Lucene index is requested but 
not found.
+ */
+public class LuceneIndexNotFoundException extends GemFireException {
+
+  private final String indexName;
+
+  private final String regionPath;
+
+  public LuceneIndexNotFoundException(String indexName, String regionPath) {
+    super();
+    this.indexName = indexName;
+    this.regionPath = regionPath;
+  }
+
+  public String getIndexName() {
+    return this.indexName;
+  }
+
+  public String getRegionPath() {
+    return this.regionPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
index 26179c7..26bb488 100755
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.lucene.LuceneIndexDestroyedException;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import 
org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
@@ -96,9 +97,11 @@ public abstract class AbstractPartitionedRepositoryManager 
implements Repository
   protected IndexRepository computeRepository(Integer bucketId) {
     IndexRepository repo = indexRepositories.compute(bucketId, (key, 
oldRepository) -> {
       try {
-        if (closed && oldRepository != null) {
-          oldRepository.cleanup();
-          return null;
+        if (closed) {
+          if (oldRepository != null) {
+            oldRepository.cleanup();
+          }
+          throw new LuceneIndexDestroyedException(index.getName(), 
index.getRegionPath());
         }
         return computeRepository(bucketId, serializer, index, userRegion, 
oldRepository);
       } catch (IOException e) {
@@ -130,7 +133,10 @@ public abstract class AbstractPartitionedRepositoryManager 
implements Repository
   public void close() {
     this.closed = true;
     for (Integer bucketId : indexRepositories.keySet()) {
-      computeRepository(bucketId);
+      try {
+        computeRepository(bucketId);
+      } catch (LuceneIndexDestroyedException e) {
+        /* expected exception */}
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 80e0c44..fe85efe 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -188,24 +188,24 @@ public class LuceneIndexForPartitionedRegion extends 
LuceneIndexImpl {
           + "; initiator=" + initiator);
     }
 
-    // Invoke super destroy to remove the extension
+    // Invoke super destroy to remove the extension and async event queue
     super.destroy(initiator);
 
-    // Destroy the file region (colocated with the application region)
+    // Destroy index on remote members if necessary
+    if (initiator) {
+      destroyOnRemoteMembers();
+    }
+
+    // Destroy the file region (colocated with the application region) if 
necessary
     // localDestroyRegion can't be used because locally destroying regions is 
not supported on
     // colocated regions
-    if (!fileAndChunkRegion.isDestroyed()) {
+    if (initiator) {
       fileAndChunkRegion.destroyRegion();
       if (logger.isDebugEnabled()) {
         logger.debug("Destroyed fileAndChunkRegion=" + 
fileAndChunkRegion.getName());
       }
     }
 
-    // Destroy index on remote members if necessary
-    if (initiator) {
-      destroyOnRemoteMembers();
-    }
-
     if (logger.isDebugEnabled()) {
       logger.debug("Destroyed index regionPath=" + regionPath + "; indexName=" 
+ indexName
           + "; initiator=" + initiator);

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index 93fb25b..d58f856 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -213,7 +213,7 @@ public abstract class LuceneIndexImpl implements 
InternalLuceneIndex {
     }
 
     // Destroy the async event queue
-    destroyAsyncEventQueue();
+    destroyAsyncEventQueue(initiator);
 
     // Close the repository manager
     repositoryManager.close();
@@ -238,7 +238,7 @@ public abstract class LuceneIndexImpl implements 
InternalLuceneIndex {
     }
   }
 
-  private void destroyAsyncEventQueue() {
+  private void destroyAsyncEventQueue(boolean initiator) {
     String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
 
     // Get the AsyncEventQueue
@@ -260,7 +260,7 @@ public abstract class LuceneIndexImpl implements 
InternalLuceneIndex {
     // Destroy the aeq (this also removes it from the GemFireCacheImpl)
     // The AsyncEventQueue can be null in an accessor member
     if (aeq != null) {
-      aeq.destroy();
+      aeq.destroy(initiator);
     }
     if (logger.isDebugEnabled()) {
       logger.debug("Destroyed aeqId=" + aeqId);

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
index 428301f..0bd9046 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunction.java
@@ -21,6 +21,8 @@ import java.util.Collection;
 
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.lucene.LuceneIndexDestroyedException;
+import org.apache.geode.cache.lucene.LuceneIndexNotFoundException;
 import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
 import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
 import org.apache.geode.internal.cache.PrimaryBucketException;
@@ -75,6 +77,9 @@ public class LuceneQueryFunction implements Function, 
InternalEntity {
     }
 
     LuceneIndexImpl index = getLuceneIndex(region, searchContext);
+    if (index == null) {
+      throw new LuceneIndexNotFoundException(searchContext.getIndexName(), 
region.getFullPath());
+    }
     RepositoryManager repoManager = index.getRepositoryManager();
     LuceneIndexStats stats = index.getIndexStats();
 
@@ -131,10 +136,6 @@ public class LuceneQueryFunction implements Function, 
InternalEntity {
       throw new InternalFunctionInvocationTargetException(
           "Cache is closed when attempting to retrieve index:" + 
region.getFullPath(), e);
     }
-    if (index == null) {
-      throw new InternalFunctionInvocationTargetException(
-          "Index for Region:" + region.getFullPath() + " was not found");
-    }
     return index;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
index 694d3f1..16ac116 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
@@ -32,7 +32,6 @@ import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.awaitility.Awaitility;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -54,10 +53,18 @@ import static org.junit.Assert.assertTrue;
 @RunWith(JUnitParamsRunner.class)
 public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
 
+  private static final String INDEX1_NAME = INDEX_NAME + "1";
+
+  private static final String INDEX2_NAME = INDEX_NAME + "2";
+
   private static volatile boolean STOP_PUTS = false;
 
+  private static volatile boolean STOP_QUERIES = false;
+
   private static int NUM_PUTS_COMPLETED = 0;
 
+  private static int NUM_QUERIES_COMPLETED = 0;
+
   protected VM accessor;
 
   @Override
@@ -202,7 +209,7 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     // End puts
     dataStore1.invoke(() -> stopPuts());
 
-    // Wait for the putter to complete and verify no exception has occurred
+    // Wait for the putter to complete and verify no unexpected exception has 
occurred
     ThreadUtils.join(putter, 60 * 1000);
     if (putter.exceptionOccurred()) {
       fail(putter.getException());
@@ -214,6 +221,95 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
 
   @Test
   @Parameters(method = "getListOfRegionTestTypes")
+  public void verifyDestroySingleIndexWhileDoingQueries(RegionTestableType 
regionType)
+      throws Exception {
+    // Create index and region
+    SerializableRunnableIF createIndex = createIndex();
+    dataStore1.invoke(() -> initDataStore(createIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionType));
+    accessor.invoke(() -> initAccessor(createIndex, regionType));
+
+    // Verify index created
+    dataStore1.invoke(() -> verifyIndexCreated());
+    dataStore2.invoke(() -> verifyIndexCreated());
+    accessor.invoke(() -> verifyIndexCreated());
+
+    // Do puts
+    int numPuts = 100;
+    accessor.invoke(() -> doPuts(numPuts));
+
+    // Wait until queue is flushed
+    accessor.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // Start queries
+    AsyncInvocation querier = accessor
+        .invokeAsync(() -> doQueriesUntilException(INDEX_NAME, "field1Value", 
"field1", numPuts));
+
+    // Wait until queries have started
+    accessor.invoke(() -> waitUntilQueriesHaveStarted());
+
+    // Destroy index (only needs to be done on one member)
+    accessor.invoke(() -> destroyIndex());
+
+    // Verify index destroyed
+    dataStore1.invoke(() -> verifyIndexDestroyed());
+    dataStore2.invoke(() -> verifyIndexDestroyed());
+    accessor.invoke(() -> verifyIndexDestroyed());
+
+    // Wait for the querier to complete and verify no exception has occurred
+    ThreadUtils.join(querier, 60 * 1000);
+    if (querier.exceptionOccurred()) {
+      fail(querier.getException());
+    }
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void verifyDestroyAllIndexesWhileDoingQueries(RegionTestableType 
regionType)
+      throws Exception {
+    // Create indexes and region
+    SerializableRunnableIF createIndexes = createIndexes();
+    dataStore1.invoke(() -> initDataStore(createIndexes, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndexes, regionType));
+    accessor.invoke(() -> initAccessor(createIndexes, regionType));
+
+    // Verify indexes created
+    dataStore1.invoke(() -> verifyIndexesCreated());
+    dataStore2.invoke(() -> verifyIndexesCreated());
+    accessor.invoke(() -> verifyIndexesCreated());
+
+    // Do puts
+    int numPuts = 100;
+    accessor.invoke(() -> doPuts(numPuts));
+
+    // Wait until queues are flushed
+    accessor.invoke(() -> waitUntilFlushed(INDEX1_NAME));
+    accessor.invoke(() -> waitUntilFlushed(INDEX2_NAME));
+
+    // Start queries
+    AsyncInvocation querier = accessor
+        .invokeAsync(() -> doQueriesUntilException(INDEX1_NAME, "field1Value", 
"field1", numPuts));
+
+    // Wait until queries have started
+    accessor.invoke(() -> waitUntilQueriesHaveStarted());
+
+    // Destroy indexes (only needs to be done on one member)
+    accessor.invoke(() -> destroyIndexes());
+
+    // Verify indexes destroyed
+    dataStore1.invoke(() -> verifyIndexesDestroyed());
+    dataStore2.invoke(() -> verifyIndexesDestroyed());
+    accessor.invoke(() -> verifyIndexesDestroyed());
+
+    // Wait for the querier to complete and verify no unexpected exception has 
occurred
+    ThreadUtils.join(querier, 60 * 1000);
+    if (querier.exceptionOccurred()) {
+      fail(querier.getException());
+    }
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
   public void verifyDestroyRecreateIndexSameName(RegionTestableType 
regionType) {
     // Create index and region
     SerializableRunnableIF createIndex = createIndex();
@@ -227,12 +323,11 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     accessor.invoke(() -> verifyIndexCreated());
 
     // Do puts to cause IndexRepositories to be created
-    int numPuts = 10;
+    int numPuts = 100;
     accessor.invoke(() -> doPuts(numPuts));
 
     // Wait until queue is flushed
-    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
-    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    accessor.invoke(() -> waitUntilFlushed(INDEX_NAME));
 
     // Execute query and verify results
     accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", 
numPuts));
@@ -282,12 +377,11 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     accessor.invoke(() -> verifyIndexCreated());
 
     // Do puts to cause IndexRepositories to be created
-    int numPuts = 10;
+    int numPuts = 100;
     accessor.invoke(() -> doPuts(numPuts));
 
     // Wait until queue is flushed
-    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
-    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    accessor.invoke(() -> waitUntilFlushed(INDEX_NAME));
 
     // Execute query and verify results
     accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", 
numPuts));
@@ -338,12 +432,11 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     accessor.invoke(() -> verifyIndexCreated());
 
     // Do puts to cause IndexRepositories to be created
-    int numPuts = 10;
+    int numPuts = 1000;
     accessor.invoke(() -> doPuts(numPuts));
 
     // Wait until queue is flushed
-    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
-    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    accessor.invoke(() -> waitUntilFlushed(INDEX_NAME));
 
     // Execute query and verify results
     accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", 
numPuts));
@@ -393,8 +486,8 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
   private SerializableRunnableIF createIndexes() {
     return () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
-      luceneService.createIndexFactory().setFields("text").create(INDEX_NAME + 
"0", REGION_NAME);
-      luceneService.createIndexFactory().setFields("text").create(INDEX_NAME + 
"1", REGION_NAME);
+      
luceneService.createIndexFactory().setFields("field1").create(INDEX1_NAME, 
REGION_NAME);
+      
luceneService.createIndexFactory().setFields("field2").create(INDEX2_NAME, 
REGION_NAME);
     };
   }
 
@@ -405,8 +498,8 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
 
   private void verifyIndexesCreated() {
     LuceneService luceneService = LuceneServiceProvider.get(getCache());
-    assertNotNull(luceneService.getIndex(INDEX_NAME + "0", REGION_NAME));
-    assertNotNull(luceneService.getIndex(INDEX_NAME + "1", REGION_NAME));
+    assertNotNull(luceneService.getIndex(INDEX1_NAME, REGION_NAME));
+    assertNotNull(luceneService.getIndex(INDEX2_NAME, REGION_NAME));
   }
 
   private void waitUntilFlushed(String indexName) throws Exception {
@@ -432,6 +525,21 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     }
   }
 
+  private void doQueriesUntilException(String indexName, String queryString, 
String field,
+      int expectedResultsSize) throws Exception {
+    allowQueries();
+    int i = 0;
+    while (!STOP_QUERIES) {
+      try {
+        executeQuery(indexName, queryString, field, expectedResultsSize);
+        NUM_QUERIES_COMPLETED += 1;
+      } catch (LuceneIndexDestroyedException
+          | LuceneIndexNotFoundException e /* expected exceptions */) {
+        stopQueries();
+      }
+    }
+  }
+
   private static void stopPuts() {
     STOP_PUTS = true;
   }
@@ -440,6 +548,14 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
     STOP_PUTS = false;
   }
 
+  private static void stopQueries() {
+    STOP_QUERIES = true;
+  }
+
+  private static void allowQueries() {
+    STOP_QUERIES = false;
+  }
+
   private void verifyRegionSize() {
     assertEquals(NUM_PUTS_COMPLETED, getCache().getRegion(REGION_NAME).size());
   }
@@ -449,11 +565,15 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
         .until(() -> getCache().getRegion(REGION_NAME).size() > 0);
   }
 
+  private void waitUntilQueriesHaveStarted() {
+    Awaitility.waitAtMost(30, TimeUnit.SECONDS).until(() -> 
NUM_QUERIES_COMPLETED > 0);
+  }
+
   private void executeQuery(String indexName, String queryString, String field,
       int expectedResultsSize) throws LuceneQueryException {
     LuceneService luceneService = LuceneServiceProvider.get(getCache());
-    LuceneQuery query =
-        luceneService.createLuceneQueryFactory().create(indexName, 
REGION_NAME, queryString, field);
+    LuceneQuery query = 
luceneService.createLuceneQueryFactory().setResultLimit(expectedResultsSize)
+        .create(indexName, REGION_NAME, queryString, field);
     Collection results = query.findValues();
     assertEquals(expectedResultsSize, results.size());
   }
@@ -488,8 +608,8 @@ public class LuceneIndexDestroyDUnitTest extends 
LuceneDUnitTest {
   }
 
   private void verifyIndexesDestroyed() {
-    verifyIndexDestroyed(INDEX_NAME + "0");
-    verifyIndexDestroyed(INDEX_NAME + "1");
+    verifyIndexDestroyed(INDEX1_NAME);
+    verifyIndexDestroyed(INDEX2_NAME);
   }
 
   private void verifyIndexDestroyed(String indexName) {

http://git-wip-us.apache.org/repos/asf/geode/blob/99e61ffa/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
index 18621d5..0283ffa 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneQueryFunctionJUnitTest.java
@@ -28,6 +28,7 @@ import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.lucene.LuceneIndexNotFoundException;
 import org.apache.geode.cache.lucene.LuceneQueryException;
 import org.apache.geode.cache.lucene.LuceneQueryFactory;
 import org.apache.geode.cache.lucene.LuceneQueryProvider;
@@ -210,7 +211,7 @@ public class LuceneQueryFunctionJUnitTest {
     function.execute(mockContext);
   }
 
-  @Test(expected = InternalFunctionInvocationTargetException.class)
+  @Test(expected = LuceneIndexNotFoundException.class)
   public void 
whenServiceReturnsNullIndexDuringQueryExecutionFunctionExceptionShouldBeThrown()
       throws Exception {
     when(mockContext.getDataSet()).thenReturn(mockRegion);

Reply via email to