alex-plekhanov commented on code in PR #10928:
URL: https://github.com/apache/ignite/pull/10928#discussion_r1340968143


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.partition;
+
+import java.util.Collection;
+
+/** */
+public interface PartitionNode {
+    /** */
+    Collection<Integer> apply(PartitionPruningContext ctx);

Review Comment:
   `@Nullable`?
   And let's describe the meaning of a `null` value in the comments.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.partition;
+
+import java.util.Collection;
+
+/** */
+public interface PartitionNode {
+    /** */
+    Collection<Integer> apply(PartitionPruningContext ctx);
+
+    /** */
+    default int cacheId() {
+        return Integer.MIN_VALUE;

Review Comment:
   `MIN_VALUE` can be a real cache id, so let's use `CU.UNDEFINED_CACHE_ID` instead.
   Also, the method can be moved to `PartitionSingleNode` (or the `cacheId` field can be 
used instead).



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/** */
+public class PartitionExtractor extends IgniteRelShuttle {
+    /** */
+    private final IgniteTypeFactory typeFactory;
+
+    /** */
+    private final Deque<PartitionNode> stack = new ArrayDeque<>();
+
+    /** */
+    public PartitionExtractor(IgniteTypeFactory typeFactory) {
+        this.typeFactory = typeFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteRel processNode(IgniteRel rel) {
+        IgniteRel res = super.processNode(rel);
+
+        List<PartitionNode> operands = new ArrayList<>();
+        for (int i = 0; i < rel.getInputs().size(); ++i)
+            operands.add(stack.pop());
+
+        if (rel instanceof IgniteUnionAll)
+            stack.push(PartitionOperandNode.createOrOperandNode(operands));
+        else
+            stack.push(PartitionOperandNode.createAndOperandNode(operands));

Review Comment:
   Why `createAndOperandNode` here? For example, if we have a `JOIN` with two 
inputs on different partitions, we need data from both partitions, not the 
intersection of these partitions. Or, if we have some SetOp other than `UnionAll` 
(`MINUS`, for example), we also need all partitions. E.g.:
   ```
   SELECT val FROM t WHERE id in (1, 2, 3)
   MINUS
   SELECT val FROM t WHERE id = 4
   ```
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/** */
+public class PartitionExtractor extends IgniteRelShuttle {
+    /** */
+    private final IgniteTypeFactory typeFactory;
+
+    /** */
+    private final Deque<PartitionNode> stack = new ArrayDeque<>();
+
+    /** */
+    public PartitionExtractor(IgniteTypeFactory typeFactory) {
+        this.typeFactory = typeFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteRel processNode(IgniteRel rel) {
+        IgniteRel res = super.processNode(rel);
+

Review Comment:
   ```
   if (rel.getInputs().size() <= 1)
       return res;
   ```
   ?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TypeUtils.java:
##########
@@ -498,4 +505,46 @@ public static RexNode toRexLiteral(Object dfltVal, 
RelDataType type, DataContext
 
         return rexBuilder.makeLiteral(dfltVal, type, true);
     }
+
+    /**
+     * @return New object of specific type.
+     */
+    public static Object createObject(GridCacheContext<?, ?> cctx, String 
typeName, Class<?> typeCls) throws IgniteException {

Review Comment:
   Looks like a redundant change: the method is used only by `CacheTableDescriptorImpl` 
and was moved from that class.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/** */
+public class PartitionExtractor extends IgniteRelShuttle {
+    /** */
+    private final IgniteTypeFactory typeFactory;
+
+    /** */
+    private final Deque<PartitionNode> stack = new ArrayDeque<>();
+
+    /** */
+    public PartitionExtractor(IgniteTypeFactory typeFactory) {
+        this.typeFactory = typeFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteRel processNode(IgniteRel rel) {
+        IgniteRel res = super.processNode(rel);
+
+        List<PartitionNode> operands = new ArrayList<>();
+        for (int i = 0; i < rel.getInputs().size(); ++i)
+            operands.add(stack.pop());
+
+        if (rel instanceof IgniteUnionAll)
+            stack.push(PartitionOperandNode.createOrOperandNode(operands));
+        else
+            stack.push(PartitionOperandNode.createAndOperandNode(operands));
+
+        return res;
+    }
+
+    /** */
+    public PartitionNode go(Fragment fragment) {
+        if (!(fragment.root() instanceof IgniteSender))
+            return PartitionAllNode.INSTANCE;
+
+        if (fragment.mapping() == null || !fragment.mapping().colocated())
+            return PartitionAllNode.INSTANCE;
+
+        visit(fragment.root());
+
+        return stack.isEmpty() ? PartitionAllNode.INSTANCE : stack.pop();
+    }
+
+    /** */
+    private void processScan(IgniteRel rel) {
+        assert rel instanceof ProjectableFilterableTableScan;
+
+        RexNode condition = ((ProjectableFilterableTableScan)rel).condition();
+
+        if (!rel.distribution().function().affinity() || 
rel.distribution().getKeys().size() != 1) {
+            stack.push(PartitionAllNode.INSTANCE);
+
+            return;
+        }
+
+        ImmutableIntList keys = 
ImmutableIntList.copyOf(rel.distribution().getKeys());
+        IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+        RelDataType rowType = tbl.getRowType(typeFactory);
+        int cacheId = 
((CacheTableDescriptor)tbl.descriptor()).cacheInfo().cacheId();
+
+        List<Class<?>> types = new ArrayList<>(rowType.getFieldCount());
+        for (RelDataTypeField field : rowType.getFieldList()) {
+            if (QueryUtils.KEY_FIELD_NAME.equals(field.getName()))
+                keys = keys.append(field.getIndex());
+
+            
types.add(Primitives.wrap((Class<?>)typeFactory.getJavaClass(field.getType())));
+        }
+
+        PartitionNode partNode = processCondition(condition, types, keys, 
cacheId);

Review Comment:
   We should also take into account `requiredColumns`.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionOperandNode.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.partition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** */
+public class PartitionOperandNode implements PartitionNode {
+    /** */
+    private final Operand op;
+
+    /** */
+    private final List<PartitionNode> operands;
+
+    /** */
+    private PartitionOperandNode(Operand op, List<PartitionNode> operands) {
+        this.op = op;
+        this.operands = Collections.unmodifiableList(operands);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> apply(PartitionPruningContext ctx) {
+        Set<Integer> allParts = null;
+
+        if (op == Operand.AND) {
+            for (PartitionNode operand : operands) {
+                Collection<Integer> parts = operand.apply(ctx);
+
+                if (parts == null)
+                    continue;
+
+                if (allParts == null)
+                    allParts = new HashSet<>(parts);
+                else
+                    allParts.retainAll(parts);
+            }
+        }
+        else {
+            for (PartitionNode operand: operands) {
+                Collection<Integer> parts = operand.apply(ctx);
+
+                if (parts == null)

Review Comment:
   Shouldn't we return `null` in this case? `null` is only possible for 
`PartitionAllNode`, and in the case of an `OR` operand we should also return all nodes 
(`null`). `PartitionAllNode` is filtered out by the `optimize` method, but this 
method is never called.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/partition/PartitionNode.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.partition;
+
+import java.util.Collection;
+
+/** */
+public interface PartitionNode {
+    /** */
+    Collection<Integer> apply(PartitionPruningContext ctx);
+
+    /** */
+    default int cacheId() {
+        return Integer.MIN_VALUE;
+    }
+
+    /** */
+    default PartitionNode optimize() {

Review Comment:
   Never used.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PartitionExtractor.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.primitives.Primitives;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionAllNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionLiteralNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionNoneNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionOperandNode;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.partition.PartitionParameterNode;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import 
org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
+import 
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/** */
+public class PartitionExtractor extends IgniteRelShuttle {
+    /** */
+    private final IgniteTypeFactory typeFactory;
+
+    /** */
+    private final Deque<PartitionNode> stack = new ArrayDeque<>();
+
+    /** */
+    public PartitionExtractor(IgniteTypeFactory typeFactory) {
+        this.typeFactory = typeFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        processScan(rel);
+
+        return rel;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteRel processNode(IgniteRel rel) {
+        IgniteRel res = super.processNode(rel);
+
+        List<PartitionNode> operands = new ArrayList<>();
+        for (int i = 0; i < rel.getInputs().size(); ++i)
+            operands.add(stack.pop());
+
+        if (rel instanceof IgniteUnionAll)
+            stack.push(PartitionOperandNode.createOrOperandNode(operands));
+        else
+            stack.push(PartitionOperandNode.createAndOperandNode(operands));
+
+        return res;
+    }
+
+    /** */
+    public PartitionNode go(Fragment fragment) {
+        if (!(fragment.root() instanceof IgniteSender))
+            return PartitionAllNode.INSTANCE;
+
+        if (fragment.mapping() == null || !fragment.mapping().colocated())
+            return PartitionAllNode.INSTANCE;
+
+        visit(fragment.root());
+
+        return stack.isEmpty() ? PartitionAllNode.INSTANCE : stack.pop();
+    }
+
+    /** */
+    private void processScan(IgniteRel rel) {
+        assert rel instanceof ProjectableFilterableTableScan;
+
+        RexNode condition = ((ProjectableFilterableTableScan)rel).condition();
+
+        if (!rel.distribution().function().affinity() || 
rel.distribution().getKeys().size() != 1) {
+            stack.push(PartitionAllNode.INSTANCE);
+
+            return;
+        }
+
+        ImmutableIntList keys = 
ImmutableIntList.copyOf(rel.distribution().getKeys());
+        IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
+        RelDataType rowType = tbl.getRowType(typeFactory);
+        int cacheId = 
((CacheTableDescriptor)tbl.descriptor()).cacheInfo().cacheId();
+
+        List<Class<?>> types = new ArrayList<>(rowType.getFieldCount());
+        for (RelDataTypeField field : rowType.getFieldList()) {
+            if (QueryUtils.KEY_FIELD_NAME.equals(field.getName()))
+                keys = keys.append(field.getIndex());
+
+            
types.add(Primitives.wrap((Class<?>)typeFactory.getJavaClass(field.getType())));
+        }
+
+        PartitionNode partNode = processCondition(condition, types, keys, 
cacheId);
+
+        stack.push(partNode);
+    }
+
+    /** */
+    private PartitionNode processCondition(RexNode condition, List<Class<?>> 
types, ImmutableIntList keys, int cacheId) {
+        if (!(condition instanceof RexCall))
+            return PartitionAllNode.INSTANCE;
+
+        SqlKind opKind = condition.getKind();
+        List<RexNode> operands = ((RexCall)condition).getOperands();
+
+        switch (opKind) {
+            case IS_NULL: {
+                RexNode left = operands.get(0);
+
+                if (!left.isA(SqlKind.LOCAL_REF))
+                    return PartitionAllNode.INSTANCE;
+
+                int idx = ((RexLocalRef)left).getIndex();
+
+                if (!keys.contains(idx))
+                    return PartitionAllNode.INSTANCE;
+                else
+                    return PartitionNoneNode.INSTANCE;
+            }
+            case EQUALS: {
+                if (operands.size() != 2)
+                    return PartitionAllNode.INSTANCE;
+
+                RexNode left = operands.get(0);
+                RexNode right = operands.get(1);
+
+                if (!left.isA(SqlKind.LOCAL_REF))

Review Comment:
   What about conditions like `? = t.a`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to