This is an automated email from the ASF dual-hosted git repository. mkevo pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 0ecd6f6738 GEODE-9632: fix for queries with multy operations and indexes (#7824) 0ecd6f6738 is described below commit 0ecd6f673801cbdcc9cfeba7da425c83502d66f8 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Fri Jul 29 09:43:11 2022 +0200 GEODE-9632: fix for queries with multy operations and indexes (#7824) --- .../internal/AbstractGroupOrRangeJunction.java | 15 + .../cache/query/internal/CompiledJunction.java | 15 + .../query/internal/index/MemoryIndexStore.java | 45 +++ .../query/dunit/QueryWithRangeIndexDUnitTest.java | 316 +++++++++++++++++++++ 4 files changed, 391 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java index 31060b0f62..3113e10925 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java @@ -264,6 +264,15 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue List sortedConditionsList = getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context); + Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX); + + boolean modifiedApplyLimits = false; + if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1 + && _operator == LITERAL_and) { + context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE); + modifiedApplyLimits = true; + } + // Sort the operands in increasing order of resultset size Iterator i = sortedConditionsList.iterator(); // SortedSet intersectionSet = new TreeSet(new SelectResultsComparator()); @@ -285,6 +294,12 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue // RangeJunction or a CompiledComparison. But if the parent Object is a // RangeJunction then the Filter is a RangeJunctionEvaluator SelectResults filterResults = null; + + if (modifiedApplyLimits && sortedConditionsList.size() == 1) { + context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE); + modifiedApplyLimits = false; + } + Filter filter = (Filter) i.next(); boolean isConditioningNeeded = filter.isConditioningNeededForIndex( indpndntItr.length == 1 ? indpndntItr[0] : null, context, diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java index 5b23519d76..adc4003bef 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java @@ -278,6 +278,15 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable List sortedConditionsList = getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context); + Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX); + + boolean modifiedApplyLimits = false; + if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1 + && _operator == LITERAL_and) { + context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE); + modifiedApplyLimits = true; + } + // Sort the operands in increasing order of resultset size Iterator sortedConditionsItr = sortedConditionsList.iterator(); while (sortedConditionsItr.hasNext()) { @@ -293,6 +302,12 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable // recursion being ended by evaluating auxIterEvaluate if any. The passing // of IntermediateResult in filterEvalaute causes AND junction evaluation // to be corrupted , if the intermediateResultset contains some value. + + if (modifiedApplyLimits && sortedConditionsList.size() == 1) { + context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE); + modifiedApplyLimits = false; + } + SelectResults filterResults = ((Filter) sortedConditionsItr.next()).filterEvaluate(context, null); if (_operator == LITERAL_and) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java index 736de22a2d..8c536368af 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java @@ -14,6 +14,8 @@ */ package org.apache.geode.cache.query.internal.index; +import static java.util.Objects.hash; + import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.NonTXEntry; import org.apache.geode.internal.cache.RegionEntry; import org.apache.geode.internal.cache.Token; import org.apache.geode.internal.cache.persistence.query.CloseableIterator; +import org.apache.geode.pdx.internal.PdxInstanceImpl; /** * The in-memory index storage @@ -867,6 +870,48 @@ public class MemoryIndexStore implements IndexStore { + Integer.toHexString(System.identityHashCode(this)) + ' ' + key + ' ' + value; } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof CachedEntryWrapper)) { + return false; + } + CachedEntryWrapper object = (CachedEntryWrapper) obj; + if (!getKey().equals(object.getKey())) { + if (!(getKey() instanceof PdxInstanceImpl)) { + return false; + } + if (!(object.getKey() instanceof PdxInstanceImpl)) { + return false; + } + PdxInstanceImpl pdxkey1 = (PdxInstanceImpl) getKey(); + PdxInstanceImpl pdxkey2 = (PdxInstanceImpl) object.getKey(); + if (!pdxkey1.equals(pdxkey2)) { + return false; + } + } + if (!getValue().equals(object.getValue())) { + if (!(getValue() instanceof PdxInstanceImpl)) { + return false; + } + if (!(object.getValue() instanceof PdxInstanceImpl)) { + return false; + } + PdxInstanceImpl pdxvalue1 = (PdxInstanceImpl) getValue(); + PdxInstanceImpl pdxvalue2 = (PdxInstanceImpl) object.getValue(); + if (!pdxvalue1.equals(pdxvalue2)) { + return false; + } + } + + return true; + } + + @Override + public int hashCode() { + return hash(key, value); + } + } } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java new file mode 100644 index 0000000000..90b9311114 --- /dev/null +++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java @@ -0,0 +1,316 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.dunit; + +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.VM.getVMId; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.util.HashMap; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.data.Portfolio; +import org.apache.geode.cache.query.data.PortfolioPdx; +import org.apache.geode.distributed.LocatorLauncher; +import org.apache.geode.distributed.ServerLauncher; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.rules.GfshCommandRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; + +@SuppressWarnings("ALL") +public class QueryWithRangeIndexDUnitTest extends JUnit4DistributedTestCase + implements Serializable { + + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + @Rule + public DistributedRule distributedRule = new DistributedRule(); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + private static final String locatorName = "locator"; + private static final String serverName = "server"; + + private File locatorDir; + private File serverDir; + + private int locatorPort; + private int locatorJmxPort; + private int serverPort; + + private String locators; + + private VM server; + + private static final String regionName = "exampleRegion"; + + @Before + public void setUp() throws Exception { + VM locator = getVM(0); + server = getVM(1); + + locatorDir = temporaryFolder.newFolder(locatorName); + serverDir = temporaryFolder.newFolder(serverName); + + int[] port = getRandomAvailableTCPPorts(3); + locatorPort = port[0]; + locatorJmxPort = port[1]; + serverPort = port[2]; + + locators = "localhost[" + locatorPort + "]"; + + locator.invoke(() -> startLocator(locatorDir, locatorPort, locatorJmxPort)); + + gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager); + + server.invoke(() -> startServer(serverDir, serverPort, locators)); + + } + + @Test + public void testQueryWithWildcardAndIndexOnAttributeFromHashMap() { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + + server.invoke(() -> { + Cache cache = CacheFactory.getAnyInstance(); + QueryService cacheQS = cache.getQueryService(); + cacheQS.createIndex("IdIndex", "value.positions['SUN']", + SEPARATOR + regionName + ".entrySet"); + Region<Integer, Portfolio> region = + cache.getRegion(regionName); + + for (int i = 1; i < 10001; i++) { + Portfolio p1 = new Portfolio(i, i); + p1.positions = new HashMap<>(); + p1.positions.put("IBM", "something"); + if (i == 1) { + p1.positions.put("SUN", "something"); + } else { + p1.positions.put("SUN", "some"); + } + region.put(i, p1); + } + }); + + String query = "query --query=\"<trace> select e.key, e.value from " + + SEPARATOR + regionName + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\""; + + String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel()); + assertThat(cmdResult).contains("\"Rows\":\"1\""); + assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 1)"); + } + + @Test + public void testQueryWithWildcardAndIndexOnMultiValues() { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + + server.invoke(() -> { + Cache cache = CacheFactory.getAnyInstance(); + QueryService cacheQS = cache.getQueryService(); + cacheQS.createIndex("IdIndex", "value.positions['SUN']", + SEPARATOR + regionName + ".entrySet"); + Region<Integer, Portfolio> region = + cache.getRegion(regionName); + + for (int i = 1; i < 10001; i++) { + Portfolio p1 = new Portfolio(i, i); + p1.positions = new HashMap<>(); + p1.positions.put("IBM", "something"); + if (i % 500 == 0) { + p1.positions.put("SUN", "something" + i); + } else { + p1.positions.put("SUN", "some"); + } + region.put(i, p1); + } + }); + + String query = "query --query=\"<trace> select e.key, e.value from " + + SEPARATOR + regionName + + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\""; + + String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel()); + assertThat(cmdResult).contains("\"Rows\":\"20\""); + assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 20)"); + } + + @Test + public void testLimitIsAppliedOnlyOnQueryResults() { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + + server.invoke(() -> { + Cache cache = CacheFactory.getAnyInstance(); + QueryService cacheQS = cache.getQueryService(); + cacheQS.createIndex("IdIndex", "value.positions['SUN']", + SEPARATOR + regionName + ".entrySet"); + Region<Integer, Portfolio> region = + cache.getRegion(regionName); + + for (int i = 1; i < 10001; i++) { + Portfolio p1 = new Portfolio(i, i); + p1.positions = new HashMap<>(); + p1.positions.put("IBM", "something"); + if (i % 500 == 0) { + p1.positions.put("SUN", "something"); + } else { + p1.positions.put("SUN", "some"); + } + region.put(i, p1); + } + }); + + String query = "query --query=\"<trace> select e.key, e.value from " + + SEPARATOR + regionName + + ".entrySet e where e.value.positions['SUN'] like 'somethin%' limit 5\""; + + String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel()); + assertThat(cmdResult).contains("\"Rows\":\"5\""); + assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 5)"); + } + + @Test + public void testQueryWithWildcardAndIndexOnAttributeFromHashMapPdx() { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + + server.invoke(() -> { + Cache cache = CacheFactory.getAnyInstance(); + QueryService cacheQS = cache.getQueryService(); + cacheQS.createIndex("IdIndex", "value.positions['SUN']", + SEPARATOR + regionName + ".entrySet"); + Region<Integer, PortfolioPdx> region = + cache.getRegion(regionName); + + for (int i = 1; i < 10001; i++) { + PortfolioPdx p1 = new PortfolioPdx(i, i); + p1.positions = new HashMap<>(); + p1.positions.put("IBM", "something"); + if (i == 1) { + p1.positions.put("SUN", "something"); + } else { + p1.positions.put("SUN", "some"); + } + region.put(i, p1); + } + }); + + String query = "query --query=\"<trace> select e.key, e.value from " + + SEPARATOR + regionName + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\""; + + String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel()); + assertThat(cmdResult).contains("\"Rows\":\"1\""); + assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 1)"); + } + + + + @Test + public void testLimitIsAppliedOnlyOnQueryResultsPdx() { + gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION") + .statusIsSuccess(); + + server.invoke(() -> { + Cache cache = CacheFactory.getAnyInstance(); + QueryService cacheQS = cache.getQueryService(); + cacheQS.createIndex("IdIndex", "value.positions['SUN']", + SEPARATOR + regionName + ".entrySet"); + Region<Integer, PortfolioPdx> region = + cache.getRegion(regionName); + + for (int i = 1; i < 10001; i++) { + PortfolioPdx p1 = new PortfolioPdx(i, i); + p1.positions = new HashMap<>(); + p1.positions.put("IBM", "something"); + if (i % 500 == 0) { + p1.positions.put("SUN", "something"); + } else { + p1.positions.put("SUN", "some"); + } + region.put(i, p1); + } + }); + + String query = "query --query=\"<trace> select e.key, e.value from " + + SEPARATOR + regionName + + ".entrySet e where e.value.positions['SUN'] like 'somethin%' limit 5\""; + + String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel()); + assertThat(cmdResult).contains("\"Rows\":\"5\""); + assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 5)"); + } + + private static void startLocator(File workingDirectory, int locatorPort, + int jmxPort) { + LocatorLauncher locatorLauncher = new LocatorLauncher.Builder() + .setMemberName(locatorName) + .setPort(locatorPort) + .setWorkingDirectory(workingDirectory.getAbsolutePath()) + .set(JMX_MANAGER, "true") + .set(JMX_MANAGER_PORT, String.valueOf(jmxPort)) + .set(JMX_MANAGER_START, "true") + .build(); + + locatorLauncher.start(); + + await().untilAsserted(() -> { + InternalLocator locator = (InternalLocator) locatorLauncher.getLocator(); + assertThat(locator.isSharedConfigurationRunning()) + .as("Locator shared configuration is running on locator" + getVMId()) + .isTrue(); + }); + } + + private static void startServer(File workingDirectory, int serverPort, + String locators) { + ServerLauncher serverLauncher = new ServerLauncher.Builder() + .setDeletePidFileOnStop(Boolean.TRUE) + .setMemberName(serverName) + .setServerPort(serverPort) + .setWorkingDirectory(workingDirectory.getAbsolutePath()) + .setDebug(true) + .set(HTTP_SERVICE_PORT, "0") + .set(LOCATORS, locators) + .build(); + + serverLauncher.start(); + } +}