alex-plekhanov commented on code in PR #10928:
URL: https://github.com/apache/ignite/pull/10928#discussion_r1356282219
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java:
##########
@@ -18,18 +18,25 @@
package org.apache.ignite.internal.processors.query.calcite.exec.partition;
import java.util.Collection;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
/** */
public interface PartitionNode {
- /** */
- Collection<Integer> apply(PartitionPruningContext ctx);
+ /**
+ * @param ctx Partition pruning context.
+ * @return Collection of partitions after pruning or {@code null} if all
partition required.
Review Comment:
Typo: `all partitions`
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode;
+import
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
+import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/** */
+public class PartitionExtractor extends IgniteRelShuttle {
+ /** */
+ private final IgniteTypeFactory typeFactory;
+
+ /** */
+ private final Deque<PartitionNode> stack = new ArrayDeque<>();
+
+ /** */
+ public PartitionExtractor(IgniteTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexScan rel) {
+ processScan(rel);
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ processScan(rel);
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteRel processNode(IgniteRel rel) {
+ IgniteRel res = super.processNode(rel);
+
+ if (rel.getInputs().isEmpty())
+ return res;
+
+ List<PartitionNode> operands = new ArrayList<>();
+ for (int i = 0; i < rel.getInputs().size(); ++i) {
+ if (stack.isEmpty())
+ break;
+
+ operands.add(stack.pop());
+ }
+
+ stack.push(PartitionOperandNode.createOrOperandNode(operands));
+
+ return res;
+ }
+
+ /** */
+ public PartitionNode go(Fragment fragment) {
+ if (!(fragment.root() instanceof IgniteSender))
+ return PartitionAllNode.INSTANCE;
+
+ if (fragment.mapping() == null || !fragment.mapping().colocated())
+ return PartitionAllNode.INSTANCE;
+
+ visit(fragment.root());
+
+ if (stack.isEmpty())
+ return PartitionAllNode.INSTANCE;
+
+ PartitionNode res = stack.pop();
+
+ return res.optimize();
Review Comment:
It optimizes only root level statement, for example, if you have condition
like `AND(OR(=(ID,0),=(1,1)), =(VAL,1))`, this condition can't be optimized
only using root level. Perhaps we should optimize when building
PartitionOperandNode (in `createAndOperandNode` and `createOrOperandNode`)
methods.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -84,18 +84,15 @@ public PartitionExtractor(IgniteTypeFactory typeFactory) {
@Override protected IgniteRel processNode(IgniteRel rel) {
IgniteRel res = super.processNode(rel);
- if (rel.getInputs().isEmpty())
- return res;
+ if (rel.getInputs().size() > 1) {
+ List<PartitionNode> operands = new ArrayList<>();
+ for (int i = 0; i < rel.getInputs().size(); ++i)
+ operands.add(stack.pop());
- List<PartitionNode> operands = new ArrayList<>();
- for (int i = 0; i < rel.getInputs().size(); ++i) {
- if (stack.isEmpty())
- break;
-
- operands.add(stack.pop());
+ stack.push(PartitionOperandNode.createOrOperandNode(operands));
}
-
- stack.push(PartitionOperandNode.createOrOperandNode(operands));
+ else if (rel.getInputs().isEmpty())
Review Comment:
Condition is redundant (just else is enough)
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ Stream.of("T2").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, AK, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append(i).append(",")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID,AK), PUBLIC.DICT(ID) WITH
\"NULLS=0,DISTINCT=10000,TOTAL=10000\"");
+
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 8;
+ }
+
+ /** */
+ @Test
+ public void testSimple() {
+ execute("select count(*) from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals(1L, res.get(0).get(0));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where ? = T1.ID",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select VAL, IDX_VAL, ID from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123, 123);
+ }
+
+ /** */
+ @Test
+ public void testNullsInCondition() {
+ execute("select * from T1 where T1.ID is NULL",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ });
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, new Object[]{ null });
+
+ execute("select * from T1 where T1.ID is NULL and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, null, 123);
+
+ execute("select * from T1 where T1.ID is NULL or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, null, 123);
+ }
+
+ /** */
+ @Test
+ public void testEmptyConditions() {
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518);
+
+ execute("select * from T1 where T1.ID = ? and (T1.ID = ? OR T1.ID = ?
OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? OR T1.ID = ?) AND (T1.ID =
? OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? AND T1.ID = ?) OR (T1.ID =
? AND T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+ }
+
+ /** */
+ @Test
+ public void testSelectIn() {
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, true, "_KEY");
+ testSelect(i, true, "ID");
+ });
+ }
+
+
+ /** */
+ @Test
+ public void testSelectOr() {
+ testSelect(1, false, "_KEY");
+ testSelect(1, false, "ID");
+
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, false, "_KEY");
+ testSelect(i, false, "ID");
+ });
+ }
+
+ /** */
+ @Test
+ public void testSimpleJoin() {
+ // Key (not alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1.ID =
?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 123));
+// assertNodes(node("T1_CACHE", 123));
+ assertEquals(1, res.size());
+ assertEquals(123, res.get(0).get(0));
+ },
+ 123
+ );
+
+ // Key (alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1._KEY
= ?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 125));
+// assertNodes(node("T1_CACHE", 125));
+
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Non-affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.ID =
?",
+ (res) -> {
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.AK =
?",
+ (res) -> {
+// assertPartitions(partition("T2_CACHE", 125));
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+ }
+
+ /** */
+ private void testSelect(int sz, boolean withIn, String column) {
+ assertTrue(sz >= 1);
+ int[] values = ThreadLocalRandom.current().ints(0,
ENTRIES_COUNT).distinct().limit(sz).toArray();
+
+ StringBuilder query;
+
+ if (!withIn || sz == 1)
+ query = new StringBuilder("select * from T1 where ");
+ else
+ query = new StringBuilder("select * from T1 where
T1.").append(column).append(" in (");
+
+ for (int i = 0; i < sz; ++i) {
+ if (!withIn || sz == 1)
+ query.append("T1.").append(column).append("= ?");
+ else
+ query.append('?');
+
+ if (sz == 1)
+ break;
+
+ if (i == sz - 1)
+ query.append(!withIn ? "" : ")");
+ else
+ query.append(!withIn ? " OR " : ", ");
+ }
+
+ execute(query.toString(),
+ res -> {
+ assertPartitions(IntStream.of(values).map(i ->
partition("T1_CACHE", i)).toArray());
+ assertNodes(IntStream.of(values).mapToObj(i ->
node("T1_CACHE", i)).toArray(ClusterNode[]::new));
+
+ assertEquals(values.length, res.size());
+
+ assertEquals(
+
IntStream.of(values).sorted().boxed().collect(Collectors.toList()),
+ res.stream().map(row ->
row.get(0)).sorted().collect(Collectors.toList())
+ );
+ },
+ IntStream.of(values).boxed().toArray(Integer[]::new));
+ }
+
+ /** */
+ public void execute(String sql, Consumer<List<List<?>>> resConsumer,
Object... args) {
+ log.info(">>> TEST COMBINATION: \"" + sql + "\"");
+
+ // Execute query as is.
+ log.info("Execute \"" + sql + "\" with args " + Arrays.toString(args));
+
+ List<List<?>> res = sql(sql, args);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+
+ // Start filling arguments recursively.
+ if (args != null && args.length > 0)
+ executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+ }
+
+ /** */
+ private void executeCombinations0(
+ String sql,
+ Consumer<List<List<?>>> resConsumer,
+ Set<String> executedSqls,
+ Object... args
+ ) {
+ assert args != null && args.length > 0;
+
+ // Get argument positions.
+ List<Integer> paramPoss = new ArrayList<>();
+
+ int pos = 0;
+
+ while (true) {
+ int paramPos = sql.indexOf('?', pos);
+
+ if (paramPos == -1)
+ break;
+
+ paramPoss.add(paramPos);
+
+ pos = paramPos + 1;
+ }
+
+ for (int i = 0; i < args.length; i++) {
+ // Prepare new SQL and arguments.
+ int paramPos = paramPoss.get(i);
+
+ String newSql = sql.substring(0, paramPos) + (args[i] instanceof
String ? "'" + args[i] + "'" : args[i])
+ + sql.substring(paramPos + 1);
+
+ Object[] newArgs = new Object[args.length - 1];
+
+ int newArgsPos = 0;
+
+ for (int j = 0; j < args.length; j++) {
+ if (j != i)
+ newArgs[newArgsPos++] = args[j];
+ }
+
+ // Execute if this combination was never executed before.
+ if (executedSqls.add(newSql)) {
+ log.info("Execute sql \"" + newSql + "\"");
+
+ List<List<?>> res = sql(newSql, newArgs);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+ }
+
+ // Continue recursively.
+ if (newArgs.length > 0)
+ executeCombinations0(newSql, resConsumer, executedSqls,
newArgs);
+ }
+ }
+
+ /** */
+ protected static void assertPartitions(int... expParts) {
+ Collection<Integer> expParts0 = new TreeSet<>();
+
+ if (!F.isEmpty(expParts)) {
+ for (int expPart : expParts)
+ expParts0.add(expPart);
+ }
+
+ TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+
+ assertEquals("Unexpected partitions [exp=" + Arrays.toString(expParts)
+ ", actual=" + actualParts + ']',
Review Comment:
`Arrays.toString(expParts)` -> `expParts0` (output will be sorted)
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
Review Comment:
Add getter for colocationGroups instead of reflection?
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ Stream.of("T2").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, AK, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
Review Comment:
10000 -> ENTRIES_COUNT
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
Review Comment:
10000 -> ENTRIES_COUNT
Let's also configure LongQueryWarningTimeout, to avoid logging of this query
as long-running and reduce log size.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ Stream.of("T2").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, AK, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append(i).append(",")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID,AK), PUBLIC.DICT(ID) WITH
\"NULLS=0,DISTINCT=10000,TOTAL=10000\"");
+
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 8;
+ }
+
+ /** */
+ @Test
+ public void testSimple() {
+ execute("select count(*) from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals(1L, res.get(0).get(0));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where ? = T1.ID",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select VAL, IDX_VAL, ID from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123, 123);
+ }
+
+ /** */
+ @Test
+ public void testNullsInCondition() {
+ execute("select * from T1 where T1.ID is NULL",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ });
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, new Object[]{ null });
+
+ execute("select * from T1 where T1.ID is NULL and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, null, 123);
+
+ execute("select * from T1 where T1.ID is NULL or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, null, 123);
+ }
+
+ /** */
+ @Test
+ public void testEmptyConditions() {
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518);
+
+ execute("select * from T1 where T1.ID = ? and (T1.ID = ? OR T1.ID = ?
OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? OR T1.ID = ?) AND (T1.ID =
? OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? AND T1.ID = ?) OR (T1.ID =
? AND T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+ }
+
+ /** */
+ @Test
+ public void testSelectIn() {
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, true, "_KEY");
+ testSelect(i, true, "ID");
+ });
+ }
+
+
+ /** */
+ @Test
+ public void testSelectOr() {
+ testSelect(1, false, "_KEY");
+ testSelect(1, false, "ID");
+
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, false, "_KEY");
+ testSelect(i, false, "ID");
+ });
+ }
+
+ /** */
+ @Test
+ public void testSimpleJoin() {
Review Comment:
Let's add test with multiple imputs, but not join (union or other set op for
example)
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ Stream.of("T2").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, AK, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append(i).append(",")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID,AK), PUBLIC.DICT(ID) WITH
\"NULLS=0,DISTINCT=10000,TOTAL=10000\"");
+
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 8;
+ }
+
+ /** */
+ @Test
+ public void testSimple() {
+ execute("select count(*) from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals(1L, res.get(0).get(0));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where ? = T1.ID",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select VAL, IDX_VAL, ID from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123, 123);
+ }
+
+ /** */
+ @Test
+ public void testNullsInCondition() {
+ execute("select * from T1 where T1.ID is NULL",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ });
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, new Object[]{ null });
+
+ execute("select * from T1 where T1.ID is NULL and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, null, 123);
+
+ execute("select * from T1 where T1.ID is NULL or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, null, 123);
+ }
+
+ /** */
+ @Test
+ public void testEmptyConditions() {
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518);
+
+ execute("select * from T1 where T1.ID = ? and (T1.ID = ? OR T1.ID = ?
OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? OR T1.ID = ?) AND (T1.ID =
? OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? AND T1.ID = ?) OR (T1.ID =
? AND T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+ }
+
+ /** */
+ @Test
+ public void testSelectIn() {
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, true, "_KEY");
+ testSelect(i, true, "ID");
+ });
+ }
+
+
+ /** */
+ @Test
+ public void testSelectOr() {
+ testSelect(1, false, "_KEY");
+ testSelect(1, false, "ID");
+
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, false, "_KEY");
+ testSelect(i, false, "ID");
+ });
+ }
+
+ /** */
+ @Test
+ public void testSimpleJoin() {
+ // Key (not alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1.ID =
?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 123));
+// assertNodes(node("T1_CACHE", 123));
+ assertEquals(1, res.size());
+ assertEquals(123, res.get(0).get(0));
+ },
+ 123
+ );
+
+ // Key (alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1._KEY
= ?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 125));
+// assertNodes(node("T1_CACHE", 125));
+
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Non-affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.ID =
?",
+ (res) -> {
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.AK =
?",
+ (res) -> {
+// assertPartitions(partition("T2_CACHE", 125));
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+ }
+
+ /** */
+ private void testSelect(int sz, boolean withIn, String column) {
+ assertTrue(sz >= 1);
+ int[] values = ThreadLocalRandom.current().ints(0,
ENTRIES_COUNT).distinct().limit(sz).toArray();
+
+ StringBuilder query;
+
+ if (!withIn || sz == 1)
+ query = new StringBuilder("select * from T1 where ");
+ else
+ query = new StringBuilder("select * from T1 where
T1.").append(column).append(" in (");
+
+ for (int i = 0; i < sz; ++i) {
+ if (!withIn || sz == 1)
+ query.append("T1.").append(column).append("= ?");
+ else
+ query.append('?');
+
+ if (sz == 1)
+ break;
+
+ if (i == sz - 1)
+ query.append(!withIn ? "" : ")");
+ else
+ query.append(!withIn ? " OR " : ", ");
+ }
+
+ execute(query.toString(),
+ res -> {
+ assertPartitions(IntStream.of(values).map(i ->
partition("T1_CACHE", i)).toArray());
+ assertNodes(IntStream.of(values).mapToObj(i ->
node("T1_CACHE", i)).toArray(ClusterNode[]::new));
+
+ assertEquals(values.length, res.size());
+
+ assertEquals(
+
IntStream.of(values).sorted().boxed().collect(Collectors.toList()),
+ res.stream().map(row ->
row.get(0)).sorted().collect(Collectors.toList())
+ );
+ },
+ IntStream.of(values).boxed().toArray(Integer[]::new));
+ }
+
+ /** */
+ public void execute(String sql, Consumer<List<List<?>>> resConsumer,
Object... args) {
+ log.info(">>> TEST COMBINATION: \"" + sql + "\"");
+
+ // Execute query as is.
+ log.info("Execute \"" + sql + "\" with args " + Arrays.toString(args));
+
+ List<List<?>> res = sql(sql, args);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+
+ // Start filling arguments recursively.
+ if (args != null && args.length > 0)
+ executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+ }
+
+ /** */
+ private void executeCombinations0(
+ String sql,
+ Consumer<List<List<?>>> resConsumer,
+ Set<String> executedSqls,
+ Object... args
+ ) {
+ assert args != null && args.length > 0;
+
+ // Get argument positions.
+ List<Integer> paramPoss = new ArrayList<>();
+
+ int pos = 0;
+
+ while (true) {
+ int paramPos = sql.indexOf('?', pos);
+
+ if (paramPos == -1)
+ break;
+
+ paramPoss.add(paramPos);
+
+ pos = paramPos + 1;
+ }
+
+ for (int i = 0; i < args.length; i++) {
+ // Prepare new SQL and arguments.
+ int paramPos = paramPoss.get(i);
+
+ String newSql = sql.substring(0, paramPos) + (args[i] instanceof
String ? "'" + args[i] + "'" : args[i])
+ + sql.substring(paramPos + 1);
+
+ Object[] newArgs = new Object[args.length - 1];
+
+ int newArgsPos = 0;
+
+ for (int j = 0; j < args.length; j++) {
+ if (j != i)
+ newArgs[newArgsPos++] = args[j];
+ }
+
+ // Execute if this combination was never executed before.
+ if (executedSqls.add(newSql)) {
+ log.info("Execute sql \"" + newSql + "\"");
+
+ List<List<?>> res = sql(newSql, newArgs);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+ }
+
+ // Continue recursively.
+ if (newArgs.length > 0)
+ executeCombinations0(newSql, resConsumer, executedSqls,
newArgs);
+ }
+ }
+
+ /** */
+ protected static void assertPartitions(int... expParts) {
+ Collection<Integer> expParts0 = new TreeSet<>();
+
+ if (!F.isEmpty(expParts)) {
+ for (int expPart : expParts)
+ expParts0.add(expPart);
+ }
+
+ TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+
+ assertEquals("Unexpected partitions [exp=" + Arrays.toString(expParts)
+ ", actual=" + actualParts + ']',
+ expParts0, actualParts);
+ }
+
+ /** */
+ protected int partition(String cacheName, Object key) {
+ return client.affinity(cacheName).partition(key);
+ }
+
+ /** */
+ protected ClusterNode node(String cacheName, Object key) {
+ return G.allGrids().stream()
+ .filter(ign ->
ign.affinity(cacheName).isPrimary(ign.cluster().localNode(), key))
+ .map(ign -> ign.cluster().localNode()).findFirst().orElse(null);
+ }
+
+ /** */
+ protected static void assertNodes(ClusterNode... expNodes) {
+ Collection<ClusterNode> expNodes0 = new
TreeSet<>(Comparator.comparing(ClusterNode::id));
+
+ if (!F.isEmpty(expNodes)) {
Review Comment:
Redundant braces for one line statement
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PartitionPruneTest.java:
##########
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.junit.Test;
+
+/** */
+public class PartitionPruneTest extends AbstractBasicIntegrationTest {
+ /** */
+ private static final int ENTRIES_COUNT = 10000;
+
+ /** */
+ private static final LongAdder INTERCEPTED_START_REQUEST_COUNT = new
LongAdder();
+
+ /** */
+ private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS =
new ConcurrentSkipListSet<>();
+
+ /** */
+ private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES
= new ConcurrentSkipListSet<>(
+ Comparator.comparing(ClusterNode::id));
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getSqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration());
+
+ cfg.setCommunicationSpi(new TcpCommunicationSpi() {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) {
+ assert msg != null;
+
+ if (msg instanceof GridIoMessage) {
+ GridIoMessage msg0 = (GridIoMessage)msg;
+
+ if (msg0.message() instanceof QueryStartRequest) {
+ INTERCEPTED_START_REQUEST_COUNT.increment();
+ INTERCEPTED_NODES.add(node);
+
+ QueryStartRequest startReq =
(QueryStartRequest)msg0.message();
+
+ assertNotNull(startReq.fragmentDescription());
+
+ FragmentMapping mapping =
startReq.fragmentDescription().mapping();
+
+ assertNotNull(mapping);
+
+ List<ColocationGroup> groups = U.field(mapping,
"colocationGroups");
+
+ assertEquals(1, F.size(groups));
+
+ int[] parts = F.first(groups).partitions(node.id());
+
+ if (!F.isEmpty(parts)) {
+ for (int part: parts)
+ INTERCEPTED_PARTS.add(part);
+ }
+ }
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ sql("CREATE TABLE T1(ID INT, IDX_VAL VARCHAR, VAL VARCHAR, PRIMARY
KEY(ID)) WITH cache_name=t1_cache,backups=1");
+ sql("CREATE TABLE T2(ID INT, AK INT, IDX_VAL VARCHAR, VAL VARCHAR,
PRIMARY KEY(ID, AK)) WITH " +
+ "cache_name=t2_cache,backups=1,affinity_key=AK");
+ sql("CREATE TABLE DICT(ID INT PRIMARY KEY, IDX_VAL VARCHAR, VAL
VARCHAR) WITH template=replicated,cache_name=dict_cache");
+
+ sql("CREATE INDEX T1_IDX ON T1(IDX_VAL)");
+ sql("CREATE INDEX T2_IDX ON T2(IDX_VAL)");
+ sql("CREATE INDEX DICT_IDX ON DICT(IDX_VAL)");
+
+ Stream.of("T1", "DICT").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ Stream.of("T2").forEach(tableName -> {
+ StringBuilder sb = new StringBuilder("INSERT INTO
").append(tableName)
+ .append("(ID, AK, IDX_VAL, VAL) VALUES ");
+
+ for (int i = 0; i < 10000; ++i) {
+ sb.append("(").append(i).append(", ")
+ .append(i).append(",")
+ .append("'name_").append(i).append("', ")
+ .append("'name_").append(i).append("')");
+
+ if (i < ENTRIES_COUNT - 1)
+ sb.append(",");
+ }
+
+ sql(sb.toString());
+
+ assertEquals(ENTRIES_COUNT, client.getOrCreateCache(tableName +
"_CACHE").size(CachePeekMode.PRIMARY));
+ });
+
+ sql("ANALYZE PUBLIC.T1(ID), PUBLIC.T2(ID,AK), PUBLIC.DICT(ID) WITH
\"NULLS=0,DISTINCT=10000,TOTAL=10000\"");
+
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ clearIntercepted();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 8;
+ }
+
+ /** */
+ @Test
+ public void testSimple() {
+ execute("select count(*) from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals(1L, res.get(0).get(0));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where ? = T1.ID",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select VAL, IDX_VAL, ID from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ },
+ 123, 123);
+ }
+
+ /** */
+ @Test
+ public void testNullsInCondition() {
+ execute("select * from T1 where T1.ID is NULL",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ });
+
+ execute("select * from T1 where T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, new Object[]{ null });
+
+ execute("select * from T1 where T1.ID is NULL and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ }, null, 123);
+
+ execute("select * from T1 where T1.ID is NULL or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, 123);
+
+ execute("select * from T1 where T1.ID = ? or T1.ID = ?",
+ res -> {
+ assertPartitions(partition("T1_CACHE", 123));
+ assertNodes(node("T1_CACHE", 123));
+
+ assertEquals(1, res.size());
+ assertEquals("name_123", res.get(0).get(1));
+ }, null, 123);
+ }
+
+ /** */
+ @Test
+ public void testEmptyConditions() {
+ execute("select * from T1 where T1.ID = ? and T1.ID = ?",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518);
+
+ execute("select * from T1 where T1.ID = ? and (T1.ID = ? OR T1.ID = ?
OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? OR T1.ID = ?) AND (T1.ID =
? OR T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+
+ execute("select * from T1 where (T1.ID = ? AND T1.ID = ?) OR (T1.ID =
? AND T1.ID = ?)",
+ res -> {
+ assertPartitions();
+ assertNodes();
+
+ assertTrue(res.isEmpty());
+ },
+ 123, 518, 781, 295);
+ }
+
+ /** */
+ @Test
+ public void testSelectIn() {
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, true, "_KEY");
+ testSelect(i, true, "ID");
+ });
+ }
+
+
+ /** */
+ @Test
+ public void testSelectOr() {
+ testSelect(1, false, "_KEY");
+ testSelect(1, false, "ID");
+
+ IntStream.of(2, 6).forEach(i -> {
+ testSelect(i, false, "_KEY");
+ testSelect(i, false, "ID");
+ });
+ }
+
+ /** */
+ @Test
+ public void testSimpleJoin() {
+ // Key (not alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1.ID =
?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 123));
+// assertNodes(node("T1_CACHE", 123));
+ assertEquals(1, res.size());
+ assertEquals(123, res.get(0).get(0));
+ },
+ 123
+ );
+
+ // Key (alias).
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T1._KEY
= ?",
+ (res) -> {
+// assertPartitions(partition("T1_CACHE", 125));
+// assertNodes(node("T1_CACHE", 125));
+
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Non-affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.ID =
?",
+ (res) -> {
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+
+ // Affinity key.
+ execute("SELECT * FROM T1 INNER JOIN T2 ON T1.ID = T2.AK WHERE T2.AK =
?",
+ (res) -> {
+// assertPartitions(partition("T2_CACHE", 125));
+ assertEquals(1, res.size());
+ assertEquals(125, res.get(0).get(0));
+ },
+ 125
+ );
+ }
+
+ /** */
+ private void testSelect(int sz, boolean withIn, String column) {
+ assertTrue(sz >= 1);
+ int[] values = ThreadLocalRandom.current().ints(0,
ENTRIES_COUNT).distinct().limit(sz).toArray();
+
+ StringBuilder query;
+
+ if (!withIn || sz == 1)
+ query = new StringBuilder("select * from T1 where ");
+ else
+ query = new StringBuilder("select * from T1 where
T1.").append(column).append(" in (");
+
+ for (int i = 0; i < sz; ++i) {
+ if (!withIn || sz == 1)
+ query.append("T1.").append(column).append("= ?");
+ else
+ query.append('?');
+
+ if (sz == 1)
+ break;
+
+ if (i == sz - 1)
+ query.append(!withIn ? "" : ")");
+ else
+ query.append(!withIn ? " OR " : ", ");
+ }
+
+ execute(query.toString(),
+ res -> {
+ assertPartitions(IntStream.of(values).map(i ->
partition("T1_CACHE", i)).toArray());
+ assertNodes(IntStream.of(values).mapToObj(i ->
node("T1_CACHE", i)).toArray(ClusterNode[]::new));
+
+ assertEquals(values.length, res.size());
+
+ assertEquals(
+
IntStream.of(values).sorted().boxed().collect(Collectors.toList()),
+ res.stream().map(row ->
row.get(0)).sorted().collect(Collectors.toList())
+ );
+ },
+ IntStream.of(values).boxed().toArray(Integer[]::new));
+ }
+
+ /** */
+ public void execute(String sql, Consumer<List<List<?>>> resConsumer,
Object... args) {
+ log.info(">>> TEST COMBINATION: \"" + sql + "\"");
+
+ // Execute query as is.
+ log.info("Execute \"" + sql + "\" with args " + Arrays.toString(args));
+
+ List<List<?>> res = sql(sql, args);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+
+ // Start filling arguments recursively.
+ if (args != null && args.length > 0)
+ executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+ }
+
+ /** */
+ private void executeCombinations0(
+ String sql,
+ Consumer<List<List<?>>> resConsumer,
+ Set<String> executedSqls,
+ Object... args
+ ) {
+ assert args != null && args.length > 0;
+
+ // Get argument positions.
+ List<Integer> paramPoss = new ArrayList<>();
+
+ int pos = 0;
+
+ while (true) {
+ int paramPos = sql.indexOf('?', pos);
+
+ if (paramPos == -1)
+ break;
+
+ paramPoss.add(paramPos);
+
+ pos = paramPos + 1;
+ }
+
+ for (int i = 0; i < args.length; i++) {
+ // Prepare new SQL and arguments.
+ int paramPos = paramPoss.get(i);
+
+ String newSql = sql.substring(0, paramPos) + (args[i] instanceof
String ? "'" + args[i] + "'" : args[i])
+ + sql.substring(paramPos + 1);
+
+ Object[] newArgs = new Object[args.length - 1];
+
+ int newArgsPos = 0;
+
+ for (int j = 0; j < args.length; j++) {
+ if (j != i)
+ newArgs[newArgsPos++] = args[j];
+ }
+
+ // Execute if this combination was never executed before.
+ if (executedSqls.add(newSql)) {
+ log.info("Execute sql \"" + newSql + "\"");
+
+ List<List<?>> res = sql(newSql, newArgs);
+
+ resConsumer.accept(res);
+ clearIntercepted();
+ }
+
+ // Continue recursively.
+ if (newArgs.length > 0)
+ executeCombinations0(newSql, resConsumer, executedSqls,
newArgs);
+ }
+ }
+
+ /** */
+ protected static void assertPartitions(int... expParts) {
+ Collection<Integer> expParts0 = new TreeSet<>();
+
+ if (!F.isEmpty(expParts)) {
+ for (int expPart : expParts)
+ expParts0.add(expPart);
+ }
+
+ TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+
+ assertEquals("Unexpected partitions [exp=" + Arrays.toString(expParts)
+ ", actual=" + actualParts + ']',
+ expParts0, actualParts);
+ }
+
+ /** */
+ protected int partition(String cacheName, Object key) {
+ return client.affinity(cacheName).partition(key);
+ }
+
+ /** */
+ protected ClusterNode node(String cacheName, Object key) {
+ return G.allGrids().stream()
+ .filter(ign ->
ign.affinity(cacheName).isPrimary(ign.cluster().localNode(), key))
+ .map(ign -> ign.cluster().localNode()).findFirst().orElse(null);
+ }
+
+ /** */
+ protected static void assertNodes(ClusterNode... expNodes) {
+ Collection<ClusterNode> expNodes0 = new
TreeSet<>(Comparator.comparing(ClusterNode::id));
+
+ if (!F.isEmpty(expNodes)) {
+ expNodes0.addAll(Arrays.asList(expNodes));
+ }
+
+ TreeSet<ClusterNode> actualNodes = new TreeSet<>(INTERCEPTED_NODES);
+
+ assertEquals("Unexpected nodes [exp=" + Arrays.toString(expNodes) + ",
actual=" + actualNodes + ']',
Review Comment:
`Arrays.toString(expNodes)` -> `expNodes0` (output will be sorted)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]