alex-plekhanov commented on code in PR #9987:
URL: https://github.com/apache/ignite/pull/9987#discussion_r861692425
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java:
##########
@@ -92,9 +125,12 @@ public IgniteSort(RelInput input) {
if (isEnforcer() || required.getConvention() !=
IgniteConvention.INSTANCE)
return null;
- RelCollation collation = TraitUtils.collation(required);
+ RelCollation relCollation = TraitUtils.collation(required);
+
+ if (!relCollation.satisfies(collation) ||
!collation.satisfies(relCollation))
Review Comment:
I'm not sure we should pass the collation if the required collation is a subset
of the sort collation (we should only extend the sort collation, not shrink it).
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java:
##########
@@ -82,86 +84,121 @@ public void testOrderOfRels() throws Exception {
IgniteSchema publicSchema =
createSchemaWithTable(IgniteDistributions.random());
// Simple case, Limit can't be pushed down under Exchange or Sort.
Sort before Exchange is more preferable.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) == 10.0))))));
+
+ // Same simple case but witout offset.
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5", publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> s.offset == null))))));
+
+ // No special liited sort required if LIMIT is not set.
+ assertPlan("SELECT * FROM TEST ORDER BY ID OFFSET 10", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> s.fetch == null)
+ .and(s -> s.offset == null))))));
// Simple case without ordering.
- assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS
ONLY", publicSchema,
+ assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 5 ROWS
ONLY", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) ==
10.0)).negate()));
// Check that Sort node is not eliminated by aggregation and Exchange
node is not eliminated by distribution
// required by parent nodes.
assertPlan("SELECT * FROM TEST UNION (SELECT * FROM TEST ORDER BY ID
LIMIT 10)", publicSchema,
nodeOrAnyChild(isInstanceOf(IgniteUnionAll.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) ==
10.0)))))))));
// Check that internal Sort node is not eliminated by external Sort
node with different collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10)
ORDER BY VAL", publicSchema,
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) ==
10.0)))))))));
- // Check that extended collation is passed through the Limit node if
it satisfies the Limit collation.
+// // Check that extended collation is passed through the Limit node if
it satisfies the Limit collation.
Review Comment:
?
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java:
##########
@@ -37,31 +37,66 @@
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import static
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
import static
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+import static
org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
/**
* Ignite sort operator.
*/
public class IgniteSort extends Sort implements IgniteRel {
+ /** */
+ private final boolean enforcer;
+
/**
* Constructor.
*
* @param cluster Cluster.
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
+ * @param offset Offset.
+ * @param fetch Limit.
+ * @param enforcer Enforcer flag.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
- RelCollation collation) {
- super(cluster, traits, child, collation);
+ RelCollation collation,
+ RexNode offset,
+ RexNode fetch,
+ boolean enforcer) {
Review Comment:
`) {` on the new line
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java:
##########
@@ -82,86 +84,121 @@ public void testOrderOfRels() throws Exception {
IgniteSchema publicSchema =
createSchemaWithTable(IgniteDistributions.random());
// Simple case, Limit can't be pushed down under Exchange or Sort.
Sort before Exchange is more preferable.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) == 10.0))))));
+
+ // Same simple case but witout offset.
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5", publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> s.offset == null))))));
+
+ // No special liited sort required if LIMIT is not set.
+ assertPlan("SELECT * FROM TEST ORDER BY ID OFFSET 10", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> s.fetch == null)
+ .and(s -> s.offset == null))))));
// Simple case without ordering.
- assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS
ONLY", publicSchema,
+ assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 5 ROWS
ONLY", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) ==
10.0)).negate()));
Review Comment:
Here the sort node should not be found at all, so the fetch and offset checks seem
useless (the same applies to the other checks with `negate()`).
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java:
##########
@@ -37,31 +37,66 @@
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import static
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
import static
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+import static
org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
/**
* Ignite sort operator.
*/
public class IgniteSort extends Sort implements IgniteRel {
+ /** */
+ private final boolean enforcer;
+
/**
* Constructor.
*
* @param cluster Cluster.
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
+ * @param offset Offset.
+ * @param fetch Limit.
+ * @param enforcer Enforcer flag.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
- RelCollation collation) {
- super(cluster, traits, child, collation);
+ RelCollation collation,
+ RexNode offset,
+ RexNode fetch,
+ boolean enforcer) {
+ super(cluster, traits, child, collation, offset, fetch);
+
+ this.enforcer = enforcer;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster.
+ * @param traits Trait set.
+ * @param child Input node.
+ * @param collation Collation.
+ * @param enforcer Enforcer flag.
+ */
+ public IgniteSort(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ RelCollation collation,
+ boolean enforcer) {
Review Comment:
`) {` on the new line
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java:
##########
@@ -48,6 +48,70 @@ public void testLimit() throws Exception {
checkLimit(2000, 3000);
}
+ /** Tests Sort node can as well limit its output when fetch param is set.
*/
+ @Test
+ public void testSort() throws Exception {
+ int bufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ checkLimitSort(0, 1);
+ checkLimitSort(1, 0);
+ checkLimitSort(1, 1);
+ checkLimitSort(0, bufSize);
+ checkLimitSort(bufSize, 0);
+ checkLimitSort(bufSize, bufSize);
+ checkLimitSort(bufSize - 1, 1);
+ checkLimitSort(2000, 0);
+ checkLimitSort(0, 3000);
+ checkLimitSort(2000, 3000);
+ }
+
+ /**
+ * @param offset Rows offset.
+ * @param fetch Fetch rows count (zero means unlimited).
+ */
+ private void checkLimitSort(int offset, int fetch) {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+ RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+
+ SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType,
F::compareArrays, () -> offset,
+ fetch == 0 ? null : () -> fetch);
+
+ SourceNode srcNode = new SourceNode(ctx, rowType, fetch > 0 ? fetch *
10 + offset :
Review Comment:
Here we check only sorted input, which doesn't cover all cases. Let's rewrite this
check: create a `ScanNode` with a shuffled list of values from 0 to `fetch * 10 +
offset` as the source.
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java:
##########
@@ -82,86 +84,121 @@ public void testOrderOfRels() throws Exception {
IgniteSchema publicSchema =
createSchemaWithTable(IgniteDistributions.random());
// Simple case, Limit can't be pushed down under Exchange or Sort.
Sort before Exchange is more preferable.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) == 10.0))))));
+
+ // Same simple case but witout offset.
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5", publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> s.offset == null))))));
+
+ // No special liited sort required if LIMIT is not set.
+ assertPlan("SELECT * FROM TEST ORDER BY ID OFFSET 10", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> s.fetch == null)
+ .and(s -> s.offset == null))))));
// Simple case without ordering.
- assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS
ONLY", publicSchema,
+ assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 5 ROWS
ONLY", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) ==
10.0)).negate()));
// Check that Sort node is not eliminated by aggregation and Exchange
node is not eliminated by distribution
// required by parent nodes.
assertPlan("SELECT * FROM TEST UNION (SELECT * FROM TEST ORDER BY ID
LIMIT 10)", publicSchema,
nodeOrAnyChild(isInstanceOf(IgniteUnionAll.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) ==
10.0)))))))));
// Check that internal Sort node is not eliminated by external Sort
node with different collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10)
ORDER BY VAL", publicSchema,
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) ==
10.0)))))))));
- // Check that extended collation is passed through the Limit node if
it satisfies the Limit collation.
+// // Check that extended collation is passed through the Limit node if
it satisfies the Limit collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10)
ORDER BY ID, VAL", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteSort.class)
- .and(s ->
s.collation().getKeys().equals(ImmutableIntList.of(0, 1))))))));
+ .and(s ->
s.collation().getKeys().equals(ImmutableIntList.of(0, 1)))
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) ==
10.0))))))));
// Check that external Sort node is not required if external collation
is subset of internal collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT
10) ORDER BY ID", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0))))));
// Check double limit when external collation is a subset of internal
collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT
10) ORDER BY ID LIMIT 5 OFFSET 3",
publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0)
+ .and(s -> s.offset == null))))))));
// Check limit/exchange/sort rel order in subquery.
assertPlan("SELECT NULLIF((SELECT id FROM test ORDER BY id LIMIT 1
OFFSET 1), id) FROM test",
publicSchema,
hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(e -> e.distribution() == IgniteDistributions.single())
- .and(input(isInstanceOf(IgniteSort.class)))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.offset, -1) == 1)
+ .and(s -> doubleFromRex(s.fetch, -1) == 1)))))));
publicSchema = createSchemaWithTable(IgniteDistributions.random(), 0);
// Sort node is not required, since collation of the Limit node equals
to the index collation.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteIndexScan.class)))))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.offset, -1) == 10)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5)).negate()));
publicSchema = createSchemaWithTable(IgniteDistributions.random(), 0,
1);
// Sort node is not required, since collation of the Limit node
satisfies the index collation.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteIndexScan.class)))))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.offset, -1) == 10)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5)).negate()));
publicSchema = createSchemaWithTable(IgniteDistributions.single());
// Exchange node is not required, since distribution of the table is
already "single".
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10",
publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10",
publicSchema,
isInstanceOf(IgniteLimit.class)
- .and(input(isInstanceOf(IgniteSort.class)))
-
.and(hasChildThat(isInstanceOf(IgniteExchange.class)).negate()));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.offset, -1) == 10)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5)))
+
.and(hasChildThat(isInstanceOf(IgniteExchange.class)).negate()));
Review Comment:
It relates to the sort node, so the indent should be the same as for the previous line.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java:
##########
@@ -117,13 +154,33 @@ private void flush() throws Exception {
int processed = 0;
inLoop = true;
+
+ if (limit > 0 && !rows.isEmpty()) {
+ if (reversed == null)
+ reversed = new ArrayList<>(rows.size());
+
+ // Make final order (reversed).
+ while (!rows.isEmpty()) {
+ reversed.add(rows.poll());
+
+ if (++processed >= IN_BUFFER_SIZE) {
+ // allow others to do their job
+ context().execute(this::flush, this::onError);
+
+ return;
+ }
+ }
+ }
+
+ processed = 0;
Review Comment:
Should be under the `try` block
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]