korlov42 commented on code in PR #1798:
URL: https://github.com/apache/ignite-3/pull/1798#discussion_r1145770064
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java:
##########
@@ -91,4 +96,18 @@ public CompletableFuture<BatchedResult<T>>
requestNextAsync(int rows) {
public CompletableFuture<Void> closeAsync() {
return dataCursor.closeAsync();
}
+
+ private static Throwable unwrapRemoteCause(@NotNull Throwable t) {
Review Comment:
The problem with this method is that you drop several stack frames, thereby
making almost impossible to identify actual call chain
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java:
##########
@@ -663,6 +670,8 @@ void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt()
throws Exception {
* Tests that, if a snapshot installation fails for some reason, a
subsequent retry due to a timeout happens successfully.
*/
@Test
+ // Hangs at
org.apache.ignite.internal.sql.engine.message.MessageServiceImpl.send(MessageServiceImpl.java:98)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19088")
Review Comment:
Looks like the reason of hanging is line `DefaultMessagingService.java:186`:
at this line the future that will never be completed is returned from `send()`
method, and we waiting for completion inside
`org.apache.ignite.internal.sql.engine.message.MessageServiceImpl#send` without
any timeouts. I think, we should specify timeout in 1sec (or similar, this
timeout to send the message only, those I consider timeout in 1 second fairly
big) to work around this limitation. Besides, we already have ticket to fix
this properly https://issues.apache.org/jira/browse/IGNITE-17685
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java:
##########
@@ -48,12 +67,63 @@ public TableModifyConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalTableModify rel) {
RelOptCluster cluster = rel.getCluster();
+ RelOptTable relTable = rel.getTable();
+ IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+ assert igniteTable != null;
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
+ .replace(igniteTable.distribution())
.replace(RelCollations.EMPTY);
+
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+ IgniteTableModify tableModify = new IgniteTableModify(cluster, traits,
relTable, input,
rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+
+ if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+ return tableModify;
+ } else {
+ return createAggregate(tableModify, cluster);
+ }
+ }
+
+ private static PhysicalNode createAggregate(IgniteTableModify tableModify,
RelOptCluster cluster) {
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+ RelDataType rowType = tableModify.getRowType();
+ RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+ RelDataType sumType =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DECIMAL),
true);
+ int aggCallCnt = rowType.getFieldCount();
+ List<AggregateCall> aggCalls = new ArrayList<>();
+
+ for (int i = 0; i < aggCallCnt; i++) {
+ AggregateCall sum = AggregateCall.create(SqlStdOperatorTable.SUM,
false, false,
+ false, ImmutableList.of(i), -1, null, RelCollations.EMPTY,
0, tableModify,
+ sumType, null);
+
+ aggCalls.add(sum);
+ }
+
+ IgniteColocatedHashAggregate sumAgg = new IgniteColocatedHashAggregate(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(tableModify,
inTrait.replace(IgniteDistributions.single())),
+ ImmutableBitSet.of(),
+ List.of(ImmutableBitSet.of()),
+ aggCalls
+ );
+
+ var rowField = rowType.getFieldList().get(0);
+ var typeOfSum = typeFactory.createSqlType(SqlTypeName.BIGINT);
+ var convertedRowType =
typeFactory.createStructType(List.of(Map.entry(rowField.getName(), typeOfSum)));
Review Comment:
`var` is not allowed in such context
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java:
##########
@@ -48,12 +67,63 @@ public TableModifyConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalTableModify rel) {
RelOptCluster cluster = rel.getCluster();
+ RelOptTable relTable = rel.getTable();
+ IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+ assert igniteTable != null;
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
+ .replace(igniteTable.distribution())
.replace(RelCollations.EMPTY);
+
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+ IgniteTableModify tableModify = new IgniteTableModify(cluster, traits,
relTable, input,
rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+
+ if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+ return tableModify;
+ } else {
+ return createAggregate(tableModify, cluster);
+ }
+ }
+
+ private static PhysicalNode createAggregate(IgniteTableModify tableModify,
RelOptCluster cluster) {
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+ RelDataType rowType = tableModify.getRowType();
+ RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+ RelDataType sumType =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DECIMAL),
true);
+ int aggCallCnt = rowType.getFieldCount();
+ List<AggregateCall> aggCalls = new ArrayList<>();
+
+ for (int i = 0; i < aggCallCnt; i++) {
Review Comment:
This doesn't look reliable. For now it works just because ModifyNode
returns exactly one column, but one of the possible changes is support for
RETURNING clause (see
[this](https://www.postgresql.org/docs/current/dml-returning.html) for
details). With this in mind, I definitely would not take SUM over an arbitrary
column
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java:
##########
@@ -48,12 +67,63 @@ public TableModifyConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalTableModify rel) {
RelOptCluster cluster = rel.getCluster();
+ RelOptTable relTable = rel.getTable();
+ IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+ assert igniteTable != null;
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
+ .replace(igniteTable.distribution())
.replace(RelCollations.EMPTY);
+
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+ IgniteTableModify tableModify = new IgniteTableModify(cluster, traits,
relTable, input,
rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+
+ if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+ return tableModify;
+ } else {
+ return createAggregate(tableModify, cluster);
+ }
+ }
+
+ private static PhysicalNode createAggregate(IgniteTableModify tableModify,
RelOptCluster cluster) {
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+ RelDataType rowType = tableModify.getRowType();
+ RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+ RelDataType sumType =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DECIMAL),
true);
+ int aggCallCnt = rowType.getFieldCount();
+ List<AggregateCall> aggCalls = new ArrayList<>();
+
+ for (int i = 0; i < aggCallCnt; i++) {
+ AggregateCall sum = AggregateCall.create(SqlStdOperatorTable.SUM,
false, false,
+ false, ImmutableList.of(i), -1, null, RelCollations.EMPTY,
0, tableModify,
+ sumType, null);
+
+ aggCalls.add(sum);
+ }
+
+ IgniteColocatedHashAggregate sumAgg = new IgniteColocatedHashAggregate(
+ cluster,
+ outTrait.replace(IgniteDistributions.single()),
+ convert(tableModify,
inTrait.replace(IgniteDistributions.single())),
+ ImmutableBitSet.of(),
+ List.of(ImmutableBitSet.of()),
+ aggCalls
+ );
+
+ var rowField = rowType.getFieldList().get(0);
+ var typeOfSum = typeFactory.createSqlType(SqlTypeName.BIGINT);
+ var convertedRowType =
typeFactory.createStructType(List.of(Map.entry(rowField.getName(), typeOfSum)));
+
+ RexBuilder rexBuilder = Commons.rexBuilder();
+ RexInputRef sumRef = rexBuilder.makeInputRef(sumAgg, 0);
+ RexNode rexNode = rexBuilder.makeCast(typeOfSum, sumRef);
+ List<RexNode> projections = Collections.singletonList(rexNode);
Review Comment:
if my understanding correct, we need projection to 1) rename result of
aggregation and 2) cast result to proper type, because SUM aggregate changes
the original type. It would be nice to put this to comment, because it's not
obvious
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyConverterRule.java:
##########
@@ -48,12 +67,63 @@ public TableModifyConverterRule() {
@Override
protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalTableModify rel) {
RelOptCluster cluster = rel.getCluster();
+ RelOptTable relTable = rel.getTable();
+ IgniteTable igniteTable = relTable.unwrap(IgniteTable.class);
+ assert igniteTable != null;
+
RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
+ .replace(igniteTable.distribution())
.replace(RelCollations.EMPTY);
+
RelNode input = convert(rel.getInput(), traits);
- return new IgniteTableModify(cluster, traits, rel.getTable(), input,
+ IgniteTableModify tableModify = new IgniteTableModify(cluster, traits,
relTable, input,
rel.getOperation(), rel.getUpdateColumnList(),
rel.getSourceExpressionList(), rel.isFlattened());
+
+ if (igniteTable.distribution().equals(IgniteDistributions.single())) {
+ return tableModify;
+ } else {
+ return createAggregate(tableModify, cluster);
+ }
+ }
+
+ private static PhysicalNode createAggregate(IgniteTableModify tableModify,
RelOptCluster cluster) {
+
+ RelTraitSet inTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+ RelTraitSet outTrait = cluster.traitSetOf(IgniteConvention.INSTANCE);
+
+ RelDataType rowType = tableModify.getRowType();
+ RelDataTypeFactory typeFactory = cluster.getTypeFactory();
+ RelDataType sumType =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DECIMAL),
true);
Review Comment:
it's better to use
`org.apache.calcite.rel.type.RelDataTypeSystem#deriveSumType` to derive type of
a SUM aggregate
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify DML plans.
+ */
+public class DmlPlannerTest extends AbstractPlannerTest {
+
+ /**
+ * Test for INSERT .. VALUES when table has a single distribution.
+ */
+ @Test
+ public void testInsertIntoSingleDistributedTable() throws Exception {
+ IgniteTable test1 = createTable("TEST1", IgniteDistributions.single(),
"C1", Integer.class, "C2", Integer.class);
Review Comment:
Please use TestBuilder framework to create test tables
##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteValues;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify DML plans.
+ */
+public class DmlPlannerTest extends AbstractPlannerTest {
+
+ /**
+ * Test for INSERT .. VALUES when table has a single distribution.
+ */
+ @Test
+ public void testInsertIntoSingleDistributedTable() throws Exception {
+ IgniteTable test1 = createTable("TEST1", IgniteDistributions.single(),
"C1", Integer.class, "C2", Integer.class);
+ IgniteSchema schema = createSchema(test1);
+
+ // There should be no exchanges and other operations.
+ assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+
isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))));
+ }
+
+ /**
+ * Test for INSERT .. VALUES when table has non single distribution.
+ */
+ @ParameterizedTest
+ @MethodSource("nonSingleDistributions")
+ public void testInsert(IgniteDistribution distribution) throws Exception {
+ IgniteTable test1 = createTable("TEST1", distribution, "C1",
Integer.class, "C2", Integer.class);
+
+ IgniteSchema schema = createSchema(test1);
+
+ assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e ->
distribution.equals(e.distribution())))))
+ );
+ }
+
+ private static Stream<IgniteDistribution> nonSingleDistributions() {
+ return distributions().filter(d ->
!IgniteDistributions.single().equals(d));
+ }
+
+ /**
+ * Test for INSERT .. FROM SELECT when tables has different distributions.
+ */
+ @ParameterizedTest
+ @MethodSource("distributions")
+ public void testInsertSelectFrom(IgniteDistribution distribution) throws
Exception {
+ IgniteDistribution anotherDistribution =
IgniteDistributions.affinity(1, new UUID(1, 0), "0");
+
+ IgniteTable test1 = createTable("TEST1", distribution, "C1",
Integer.class, "C2", Integer.class);
+ IgniteTable test2 = createTable("TEST2", anotherDistribution, "C1",
Integer.class, "C2", Integer.class);
+
+ IgniteSchema schema = createSchema(test1, test2);
+
+ assertPlan("INSERT INTO TEST1 (C1, C2) SELECT C1, C2 FROM TEST2",
schema,
+ nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
+ .and(e ->
e.distribution().equals(IgniteDistributions.single())))
+
.and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
+
.and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e ->
distribution.equals(e.distribution())))))
+ );
+ }
+
+ /**
+ * Test for INSERT .. FROM SELECT when tables has the same distribution.
+ */
+ @ParameterizedTest
+ @MethodSource("distributions")
+ public void testInsertSelectFromSameDistribution(IgniteDistribution
distribution) throws Exception {
Review Comment:
I would like to add one more test for `UPDATE table SET a = a + 1` query
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]