korlov42 commented on code in PR #3335:
URL: https://github.com/apache/ignite-3/pull/3335#discussion_r1520983208
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java:
##########
@@ -40,12 +40,12 @@ public class ColocationGroup implements Serializable {
private final List<String> nodeNames;
- private final List<NodeWithConsistencyToken> assignments;
+ private final Map<Integer, NodeWithConsistencyToken> assignments;
Review Comment:
Can we use `Int2ObjectMap` here?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/Partitioned.java:
##########
@@ -17,42 +17,40 @@
package org.apache.ignite.internal.sql.engine.trait;
-import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowPartitionExtractor;
-import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* Partitioned.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public final class Partitioned<RowT> implements Destination<RowT> {
- private final List<List<String>> assignments;
+ private final Map<Integer, NodeWithConsistencyToken> assignments;
private final RowPartitionExtractor<RowT> calc;
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public Partitioned(List<String> assignments, RowPartitionExtractor<RowT> calc) {
+ public Partitioned(Map<Integer, NodeWithConsistencyToken> assignments, RowPartitionExtractor<RowT> calc) {
this.calc = calc;
- this.assignments = Commons.transform(assignments, List::of);
+ this.assignments = assignments;
}
/** {@inheritDoc} */
@Override
public List<String> targets(RowT row) {
int part = calc.partition(row);
- return assignments.get(part);
+ return List.of(assignments.get(part).name());
}
/** {@inheritDoc} */
@Override
public List<String> targets() {
- return assignments.stream()
- .flatMap(Collection::stream)
- .distinct().collect(Collectors.toList());
+ return assignments.values().stream().map(NodeWithConsistencyToken::name).collect(Collectors.toUnmodifiableList());
Review Comment:
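Looks like the de-duplication got lost here: the old code called `.distinct()`, so a node that appears in several assignments will now be listed more than once.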
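
A minimal sketch of what restoring the de-duplication could look like. `Node` is a hypothetical stand-in for `NodeWithConsistencyToken` (only `name()` matters here), and the class name is made up for illustration; this is not the PR's code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TargetsDedupSketch {
    /** Hypothetical stand-in for NodeWithConsistencyToken. */
    static final class Node {
        private final String name;
        private final long token;

        Node(String name, long token) {
            this.name = name;
            this.token = token;
        }

        String name() {
            return name;
        }
    }

    /** Returns every target node name once, in encounter order. */
    static List<String> targets(Map<Integer, Node> assignments) {
        return assignments.values().stream()
                .map(Node::name)
                .distinct() // the step dropped in the new version
                .collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        Map<Integer, Node> assignments = new LinkedHashMap<>();
        assignments.put(0, new Node("node-a", 1L));
        assignments.put(1, new Node("node-b", 1L));
        assignments.put(2, new Node("node-a", 1L)); // same node owns two partitions

        System.out.println(targets(assignments)); // prints [node-a, node-b]
    }
}
```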
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -1044,7 +1044,7 @@ public IgniteRel visit(IgniteTableModify rel) {
return super.visit(rel);
}
- private void enlist(int tableId, List<NodeWithConsistencyToken> assignments) {
+ private void enlist(int tableId, Map<Integer, NodeWithConsistencyToken> assignments) {
Review Comment:
```suggestion
private void enlist(int tableId, Int2ObjectMap<NodeWithConsistencyToken> assignments) {
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java:
##########
@@ -91,21 +91,26 @@ Destination<RowT> createDestination(IgniteDistribution distribution, ColocationG
assert !nullOrEmpty(group.assignments()) && !nullOrEmpty(keys);
- List<String> assignments = Commons.transform(group.assignments(), NodeWithConsistencyToken::name);
-
if (function.affinity()) {
int tableId = ((AffinityDistribution) function).tableId();
Supplier<PartitionCalculator> calculator = dependencies.partitionCalculator(tableId);
TableDescriptor tableDescriptor = dependencies.tableDescriptor(tableId);
var resolver = new TablePartitionExtractor<>(calculator.get(), keys.toIntArray(), tableDescriptor, rowHandler);
- return new Partitioned<>(assignments, resolver);
+ return new Partitioned<>(group.assignments(), resolver);
}
var resolver = new RehashingPartitionExtractor<>(group.nodeNames().size(), keys.toIntArray(), rowHandler);
- return new Partitioned<>(group.nodeNames(), resolver);
+ Int2ObjectMap<NodeWithConsistencyToken> assignments = new Int2ObjectOpenHashMap<>();
+ int pos = 0;
+
+ for (String name : group.nodeNames()) {
+ assignments.put(pos++, new NodeWithConsistencyToken(name, 0));
Review Comment:
If `consistencyToken` is not used in the `Partitioned` destination, it is
better to remove it from the `Partitioned` destination API. Until the benefit is
clear (proven by benchmark results), I would prefer the cleaner way.
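
A rough sketch of a destination that only ever sees node names, so no placeholder token (like the `0` above) has to be invented. The class name, the `Map<Integer, String>` parameter and the `ToIntFunction` extractor are assumptions made for illustration; they are not the actual `Partitioned` or `RowPartitionExtractor` API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

final class NameOnlyPartitionedSketch<RowT> {
    /** Partition id -> node name; no consistency token involved. */
    private final Map<Integer, String> nodeByPartition;

    /** Hypothetical replacement for RowPartitionExtractor in this sketch. */
    private final ToIntFunction<RowT> partitionOf;

    NameOnlyPartitionedSketch(Map<Integer, String> nodeByPartition, ToIntFunction<RowT> partitionOf) {
        this.nodeByPartition = Map.copyOf(nodeByPartition);
        this.partitionOf = partitionOf;
    }

    /** Target node for a single row. */
    List<String> targets(RowT row) {
        return List.of(nodeByPartition.get(partitionOf.applyAsInt(row)));
    }

    /** All distinct target nodes. */
    List<String> targets() {
        return nodeByPartition.values().stream()
                .distinct()
                .collect(Collectors.toUnmodifiableList());
    }
}
```

With this shape, the caller that does have `NodeWithConsistencyToken` values would map them to names before constructing the destination, and the rehashing branch could pass `group.nodeNames()` by position.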
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java:
##########
@@ -137,6 +150,153 @@ public IgniteRel visit(IgniteTableScan rel) {
return rel;
}
+ private static class ModifyNodeShuttle extends IgniteRelShuttle {
+ List<RexNode> projections = null;
+ IgniteValues values = null;
+ List<List<RexNode>> exprProjections = null;
+ boolean collectExprProjections = false;
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteProject rel) {
+ if (projections == null && !collectExprProjections) {
+ projections = rel.getProjects();
+ } else {
+ if (exprProjections == null) {
+ exprProjections = new ArrayList<>();
+ }
+ exprProjections.add(rel.getProjects());
+ }
+ return super.visit(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteValues rel) {
+ if (!collectExprProjections) {
+ values = rel;
+ }
+ return super.visit(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteUnionAll rel) {
+ collectExprProjections = true;
+ return super.visit(rel);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteTableModify rel) {
+ if (rel.getOperation() != INSERT && rel.getOperation() != DELETE) {
+ return super.visit(rel);
+ }
+
+ IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+ RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
+
+ ModifyNodeShuttle modify = new ModifyNodeShuttle();
+ rel.accept(modify);
+
+ extractFromValues(rel.sourceId(), table, modify.values, modify.projections, modify.exprProjections, rexBuilder);
+
+ return super.visit(rel);
+ }
+
+ private void extractFromValues(
+ long sourceId,
+ IgniteTable table,
+ @Nullable IgniteValues vals,
+ @Nullable List<RexNode> projections,
+ @Nullable List<List<RexNode>> exprProjections,
+ RexBuilder rexBuilder
+ ) {
+ if (vals == null && exprProjections == null) {
+ return;
+ }
+
+ IgniteDistribution distribution = table.distribution();
+ if (!distribution.function().affinity()) {
+ return;
+ }
+
+ IntArrayList keysList = new IntArrayList(distribution.getKeys().size());
+ for (Integer key : distribution.getKeys()) {
+ keysList.add(key.intValue());
+ }
+
+ Mappings.TargetMapping mapping = null;
+
+ if (projections != null) {
+ IntArrayList projList = new IntArrayList();
+ for (RexNode node : projections) {
+ if (node instanceof RexInputRef) {
+ int idx = ((RexInputRef) node).getIndex();
+ if (keysList.contains(idx)) {
Review Comment:
A test on rearranged columns + dynamic params returns empty pruning metadata. A
test on rearranged columns + two tuples, where one of them contains dynamic
params and the other doesn't, returns pruning metadata only for the tuple with
dynamic params.
In this particular line (`keysList.contains(idx)`) you check whether there is
an input ref to the input tuple that has the same index as one of the
colocation key columns, and this makes no sense.
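
One possible reading of this comment, as a sketch only: the position of an expression inside the projection identifies the table column it feeds, while the index of an input ref points into the input tuple, so the two live in different index spaces. The example below uses plain arrays instead of `RexNode`, and every name in it is hypothetical; it assumes that projection position `i` feeds table column `i`.

```java
import java.util.Arrays;

final class KeyMappingSketch {
    /** Marks a projection that is not a plain input reference (assumption of this sketch). */
    static final int NOT_A_REF = -1;

    /**
     * Finds where each colocation key gets its value from.
     *
     * @param keyColumns table column indexes of the colocation keys
     * @param projectedInput projectedInput[i] is the input-tuple index read by output column i, or NOT_A_REF
     * @return for each key column, the input-tuple index that supplies it, or NOT_A_REF
     */
    static int[] keySources(int[] keyColumns, int[] projectedInput) {
        int[] sources = new int[keyColumns.length];
        for (int i = 0; i < keyColumns.length; i++) {
            // Look the key up by projection POSITION, not by the ref's own index.
            sources[i] = projectedInput[keyColumns[i]];
        }
        return sources;
    }

    public static void main(String[] args) {
        // Table columns (c0, c1, c2), key is c2, statement lists columns as (c2, c1, c0):
        // the projection restoring table order is [$2, $1, $0].
        int[] sources = keySources(new int[] {2}, new int[] {2, 1, 0});
        System.out.println(Arrays.toString(sources)); // prints [0]
    }
}
```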
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java:
##########
@@ -137,6 +150,153 @@ public IgniteRel visit(IgniteTableScan rel) {
return rel;
}
+ private static class ModifyNodeShuttle extends IgniteRelShuttle {
+ List<RexNode> projections = null;
+ IgniteValues values = null;
+ List<List<RexNode>> exprProjections = null;
Review Comment:
I still don't quite understand. A Values node contains `List<List<RexLiteral>>`
(where `RexLiteral` extends `RexNode`). A Project node contains a one-dimensional
`List<RexNode>`. In the case of `UnionAll` we may have several branches containing
projections and values.
Would it be easier to collect everything into a `List<List<RexNode>>` during
tree traversal, and then simply derive pruning metadata from this list, rather
than overcomplicating the derivation of metadata with conditionals about where
to derive it from?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/pruning/PartitionPruningMetadataExtractor.java:
##########
@@ -137,6 +150,153 @@ public IgniteRel visit(IgniteTableScan rel) {
return rel;
}
+ private static class ModifyNodeShuttle extends IgniteRelShuttle {
+ List<RexNode> projections = null;
+ IgniteValues values = null;
+ List<List<RexNode>> exprProjections = null;
+ boolean collectExprProjections = false;
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteProject rel) {
+ if (projections == null && !collectExprProjections) {
+ projections = rel.getProjects();
+ } else {
+ if (exprProjections == null) {
+ exprProjections = new ArrayList<>();
+ }
+ exprProjections.add(rel.getProjects());
+ }
+ return super.visit(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteValues rel) {
+ if (!collectExprProjections) {
+ values = rel;
+ }
+ return super.visit(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteUnionAll rel) {
+ collectExprProjections = true;
+ return super.visit(rel);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgniteRel visit(IgniteTableModify rel) {
+ if (rel.getOperation() != INSERT && rel.getOperation() != DELETE) {
Review Comment:
How is DELETE different from the case above?