This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 002efec Revert "GEODE-577: rewrite QueryMonitorDUnitTest (#2179)" 002efec is described below commit 002efecdb188b24977374f12cbac3aaab955e81c Author: Jinmei Liao <jil...@pivotal.io> AuthorDate: Mon Aug 6 14:19:09 2018 -0700 Revert "GEODE-577: rewrite QueryMonitorDUnitTest (#2179)" This reverts commit 38e1714 --- .../ResourceManagerWithQueryMonitorDUnitTest.java | 23 +- .../geode/cache/query/internal/DefaultQuery.java | 10 +- .../geode/cache/query/internal/QueryMonitor.java | 102 +- .../cache/PartitionedRegionQueryEvaluator.java | 6 +- .../cache/query/internal/QueryMonitorTest.java | 91 -- .../cache/query/dunit/QueryMonitorDUnitTest.java | 1299 +++++++++++++++----- 6 files changed, 1074 insertions(+), 457 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java index 80f2192..598bde1 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/ResourceManagerWithQueryMonitorDUnitTest.java @@ -870,7 +870,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa ServerOperationException soe = (ServerOperationException) e; if (soe.getRootCause() instanceof QueryException) { QueryException qe = (QueryException) soe.getRootCause(); - if (isExceptionDueToTimeout(qe)) { + if (isExceptionDueToTimeout(qe, queryTimeout)) { logger.info("Query Execution must be terminated by a timeout. Expected behavior"); return 0; } @@ -952,7 +952,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa // meaning the query should not be canceled due to low memory throw new CacheException("Query should not have been canceled due to memory") {}; } - } else if (isExceptionDueToTimeout((QueryException) e)) { + } else if (isExceptionDueToTimeout((QueryException) e, queryTimeout)) { if (queryTimeout == -1) { // no time out set, this should not be thrown @@ -976,7 +976,7 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa // meaning the query should not be canceled due to low memory throw new CacheException("Query should not have been canceled due to memory") {}; } - } else if (isExceptionDueToTimeout(qe)) { + } else if (isExceptionDueToTimeout(qe, queryTimeout)) { if (queryTimeout == -1) { e.printStackTrace(); // no time out set, this should not be thrown @@ -1219,12 +1219,13 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa .toLocalizedString())); } - private boolean isExceptionDueToTimeout(QueryException e) { + private boolean isExceptionDueToTimeout(QueryException e, long queryTimeout) { String message = e.getMessage(); // -1 needs to be matched due to client/server set up, BaseCommand uses the // MAX_QUERY_EXECUTION_TIME and not the testMaxQueryExecutionTime return (message.contains("The QueryMonitor thread may be sleeping longer than") - || message.contains("Query execution cancelled after exceeding max execution time") + || message.contains(LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED + .toLocalizedString(queryTimeout)) || message.contains( LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED.toLocalizedString(-1))); } @@ -1260,6 +1261,10 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa } } + public void doTestHook(String description) { + + } + public void countDown() { latch.countDown(); } @@ -1283,6 +1288,10 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa } } } + + public void doTestHook(String description) { + + } } private class CancelDuringAddResultsHook implements DefaultQuery.TestHook { @@ -1306,5 +1315,9 @@ public class ResourceManagerWithQueryMonitorDUnitTest extends ClientServerTestCa rejectedObjects = true; } } + + public void doTestHook(String description) { + + } } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 1053d8d..fad0e0f 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -716,9 +716,11 @@ public class DefaultQuery implements Query { /** * The query gets canceled by the QueryMonitor with the reason being specified + * <p> + * TODO: parameter isCanceled is always true */ - public void setCanceled(CacheRuntimeException canceledException) { - this.isCanceled = true; + public void setCanceled(boolean isCanceled, CacheRuntimeException canceledException) { + this.isCanceled = isCanceled; this.canceledException = canceledException; } @@ -983,8 +985,8 @@ public class DefaultQuery implements Query { } public interface TestHook { - default void doTestHook(int spot) {}; + void doTestHook(int spot); - default void doTestHook(String spot) {}; + void doTestHook(String spot); } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java index f7750bc..ccba334 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryMonitor.java @@ -14,16 +14,21 @@ */ package org.apache.geode.cache.query.internal; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.Logger; +import org.apache.geode.cache.query.Query; import org.apache.geode.cache.query.QueryExecutionLowMemoryException; import org.apache.geode.cache.query.QueryExecutionTimeoutException; +import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.logging.log4j.LocalizedMessage; /** * QueryMonitor class, monitors the query execution time. Instantiated based on the system property @@ -41,12 +46,13 @@ public class QueryMonitor implements Runnable { private static final Logger logger = LogService.getLogger(); private final InternalCache cache; + private boolean testingQueryMonitor = false; /** * Holds the query execution status for the thread executing the query. FALSE if the query is not * canceled due to max query execution timeout. TRUE it the query is canceled due to max query * execution timeout timeout. */ - private static final ThreadLocal<AtomicBoolean> queryCancelled = + private static final ThreadLocal<AtomicBoolean> queryExecutionStatus = ThreadLocal.withInitial(() -> new AtomicBoolean(Boolean.FALSE)); private final long maxQueryExecutionTime; @@ -57,6 +63,9 @@ public class QueryMonitor implements Runnable { private final AtomicBoolean stopped = new AtomicBoolean(Boolean.FALSE); + /** For DUnit test purpose TODO: delete this ConcurrentMap */ + private ConcurrentMap queryMonitorTasks = null; + // Variables for cancelling queries due to low memory private static volatile Boolean LOW_MEMORY = Boolean.FALSE; @@ -73,19 +82,14 @@ public class QueryMonitor implements Runnable { * @param queryThread Thread executing the query. * @param query Query. */ - public void monitorQueryThread(Thread queryThread, DefaultQuery query) { - // cq query is not monitored - if (query.isCqQuery()) { - return; - } - + public void monitorQueryThread(Thread queryThread, Query query) { if (LOW_MEMORY) { String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY .toLocalizedString(LOW_MEMORY_USED_BYTES); - query.setCanceled(new QueryExecutionLowMemoryException(reason)); + ((DefaultQuery) query).setCanceled(true, new QueryExecutionLowMemoryException(reason)); throw new QueryExecutionLowMemoryException(reason); } - QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryCancelled.get()); + QueryThreadTask queryTask = new QueryThreadTask(queryThread, query, queryExecutionStatus.get()); synchronized (queryThreads) { queryThreads.add(queryTask); queryThreads.notifyAll(); @@ -97,19 +101,28 @@ public class QueryMonitor implements Runnable { queryThreads.size(), queryThread.getId(), query.getQueryString(), queryThread); } + // For dunit test purpose + if (cache != null && testingQueryMonitor) { + if (this.queryMonitorTasks == null) { + this.queryMonitorTasks = new ConcurrentHashMap(); + } + this.queryMonitorTasks.put(queryThread, queryTask); + } } /** * Stops monitoring the query. Removes the passed thread from QueryMonitor queue. */ - public void stopMonitoringQueryThread(Thread queryThread, DefaultQuery query) { + public void stopMonitoringQueryThread(Thread queryThread, Query query) { // Re-Set the queryExecution status on the LocalThread. QueryExecutionTimeoutException testException = null; - boolean[] queryCompleted = query.getQueryCompletedForMonitoring(); + DefaultQuery defaultQuery = (DefaultQuery) query; + boolean[] queryCompleted = defaultQuery.getQueryCompletedForMonitoring(); synchronized (queryCompleted) { - queryCancelled.get().getAndSet(Boolean.FALSE); - query.setQueryCompletedForMonitoring(true); + queryExecutionStatus.get().getAndSet(Boolean.FALSE); + + defaultQuery.setQueryCompletedForMonitoring(true); // Remove the query task from the queue. queryThreads.remove(new QueryThreadTask(queryThread, null, null)); } @@ -134,7 +147,7 @@ public class QueryMonitor implements Runnable { * gemfire.Cache.MAX_QUERY_EXECUTION_TIME */ public static void isQueryExecutionCanceled() { - if (queryCancelled.get() != null && queryCancelled.get().get()) { + if (queryExecutionStatus.get() != null && queryExecutionStatus.get().get()) { throw new QueryExecutionCanceledException(); } } @@ -169,21 +182,24 @@ public class QueryMonitor implements Runnable { try { QueryThreadTask queryTask; long sleepTime; + // TODO: while-block cannot complete without throwing while (true) { // Get the first query task from the queue. This query will have the shortest // remaining time that needs to canceled first. queryTask = (QueryThreadTask) queryThreads.peek(); if (queryTask == null) { + // Empty queue. synchronized (queryThreads) { queryThreads.wait(); } continue; } - long executionTime = System.currentTimeMillis() - queryTask.StartTime; + long currentTime = System.currentTimeMillis(); + // Check if the sleepTime is greater than the remaining query execution time. - if (executionTime < this.maxQueryExecutionTime) { - sleepTime = this.maxQueryExecutionTime - executionTime; + if (currentTime - queryTask.StartTime < this.maxQueryExecutionTime) { + sleepTime = this.maxQueryExecutionTime - (currentTime - queryTask.StartTime); // Its been noted that the sleep is not guaranteed to wait for the specified // time (as stated in Suns doc too), it depends on the OSs thread scheduling // behavior, hence thread may sleep for longer than the specified time. @@ -191,23 +207,33 @@ public class QueryMonitor implements Runnable { Thread.sleep(sleepTime); continue; } - // Query execution has taken more than the max time, Set queryCancelled flag - // to true. - boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring(); + + // Query execution has taken more than the max time, Set queryExecutionStatus flag + // to canceled (TRUE). + boolean[] queryCompleted = + ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring(); synchronized (queryCompleted) { - // check if query is already completed - if (!queryCompleted[0]) { - queryTask.query.setCanceled(new QueryExecutionTimeoutException(String - .format("Query execution cancelled after exceeding max execution time %sms.", - this.maxQueryExecutionTime))); - queryTask.queryCancelled.set(Boolean.TRUE); - // remove the query threads from monitoring queue + if (!queryCompleted[0] && !((DefaultQuery) queryTask.query).isCqQuery()) { // Check if the + // query is + // already + // completed. + ((DefaultQuery) queryTask.query).setCanceled(true, + new QueryExecutionTimeoutException( + LocalizedStrings.QueryMonitor_LONG_RUNNING_QUERY_CANCELED + .toLocalizedString(GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME))); + queryTask.queryExecutionStatus.set(Boolean.TRUE); + // Remove the task from queue. queryThreads.poll(); - logger.info(String.format( - "%s is set as canceled after %s milliseconds", queryTask.toString(), - executionTime)); } } + + logger.info(LocalizedMessage.create( + LocalizedStrings.GemFireCache_LONG_RUNNING_QUERY_EXECUTION_CANCELED, + new Object[] {queryTask.query.getQueryString(), queryTask.queryThread.getId()})); + + if (logger.isDebugEnabled()) { + logger.debug("Query Execution for the thread {} got canceled.", queryTask.queryThread); + } } } catch (InterruptedException ignore) { if (logger.isDebugEnabled()) { @@ -249,15 +275,15 @@ public class QueryMonitor implements Runnable { } private void cancelQueryDueToLowMemory(QueryThreadTask queryTask, long memoryThreshold) { - boolean[] queryCompleted = queryTask.query.getQueryCompletedForMonitoring(); + boolean[] queryCompleted = ((DefaultQuery) queryTask.query).getQueryCompletedForMonitoring(); synchronized (queryCompleted) { if (!queryCompleted[0]) { // cancel if query is not completed String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_CANCELED_QUERY .toLocalizedString(memoryThreshold); - queryTask.query.setCanceled( + ((DefaultQuery) queryTask.query).setCanceled(true, new QueryExecutionLowMemoryException(reason)); - queryTask.queryCancelled.set(Boolean.TRUE); + queryTask.queryExecutionStatus.set(Boolean.TRUE); } } } @@ -279,16 +305,16 @@ public class QueryMonitor implements Runnable { final Thread queryThread; // package-private to avoid synthetic accessor - final DefaultQuery query; + final Query query; // package-private to avoid synthetic accessor - final AtomicBoolean queryCancelled; + final AtomicBoolean queryExecutionStatus; - QueryThreadTask(Thread queryThread, DefaultQuery query, AtomicBoolean queryCancelled) { + QueryThreadTask(Thread queryThread, Query query, AtomicBoolean queryExecutionStatus) { this.StartTime = System.currentTimeMillis(); this.queryThread = queryThread; this.query = query; - this.queryCancelled = queryCancelled; + this.queryExecutionStatus = queryExecutionStatus; } @Override @@ -315,7 +341,7 @@ public class QueryMonitor implements Runnable { return new StringBuilder().append("QueryThreadTask[StartTime:").append(this.StartTime) .append(", queryThread:").append(this.queryThread).append(", threadId:") .append(this.queryThread.getId()).append(", query:").append(this.query.getQueryString()) - .append(", queryCancelled:").append(this.queryCancelled).append(']') + .append(", queryExecutionStatus:").append(this.queryExecutionStatus).append(']') .toString(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java index 76262fc..8a8c61c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java @@ -241,7 +241,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION .toLocalizedString(); - query.setCanceled(new QueryExecutionLowMemoryException(reason)); + query.setCanceled(true, new QueryExecutionLowMemoryException(reason)); } else { if (logger.isDebugEnabled()) { logger.debug("query cancelled while gathering results, aborting due to exception " @@ -762,7 +762,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION .toLocalizedString(); - query.setCanceled(new QueryExecutionLowMemoryException(reason)); + query.setCanceled(true, new QueryExecutionLowMemoryException(reason)); if (DefaultQuery.testHook != null) { DefaultQuery.testHook.doTestHook(5); } @@ -1103,7 +1103,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation String reason = LocalizedStrings.QueryMonitor_LOW_MEMORY_WHILE_GATHERING_RESULTS_FROM_PARTITION_REGION .toLocalizedString(); - query.setCanceled(new QueryExecutionLowMemoryException(reason)); + query.setCanceled(true, new QueryExecutionLowMemoryException(reason)); this.abort = true; } diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java deleted file mode 100644 index ea04c18..0000000 --- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/QueryMonitorTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.cache.query.internal; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.List; - -import org.awaitility.Awaitility; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.geode.internal.cache.InternalCache; - -/** - * although max_execution_time is set as 10ms, the monitor thread can sleep more than the specified - * time, so query will be cancelled at un-deterministic time after 10ms. We cannot assert on - * specific time at which the query will be cancelled. We can only assert that the query will be - * cancelled at one point after 10ms. - */ -public class QueryMonitorTest { - - private static InternalCache cache; - private static QueryMonitor monitor; - private static long max_execution_time = 5; - - @BeforeClass - public static void setUp() { - cache = mock(InternalCache.class); - monitor = new QueryMonitor(cache, max_execution_time); - Thread monitorThread = new Thread(() -> monitor.run(), "query monitor thread"); - monitorThread.setDaemon(true); - monitorThread.start(); - } - - @Test - public void queryIsCancelled() { - List<DefaultQuery> queries = new ArrayList<>(); - List<Thread> threads = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - DefaultQuery query = new DefaultQuery("query" + i, cache, false); - queries.add(query); - Thread queryExecutionThread = createQueryExecutionThread(i); - threads.add(queryExecutionThread); - monitor.monitorQueryThread(queryExecutionThread, query); - } - - for (DefaultQuery query : queries) { - // make sure the isCancelled flag in Query is set correctly - Awaitility.await().until(() -> query.isCanceled()); - } - Awaitility.await().until(() -> monitor.getQueryMonitorThreadCount() == 0); - // make sure all thread died - for (Thread thread : threads) { - Awaitility.await().until(() -> !thread.isAlive()); - } - } - - @Test - public void cqQueryIsNotMonitored() { - DefaultQuery query = mock(DefaultQuery.class); - when(query.isCqQuery()).thenReturn(true); - monitor.monitorQueryThread(mock(Thread.class), query); - assertThat(monitor.getQueryMonitorThreadCount()).isEqualTo(0); - } - - private Thread createQueryExecutionThread(int i) { - Thread thread = new Thread(() -> { - // make sure the threadlocal variable is updated - Awaitility.await().until(() -> QueryMonitor.isQueryExecutionCanceled()); - }); - thread.setName("query" + i); - return thread; - } - -} diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java index 871c0b4..4bcfd01 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryMonitorDUnitTest.java @@ -12,23 +12,31 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ + package org.apache.geode.cache.query.dunit; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.File; -import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.io.Serializable; +import org.apache.logging.log4j.Logger; import org.junit.After; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheException; import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.query.CqAttributes; @@ -41,211 +49,804 @@ import org.apache.geode.cache.query.QueryService; import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener; import org.apache.geode.cache.query.data.Portfolio; import org.apache.geode.cache.query.internal.DefaultQuery; +import org.apache.geode.cache.query.internal.QueryMonitor; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache30.CacheSerializableRunnable; +import org.apache.geode.cache30.CacheTestCase; import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.rules.ClientVM; -import org.apache.geode.test.dunit.rules.ClusterStartupRule; -import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.ThreadUtils; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.Wait; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; import org.apache.geode.test.junit.categories.OQLQueryTest; -import org.apache.geode.test.junit.rules.GfshCommandRule; -import org.apache.geode.test.junit.rules.VMProvider; /** - * These tests add a test hook to make sure query execution sleeps for at least 20 ms, so the - * queryMonitor thread will get a chance to cancel the query before it completes. - * - * The MAX_QUERY_EXECUTION_TIME is set as 1 ms, i.e, theoretically all queries will be cancelled - * after 1ms, but due to thread scheduling, this may not be true. we can only decrease the flakyness - * of the test by making MAX_QUERY_EXECUTION_TIME the smallest possible (1) and making the query - * execution time longer (but not too long to make the test run too slow). + * Tests for QueryMonitoring service. * + * @since GemFire 6.0 */ @Category({OQLQueryTest.class}) -public class QueryMonitorDUnitTest { - private static int MAX_QUERY_EXECUTE_TIME = 1; +public class QueryMonitorDUnitTest implements Serializable { + + private static final Logger logger = LogService.getLogger(); + + @Rule + public DistributedTestRule distributedTestRule = new DistributedTestRule(); + @Rule - public ClusterStartupRule cluster = new ClusterStartupRule(5); + public CacheRule cacheRule = new CacheRule(); @Rule - public GfshCommandRule gfsh = new GfshCommandRule(); + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + private final String exampleRegionName = "exampleRegion"; + + + /* Some of the queries are commented out as they were taking less time */ + String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100", + "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'", + "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'", + "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'", + "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'", + "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100", + "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')", + "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000", + "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'", + "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000", + "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'", + "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'", + "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')", + "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos" + + " WHERE pos.secId = 'YHOO') as itrX", + "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos" + + " WHERE pos.secId = 'YHOO') as itrX", + "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x" + + " WHERE x.ID = p.ID) as itrX", + "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos" + + " WHERE x.ID = p.ID) as itrX", + "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE " + + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')", + "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')", + "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')", + "SELECT DISTINCT structset.sos, structset.key " + + "FROM /exampleRegion p, p.positions.values outerPos, " + + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding " + + "FROM /exampleRegion.entries pf, pf.value.positions.values pos " + + "where outerPos.secId != 'IBM' AND " + + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset " + + "where structset.sos > 2000", + "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, " + + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding " + + "FROM /exampleRegion.entries pf, pf.value.positions.values pos " + + "where outerPos.secId != 'IBM' AND " + + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset " + + "where structset.sos > 2000", + "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position " + + "WHERE (true = null OR position.secId = 'SUN') AND true",}; + + String[] prQueryStr = { + "SELECT ID FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active'", + "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'", + "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000", + "SELECT DISTINCT p.ID FROM /exampleRegion p WHERE p.ID > 100 and p.ID < 100000 and p.status = 'active'", + "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (pos.secId != 'IBM')",}; + + private int numServers; + + @After + public final void preTearDownCacheTestCase() throws Exception { + Host host = Host.getHost(0); + // shut down clients before servers + for (int i = numServers; i < 4; i++) { + host.getVM(i).invoke(() -> CacheTestCase.disconnectFromDS()); + } + } + + public void setup(int numServers) throws Exception { + Host host = Host.getHost(0); + this.numServers = numServers; + } + + public void createExampleRegion() { + createExampleRegion(false, null); + } + + private void createExampleRegion(final boolean eviction, final String dirName) { + RegionFactory regionFactory = + cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE); + + // setting the eviction attributes. + if (eviction) { + File[] f = new File[1]; + f[0] = new File(dirName); + f[0].mkdir(); + DiskStoreFactory dsf = cacheRule.getCache().createDiskStoreFactory(); + dsf.setDiskDirs(f).create("ds1"); + EvictionAttributes evictAttrs = + EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK); + regionFactory.setDiskStoreName("ds1").setEvictionAttributes(evictAttrs); + } + regionFactory.create(exampleRegionName); + } + + private void createExamplePRRegion() { + RegionFactory regionFactory = + cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION); + + AttributesFactory factory = new AttributesFactory(); + // factory.setDataPolicy(DataPolicy.PARTITION); + regionFactory + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(8).create()); + regionFactory.create(exampleRegionName); + } + + private int configServer(final int queryMonitorTime, final String testName) throws IOException { + cacheRule.createCache(); + CacheServer cacheServer = cacheRule.getCache().addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = queryMonitorTime; + cacheRule.getCache().getLogger().fine("#### RUNNING TEST : " + testName); + DefaultQuery.testHook = new QueryTimeoutHook(queryMonitorTime); + return cacheServer.getPort(); + } + + // Stop server + private void stopServer(VM server) { + SerializableRunnable stopServer = new SerializableRunnable("Stop CacheServer") { + public void run() { + // Reset the test flag. + Cache cache = cacheRule.getCache(); + DefaultQuery.testHook = null; + GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1; + stopBridgeServer(cacheRule.getCache()); + } + }; + server.invoke(stopServer); + } + + private void configClient(String host, int... ports) { + configClient(false, host, ports); + } + + private void configClient(boolean enableSubscriptions, String host, int... ports) { + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + for (int port : ports) { + clientCacheFactory.addPoolServer(host, port); + } + clientCacheFactory.setPoolSubscriptionEnabled(true); + clientCacheFactory.setPoolReadTimeout(10 * 60 * 1000); // 10 mins + clientCacheRule.createClientCache(clientCacheFactory); + } - private MemberVM locator, server1, server2; - private ClientVM client3, client4; + private void verifyException(Exception e) { + e.printStackTrace(); + String error = e.getMessage(); + if (e.getCause() != null) { + error = e.getCause().getMessage(); + } - @Before - public void setUpServers() throws Exception { - locator = cluster.startLocatorVM(0, l -> l.withoutClusterConfigurationService()); - server1 = cluster.startServerVM(1, locator.getPort()); - server2 = cluster.startServerVM(2, locator.getPort()); + if (error.contains("Query execution cancelled after exceeding max execution time") + || error.contains("The Query completed sucessfully before it got canceled") + || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time") + || error.contains( + "The query task could not be found but the query is marked as having been canceled")) { + // Expected exception. + return; + } + + System.out.println("Unexpected exception:"); + if (e.getCause() != null) { + e.getCause().printStackTrace(); + } else { + e.printStackTrace(); + } - // configure the server to make the query to wait for at least 1 second in every execution spot - // and set a MAX_QUERY_EXECUTION_TIME to be 10ms - VMProvider.invokeInEveryMember(() -> { - DefaultQuery.testHook = new QueryTimeoutHook(); - GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME; - }, server1, server2); - gfsh.connectAndVerify(locator); + fail("Expected exception Not found. Expected exception with message: \n" + + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error); } + /** + * Tests query execution from client to server (single server). + */ @Test - public void testMultipleClientToOneServer() throws Exception { - int server1Port = server1.getPort(); - client3 = cluster.startClientVM(3, true, server1Port); - client4 = cluster.startClientVM(4, true, server1Port); + public void testQueryMonitorClientServer() throws Exception { + + setup(1); + + final Host host = Host.getHost(0); + + VM server = host.getVM(0); + VM client1 = host.getVM(1); + VM client2 = host.getVM(2); + VM client3 = host.getVM(3); + + final int numberOfEntries = 100; + String serverHostName = NetworkUtils.getServerHostName(host); + + // Start server + int serverPort = server.invoke("Create BridgeServer", + () -> configServer(10, "testQueryMonitorClientServer")); // All the queries taking more than + // 20ms should be canceled by Query + // monitor. + server.invoke("createExampleRegion", () -> createExampleRegion()); + + // Initialize server regions. + server.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries)); + + // Initialize Client1 + client1.invoke("Init client", () -> configClient(serverHostName, serverPort)); + + // Initialize Client2 + client2.invoke("Init client", () -> configClient(serverHostName, serverPort)); - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); + // Initialize Client3 + client3.invoke("Init client", () -> configClient(serverHostName, serverPort)); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(0, 100)); + // Execute client queries - // execute the query - VMProvider.invokeInEveryMember(() -> exuteQuery(), client3, client4); + client1.invoke("execute Queries", () -> executeQueriesFromClient(10)); + client2.invoke("execute Queries", () -> executeQueriesFromClient(10)); + client3.invoke("execute Queries", () -> executeQueriesFromClient(10)); + + stopServer(server); + } + + private void executeQueriesFromClient(int timeout) { + GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = timeout; + QueryService queryService = clientCacheRule.getClientCache().getQueryService(); + executeQueriesAgainstQueryService(queryService); + } + + private void executeQueriesOnServer() { + QueryService queryService = GemFireCacheImpl.getInstance().getQueryService(); + executeQueriesAgainstQueryService(queryService); } + private void executeQueriesAgainstQueryService(QueryService queryService) { + for (int k = 0; k < queryStr.length; k++) { + String qStr = queryStr[k]; + executeQuery(queryService, qStr); + } + } + + private void executeQuery(QueryService queryService, String qStr) { + try { + logger.info("Executing query :" + qStr); + Query query = queryService.newQuery(qStr); + query.execute(); + fail("The query should have been canceled by the QueryMonitor. Query: " + qStr); + } catch (Exception e) { + verifyException(e); + } + } + + /** + * Tests query execution from client to server (multi server). + */ @Test - public void testOneClientToMultipleServerOnReplicateRegion() throws Exception { - int server1Port = server1.getPort(); - int server2Port = server2.getPort(); - client3 = cluster.startClientVM(3, true, server1Port, server2Port); + public void testQueryMonitorMultiClientMultiServer() throws Exception { + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + + final int numberOfEntries = 100; + + String serverHostName = NetworkUtils.getServerHostName(host); + + // Start server + int serverPort1 = server1.invoke("Create BridgeServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking + // more than 20ms should + // be canceled by Query + // monitor. + server1.invoke("createExampleRegion", () -> createExampleRegion()); - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); + int serverPort2 = server2.invoke("Create BridgeServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServer"));// All the queries taking + // more than 20ms should + // be canceled by Query + // monitor. + server2.invoke("createExampleRegion", () -> createExampleRegion()); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(0, 100)); + // Initialize server regions. + server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries)); - // execute the query from client3 - client3.invoke(() -> exuteQuery()); + // Initialize server regions. + server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries)); + + // Initialize Client1 and create client regions. + client1.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2)); + // client1.invoke("createExampleRegion", () -> createExampleRegion()); + + // Initialize Client2 and create client regions. + client2.invoke("Init client", () -> configClient(serverHostName, serverPort1, serverPort2)); + // client2.invoke("createExampleRegion", () -> createExampleRegion()); + + // Execute client queries + + client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20)); + client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20)); + + stopServer(server1); + stopServer(server2); } + /** + * Tests query execution on local vm. + */ @Test - public void testOneClientToOneServerOnPartitionedRegion() throws Exception { - // client3 connects to server1, client4 connects to server2 - int server1Port = server1.getPort(); - int server2Port = server2.getPort(); - client3 = cluster.startClientVM(3, true, server1Port); - client4 = cluster.startClientVM(4, true, server2Port); + public void testQueryExecutionLocally() throws Exception { + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + + final int numberOfEntries = 100; + + // Start server + server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server1.invoke("createExampleRegion", () -> createExampleRegion()); + + server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server2.invoke("createExampleRegion", () -> createExampleRegion()); + + // Initialize server regions. + server1.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries)); + + // Initialize server regions. + server2.invoke("Create Bridge Server", () -> populatePortfolioRegions(numberOfEntries)); + + // Execute server queries + + server1.invoke("execute queries on Server", () -> executeQueriesOnServer()); + server2.invoke("execute queries on Server", () -> executeQueriesOnServer()); + + stopServer(server1); + stopServer(server2); + } - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION") - .statusIsSuccess(); + /** + * Tests query execution on local vm. + */ + @Test + public void testQueryExecutionLocallyAndCacheOp() throws Exception { + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + + final int numberOfEntries = 1000; + + // Start server + server1.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server1.invoke("createExampleRegion", () -> createExampleRegion()); + + server2.invoke("Create BridgeServer", () -> configServer(20, "testQueryExecutionLocally"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server2.invoke("createExampleRegion", () -> createExampleRegion()); + + // Initialize server regions. + server1.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries)); + + // Initialize server regions. + server2.invoke("populatePortfolioRegions", () -> populatePortfolioRegions(numberOfEntries)); + + // Execute server queries + SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") { + public void run2() throws CacheException { + try { + QueryService queryService = GemFireCacheImpl.getInstance().getQueryService(); + String qStr = + "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos" + + " WHERE x.ID = p.ID) as itrX"; + executeQuery(queryService, qStr); + + // Create index and Perform cache op. Bug#44307 + queryService.createIndex("idIndex", "ID", "/exampleRegion"); + queryService.createIndex("statusIndex", "status", "/exampleRegion"); + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = (1 + 100); i <= (numberOfEntries + 200); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } + + } catch (Exception ex) { + Assert.fail("Exception creating the query service", ex); + } + } + }; + + server1.invoke(executeQuery); + server2.invoke(executeQuery); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(0, 100)); - server2.invoke(() -> populateRegion(100, 200)); + stopServer(server1); + stopServer(server2); + } - client3.invoke(() -> exuteQuery()); - client4.invoke(() -> exuteQuery()); + private void populatePortfolioRegions(int numberOfEntries) { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName);; + for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } } + /** + * Tests query execution from client to server (multiple server) on Partition Region . + */ @Test - public void testQueryExecutionFromServerAndPerformCacheOp() throws Exception { - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); - - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(0, 100)); - - // execute the query from one server - server1.invoke(() -> exuteQuery()); - - // Create index and Perform cache op. Bug#44307 - server1.invoke(() -> { - QueryService queryService = ClusterStartupRule.getCache().getQueryService(); - queryService.createIndex("idIndex", "ID", "/exampleRegion"); - queryService.createIndex("statusIndex", "status", "/exampleRegion"); - populateRegion(100, 10); - }); + public void testQueryMonitorOnPR() throws Exception { + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + + final int numberOfEntries = 100; + + String serverHostName = NetworkUtils.getServerHostName(host); + + // Start server + int serverPort1 = server1.invoke("configServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries + // taking more than + // 100ms should be + // canceled by Query + // monitor. + server1.invoke("createExamplePRRegion", () -> createExamplePRRegion()); + + int serverPort2 = server2.invoke("configServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries + // taking more than + // 100ms should be + // canceled by Query + // monitor. + server2.invoke("createExamplePRRegion", () -> createExamplePRRegion()); + + // Initialize server regions. + server1.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio(101, numberOfEntries)); + + // Initialize server regions. + server2.invoke("bulkInsertPorfolio", () -> bulkInsertPorfolio((numberOfEntries + 100), + (numberOfEntries + numberOfEntries + 100))); + + // Initialize Client1 + client1.invoke("Init client", () -> configClient(serverHostName, serverPort1)); + + // Initialize Client2 + client2.invoke("Init client", () -> configClient(serverHostName, serverPort2)); + + // Execute client queries - // destroy indices created in this test - gfsh.executeAndAssertThat("destroy index --region=/exampleRegion").statusIsSuccess(); + client1.invoke("Execute Queries", () -> executeQueriesFromClient(20)); + client2.invoke("Execute Queries", () -> executeQueriesFromClient(20)); + + stopServer(server1); + stopServer(server2); + } + + private void bulkInsertPorfolio(int startingId, int numberOfEntries) { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = startingId; i <= (numberOfEntries + 100); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } } + /** + * Tests query execution on Partition Region, executes query locally. + */ @Test - public void testQueryExecutionFromServerOnPartitionedRegion() throws Exception { - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=PARTITION") - .statusIsSuccess(); + public void testQueryMonitorWithLocalQueryOnPR() throws Exception { + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + + final int numberOfEntries = 100; + + // Start server + server1.invoke("configServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries + // taking more than + // 100ms should be + // canceled by Query + // monitor. + server1.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + server2.invoke("configServer", + () -> configServer(20, "testQueryMonitorMultiClientMultiServerOnPR"));// All the queries + // taking more than + // 100ms should be + // canceled by Query + // monitor. + server2.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + // Initialize server regions. + server1.invoke(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + bulkInsertPorfolio(101, numberOfEntries); + } + }); + + // Initialize server regions. + server2.invoke(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100)); + } + }); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(100, 200)); - server2.invoke(() -> populateRegion(200, 300)); + // Execute client queries + server1.invoke("execute queries on server", () -> executeQueriesOnServer()); + server2.invoke("execute queries on server", () -> executeQueriesOnServer()); - // execute the query from one server - server1.invoke(() -> exuteQuery()); - server2.invoke(() -> exuteQuery()); + stopServer(server1); + stopServer(server2); } + /** + * Tests query execution from client to server (multiple server) with eviction to disk. + */ @Test - public void testQueryMonitorRegionWithEviction() throws Exception { - File server1WorkingDir = server1.getWorkingDir(); - File server2WorkingDir = server2.getWorkingDir(); - server1.invoke(() -> createReplicateRegionWithEviction(server1WorkingDir)); - server2.invoke(() -> createReplicateRegionWithEviction(server2WorkingDir)); - server1.invoke(() -> populateRegion(0, 100)); - server2.invoke(() -> populateRegion(100, 200)); - - // client3 connects to server1, client4 connects to server2 - int server1Port = server1.getPort(); - int server2Port = server2.getPort(); - client3 = cluster.startClientVM(3, ccf -> { - configureClientCacheFactory(ccf, server1Port); + public void testQueryMonitorRegionWithEviction() throws CacheException { + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + + final int numberOfEntries = 100; + + String serverHostName = NetworkUtils.getServerHostName(host); + + // Start server + int serverPort1 = server1.invoke("Create BridgeServer", + () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more + // than 20ms should be + // canceled by Query monitor. + server1.invoke("createExampleRegion", + () -> createExampleRegion(true, "server1_testQueryMonitorRegionWithEviction")); + + int serverPort2 = server2.invoke("Create BridgeServer", + () -> configServer(20, "testQueryMonitorRegionWithEviction"));// All the queries taking more + // than 20ms should be + // canceled by Query monitor. + server2.invoke("createExampleRegion", + () -> createExampleRegion(true, "server2_testQueryMonitorRegionWithEviction")); + + // Initialize server regions. + server1.invoke(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + bulkInsertPorfolio(101, numberOfEntries); + } }); - client4 = cluster.startClientVM(4, ccf -> { - configureClientCacheFactory(ccf, server2Port); + // Initialize server regions. + server2.invoke(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + bulkInsertPorfolio((numberOfEntries + 100), (numberOfEntries + numberOfEntries + 100)); + } }); - client3.invoke(() -> exuteQuery()); - client4.invoke(() -> exuteQuery()); + + // Initialize Client1 and create client regions. + client1.invoke("Init client", () -> configClient(serverHostName, serverPort1)); + + // Initialize Client2 and create client regions. + client2.invoke("Init client", () -> configClient(serverHostName, serverPort2)); + + // Execute client queries + client1.invoke("Execute Queries", () -> executeQueriesFromClient(20)); + client2.invoke("Execute Queries", () -> executeQueriesFromClient(20)); + + stopServer(server1); + stopServer(server2); } + /** + * Tests query execution on region with indexes. + */ @Test public void testQueryMonitorRegionWithIndex() throws Exception { - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - - // create the indices using API - VMProvider.invokeInEveryMember(() -> { - // create index. - QueryService cacheQS = ClusterStartupRule.getCache().getQueryService(); - cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p"); - cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p"); - cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos"); - cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos"); - cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion"); - cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion"); - populateRegion(0, 150); - }, server1, server2); - - // client3 connects to server1, client4 connects to server2 - int server1Port = server1.getPort(); - int server2Port = server2.getPort(); - client3 = cluster.startClientVM(3, true, server1Port); - client4 = cluster.startClientVM(4, true, server2Port); - - client3.invoke(() -> exuteQuery()); - client4.invoke(() -> exuteQuery()); + + setup(2); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client1 = host.getVM(2); + VM client2 = host.getVM(3); + + final int numberOfEntries = 100; + + String serverHostName = NetworkUtils.getServerHostName(host); + + // Start server + int serverPort1 = + server1.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server1.invoke("createExampleRegion", () -> createExampleRegion()); + + int serverPort2 = + server2.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All + // the + // queries + // taking + // more + // than + // 20ms + // should + // be + // canceled + // by + // Query + // monitor. + server2.invoke("createExampleRegion", () -> createExampleRegion()); + + // Initialize server regions. + server1.invoke("Create Indexes", () -> createIndexes(numberOfEntries)); + + // Initialize server regions. + server2.invoke("Create Indexes", () -> createIndexes(numberOfEntries)); + + // Initialize Client1 + client1.invoke("Init client", () -> configClient(serverHostName, serverPort1)); + + // Initialize Client2 + client2.invoke("Init client", () -> configClient(serverHostName, serverPort2)); + + // Execute client queries + client1.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20)); + client2.invoke("executeQueriesFromClient", () -> executeQueriesFromClient(20)); + + stopServer(server1); + stopServer(server2); + } + + private void createIndexes(int numberOfEntries) throws Exception { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + + // create index. + QueryService cacheQS = cacheRule.getCache().getQueryService(); + cacheQS.createIndex("idIndex", "p.ID", "/exampleRegion p"); + cacheQS.createIndex("statusIndex", "p.status", "/exampleRegion p"); + cacheQS.createIndex("secIdIndex", "pos.secId", "/exampleRegion p, p.positions.values pos"); + cacheQS.createIndex("posIdIndex", "pos.Id", "/exampleRegion p, p.positions.values pos"); + cacheQS.createKeyIndex("pkIndex", "pk", "/exampleRegion"); + cacheQS.createKeyIndex("pkidIndex", "pkid", "/exampleRegion"); + + for (int i = (1 + 100); i <= (numberOfEntries + 100); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } } + /** + * The following CQ test is added to make sure testMaxQueryExecutionTime is reset and is not + * affecting other query related tests. + * + */ @Test public void testCqExecuteDoesNotGetAffectedByTimeout() throws Exception { - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 2); - server1.invoke(() -> populateRegion(0, 100)); - - int server1Port = server1.getPort(); - client3 = cluster.startClientVM(3, ccf -> { - configureClientCacheFactory(ccf, server1Port); + setup(1); + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + VM producerClient = host.getVM(2); + + // Start server + int serverPort = + server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All + server.invoke("createExampleRegion", () -> createExampleRegion()); + + + final String host0 = NetworkUtils.getServerHostName(server.getHost()); + + // Create client. + client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort)); + + final int size = 10; + final String name = "testQuery_4"; + server.invoke(() -> { + Region region = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = 1; i <= size; i++) { + region.put("key" + i, new Portfolio(i)); + } }); - client3.invoke(() -> { + // create and execute cq + client.invoke(() -> { String cqName = "testCQForQueryMonitorDUnitTest"; - String query = "select * from /exampleRegion"; + String query = "select * from /" + exampleRegionName; // Get CQ Service. - QueryService cqService = ClusterStartupRule.getClientCache().getQueryService(); + QueryService cqService = null; + cqService = clientCacheRule.getClientCache().getQueryService(); // Create CQ Attributes. CqAttributesFactory cqf = new CqAttributesFactory(); @@ -256,212 +857,278 @@ public class QueryMonitorDUnitTest { CqQuery cq1 = cqService.newCq(cqName, query, cqa); cq1.execute(); }); - - server1.invoke(() -> { - populateRegion(0, 150); - }); } @Test - public void testCacheOpAfterQueryCancel() throws Exception { - int locatorPort = locator.getPort(); - // start up more servers - MemberVM server3 = cluster.startServerVM(3, locatorPort); + public void testCqProcessingDoesNotGetAffectedByTimeout() throws Exception { + setup(1); - server3.invoke(() -> { - DefaultQuery.testHook = new QueryTimeoutHook(); - GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = MAX_QUERY_EXECUTE_TIME; - }); + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + VM producerClient = host.getVM(2); - gfsh.executeAndAssertThat("create region --name=exampleRegion --type=REPLICATE") - .statusIsSuccess(); - locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/exampleRegion", 3); + // Start server + int serverPort = + server.invoke("configServer", () -> configServer(20, "testQueryMonitorRegionWithIndex"));// All + server.invoke("createExampleRegion", () -> createExampleRegion()); - server1.invoke(() -> { - QueryService queryService = ClusterStartupRule.getCache().getQueryService(); - queryService.createIndex("statusIndex", "status", "/exampleRegion"); - queryService.createIndex("secIdIndex", "pos.secId", - "/exampleRegion p, p.positions.values pos"); - populateRegion(100, 1000); - }); - AsyncInvocation ai1 = server1.invokeAsync(() -> { - for (int j = 0; j < 5; j++) { - populateRegion(0, 2000); + final String host0 = NetworkUtils.getServerHostName(server.getHost()); + + // Create client. + client.invoke("createClient", () -> configClient(true, host.getHostName(), serverPort)); + + final int size = 10; + final String name = "testQuery_4"; + server.invoke(() -> { + Region region = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = 1; i <= size; i++) { + region.put("key" + i, new Portfolio(i)); } }); - AsyncInvocation ai2 = server2.invokeAsync(() -> { - for (int j = 0; j < 5; j++) { - populateRegion(1000, 3000); + // create and execute cq + client.invoke(() -> { + String cqName = "testCQForQueryMonitorDUnitTest"; + String query = "select * from /" + exampleRegionName; + // Get CQ Service. + QueryService cqService = null; + cqService = clientCacheRule.getClientCache().getQueryService(); + + // Create CQ Attributes. + CqAttributesFactory cqf = new CqAttributesFactory(); + CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())}; + cqf.initCqListeners(cqListeners); + CqAttributes cqa = cqf.create(); + + CqQuery cq1 = cqService.newCq(cqName, query, cqa); + cq1.execute(); + }); + + server.invoke(() -> { + Region region = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = 1; i <= size; i++) { + region.put("key" + i, new Portfolio(i)); } }); + } + + /** + * Tests cache operation right after query cancellation. + */ + @Test + public void testCacheOpAfterQueryCancel() throws Exception { + + setup(4); + + final Host host = Host.getHost(0); + + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM server3 = host.getVM(2); + VM server4 = host.getVM(3); - // server3 performs a region put after a query is canceled. - AsyncInvocation ai3 = server3.invokeAsync(() -> { - Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion"); - QueryService queryService = GemFireCacheImpl.getInstance().getQueryService(); - String qStr = - "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos" - + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 " - + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', " - + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')" - + " order by p.status, p.ID desc"; - for (int i = 0; i < 100; i++) { + final int numberOfEntries = 1000; + + // Start server + server1.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally")); + server1.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + server2.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally")); + server2.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + server3.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally")); + server3.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + server4.invoke("Create BridgeServer", () -> configServer(5, "testQueryExecutionLocally")); + server4.invoke("Create Partition Regions", () -> createExamplePRRegion()); + + server1.invoke(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { try { - Query query = queryService.newQuery(qStr); - query.execute(); - fail("The query should have been canceled by the QueryMonitor. Query: " + qStr); - } catch (QueryExecutionTimeoutException qet) { + QueryService queryService = GemFireCacheImpl.getInstance().getQueryService(); + queryService.createIndex("statusIndex", "status", "/exampleRegion"); + queryService.createIndex("secIdIndex", "pos.secId", + "/exampleRegion p, p.positions.values pos"); + } catch (Exception ex) { + fail("Failed to create index."); + } + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + for (int i = 100; i <= (numberOfEntries); i++) { exampleRegion.put("" + i, new Portfolio(i)); } } }); - ai1.await(); - ai2.await(); - ai3.await(); + // Initialize server regions. + AsyncInvocation ai1 = + server1.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + for (int j = 0; j < 5; j++) { + for (int i = 1; i <= (numberOfEntries + 1000); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } + } + LogWriterUtils.getLogWriter() + .info("### Completed updates in server1 in testCacheOpAfterQueryCancel"); + } + }); + + AsyncInvocation ai2 = + server2.invokeAsync(new CacheSerializableRunnable("Create Bridge Server") { + public void run2() throws CacheException { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + for (int j = 0; j < 5; j++) { + for (int i = (1 + 1000); i <= (numberOfEntries + 2000); i++) { + exampleRegion.put("" + i, new Portfolio(i)); + } + } + LogWriterUtils.getLogWriter() + .info("### Completed updates in server2 in testCacheOpAfterQueryCancel"); + } + }); + + // Execute server queries + SerializableRunnable executeQuery = new CacheSerializableRunnable("Execute queries") { + public void run2() throws CacheException { + try { + Region exampleRegion = cacheRule.getCache().getRegion(exampleRegionName); + QueryService queryService = GemFireCacheImpl.getInstance().getQueryService(); + String qStr = + "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values pos1, p.positions.values pos" + + " where p.ID < pos.sharesOutstanding OR p.ID > 0 OR p.position1.mktValue > 0 " + + " OR pos.secId in SET ('SUN', 'IBM', 'YHOO', 'GOOG', 'MSFT', " + + " 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')" + + " order by p.status, p.ID desc"; + for (int i = 0; i < 500; i++) { + try { + GemFireCacheImpl.getInstance().getLogger().info("Executing query :" + qStr); + Query query = queryService.newQuery(qStr); + query.execute(); + } catch (QueryExecutionTimeoutException qet) { + LogWriterUtils.getLogWriter() + .info("### Got Expected QueryExecutionTimeout exception. " + qet.getMessage()); + if (qet.getMessage().contains("cancelled after exceeding max execution")) { + LogWriterUtils.getLogWriter().info("### Doing a put operation"); + exampleRegion.put("" + i, new Portfolio(i)); + } + } catch (Exception e) { + fail("Exception executing query." + e.getMessage()); + } + } + LogWriterUtils.getLogWriter() + .info("### Completed Executing queries in testCacheOpAfterQueryCancel"); + } catch (Exception ex) { + Assert.fail("Exception creating the query service", ex); + } + } + }; + + AsyncInvocation ai3 = server3.invokeAsync(executeQuery); + AsyncInvocation ai4 = server4.invokeAsync(executeQuery); + + LogWriterUtils.getLogWriter() + .info("### Waiting for async threads to join in testCacheOpAfterQueryCancel"); + try { + ThreadUtils.join(ai1, 5 * 60 * 1000); + ThreadUtils.join(ai2, 5 * 60 * 1000); + ThreadUtils.join(ai3, 5 * 60 * 1000); + ThreadUtils.join(ai4, 5 * 60 * 1000); + } catch (Exception ex) { + fail("Async thread join failure"); + } + LogWriterUtils.getLogWriter() + .info("### DONE Waiting for async threads to join in testCacheOpAfterQueryCancel"); - server3.invoke(() -> { - DefaultQuery.testHook = null; - GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1; - }); - } + validateQueryMonitorThreadCnt(server1, 0, 1000); + validateQueryMonitorThreadCnt(server2, 0, 1000); + validateQueryMonitorThreadCnt(server3, 0, 1000); + validateQueryMonitorThreadCnt(server4, 0, 1000); + LogWriterUtils.getLogWriter() + .info("### DONE validating query monitor threads testCacheOpAfterQueryCancel"); - private static void populateRegion(int startingId, int endingId) { - Region exampleRegion = ClusterStartupRule.getCache().getRegion("exampleRegion"); - for (int i = startingId; i < endingId; i++) { - exampleRegion.put("" + i, new Portfolio(i)); - } + stopServer(server1); + stopServer(server2); + stopServer(server3); + stopServer(server4); } - private static void exuteQuery() { - QueryService queryService; - if (ClusterStartupRule.getClientCache() == null) { - queryService = ClusterStartupRule.getCache().getQueryService(); - } else { - queryService = ClusterStartupRule.getClientCache().getQueryService(); - } - for (int k = 0; k < queryStr.length; k++) { - String qStr = queryStr[k]; - try { - Query query = queryService.newQuery(qStr); - query.execute(); - fail("The query should have been canceled by the QueryMonitor. Query: " + qStr); - } catch (Exception e) { - verifyException(e); - } - } + private void validateQueryMonitorThreadCnt(VM vm, final int threadCount, final int waitTime) { + SerializableRunnable validateThreadCnt = + new CacheSerializableRunnable("validateQueryMonitorThreadCnt") { + public void run2() throws CacheException { + Cache cache = cacheRule.getCache(); + QueryMonitor qm = ((GemFireCacheImpl) cache).getQueryMonitor(); + if (qm == null) { + fail("Didn't found query monitor."); + } + int waited = 0; + while (true) { + if (qm.getQueryMonitorThreadCount() != threadCount) { + if (waited <= waitTime) { + Wait.pause(10); + waited += 10; + continue; + } else { + fail("Didn't found expected monitoring thread. Expected: " + threadCount + + " found :" + qm.getQueryMonitorThreadCount()); + } + } + break; + } + } + }; + vm.invoke(validateThreadCnt); } - private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) { - for (int serverPort : serverPorts) { - ccf.addPoolServer("localhost", serverPort); - } - ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min - ccf.setPoolSubscriptionEnabled(true); + /** + * Starts a bridge server on the given port, using the given deserializeValues and + * notifyBySubscription to serve up the given region. + */ + protected int startBridgeServer(int port, boolean notifyBySubscription) throws IOException { + + Cache cache = cacheRule.getCache(); + CacheServer bridge = cache.addCacheServer(); + bridge.setPort(port); + bridge.setNotifyBySubscription(notifyBySubscription); + bridge.start(); + return bridge.getPort(); } - private static void createReplicateRegionWithEviction(File workingDir) { - InternalCache cache = ClusterStartupRule.getCache(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - dsf.setDiskDirs(new File[] {workingDir}).create("ds"); - EvictionAttributes evictAttrs = - EvictionAttributes.createLRUEntryAttributes(100, EvictionAction.OVERFLOW_TO_DISK); - cache.createRegionFactory(RegionShortcut.REPLICATE) - .setDiskStoreName("ds") - .setEvictionAttributes(evictAttrs) - .create("exampleRegion"); + /** + * Stops the bridge server that serves up the given cache. + */ + private void stopBridgeServer(Cache cache) { + CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next(); + bridge.stop(); + assertFalse(bridge.isRunning()); } - private static void verifyException(Exception e) { - String error = e.getMessage(); - if (e.getCause() != null) { - error = e.getCause().getMessage(); - } + private class QueryTimeoutHook implements DefaultQuery.TestHook { - if (error.contains("Query execution cancelled after exceeding max execution time") - || error.contains("The Query completed sucessfully before it got canceled") - || error.contains("The QueryMonitor thread may be sleeping longer than the set sleep time") - || error.contains( - "The query task could not be found but the query is marked as having been canceled")) { - // Expected exception. - return; - } + long timeout; - System.out.println("Unexpected exception:"); - if (e.getCause() != null) { - e.getCause().printStackTrace(); - } else { - e.printStackTrace(); + private QueryTimeoutHook(long timeout) { + this.timeout = timeout; } - fail("Expected exception Not found. Expected exception with message: \n" - + "\"Query execution taking more than the max execution time\"" + "\n Found \n" + error); - } - - - @After - public void reset() { - VMProvider.invokeInEveryMember(() -> { - DefaultQuery.testHook = null; - GemFireCacheImpl.MAX_QUERY_EXECUTION_TIME = -1; - }, server1, server2); - } + public void doTestHook(String description) { + if (description.equals("6")) { + try { + Thread.sleep(timeout * 2); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } - private static class QueryTimeoutHook implements DefaultQuery.TestHook { public void doTestHook(int spot) { - if (spot != 6) { - return; - } - try { - TimeUnit.MILLISECONDS.sleep(20); - } catch (InterruptedException e) { - throw new RuntimeException(e.getMessage(), e); - } + doTestHook("" + spot); } - } - private static String[] queryStr = {"SELECT ID FROM /exampleRegion p WHERE p.ID > 100", - "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pk != '1000'", - "SELECT DISTINCT * FROM /exampleRegion x, x.positions.values WHERE x.pkid != '1'", - "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pk > '1'", - "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values WHERE p.pkid != '53'", - "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100", - "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')", - "SELECT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000", - "SELECT * FROM /exampleRegion WHERE ID > 100 and status = 'active'", - "SELECT DISTINCT * FROM /exampleRegion p WHERE p.ID > 100 and p.status = 'active' and p.ID < 100000", - "SELECT DISTINCT ID FROM /exampleRegion WHERE status = 'active'", - "SELECT DISTINCT ID FROM /exampleRegion p WHERE p.status = 'active'", - "SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos WHERE pos.secId IN SET('YHOO', 'IBM', 'AMZN')", - "SELECT DISTINCT proj1:p, proj2:itrX FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos" - + " WHERE pos.secId = 'YHOO') as itrX", - "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion p, p.positions.values pos" - + " WHERE pos.secId = 'YHOO') as itrX", - "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT p.ID FROM /exampleRegion x" - + " WHERE x.ID = p.ID) as itrX", - "SELECT DISTINCT * FROM /exampleRegion p, (SELECT DISTINCT pos FROM /exampleRegion x, x.positions.values pos" - + " WHERE x.ID = p.ID) as itrX", - "SELECT DISTINCT x.ID FROM /exampleRegion x, x.positions.values v WHERE " - + "v.secId = element(SELECT DISTINCT vals.secId FROM /exampleRegion p, p.positions.values vals WHERE vals.secId = 'YHOO')", - "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')", - "SELECT DISTINCT * FROM /exampleRegion p, positions.values pos WHERE (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')", - "SELECT DISTINCT structset.sos, structset.key " - + "FROM /exampleRegion p, p.positions.values outerPos, " - + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding " - + "FROM /exampleRegion.entries pf, pf.value.positions.values pos " - + "where outerPos.secId != 'IBM' AND " - + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset " - + "where structset.sos > 2000", - "SELECT DISTINCT * " + "FROM /exampleRegion p, p.positions.values outerPos, " - + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding " - + "FROM /exampleRegion.entries pf, pf.value.positions.values pos " - + "where outerPos.secId != 'IBM' AND " - + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset " - + "where structset.sos > 2000", - "SELECT DISTINCT * FROM /exampleRegion p, p.positions.values position " - + "WHERE (true = null OR position.secId = 'SUN') AND true",}; + } }