Dmitry Lychagin has submitted this change and it was merged. Change subject: [NO ISSUE][RT] Window operator runtime optimization ......................................................................
[NO ISSUE][RT] Window operator runtime optimization - user model changes: no - storage format changes: no - interface changes: no Details: - Runtime optimization for window operator with monotonic frame start expression. In this case, continue scanning from the beginning of the frame that was found in the previous iteration. - Allow inlining variables into window operator expressions except PARTITION BY, ORDER BY and frame value expressions Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3135 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm A 
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java 24 files changed, 787 insertions(+), 95 deletions(-) Approvals: Anon. E. 
Moose #1000171: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index 6749bf9..d1ce865 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -35,6 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; public class AnalysisUtil { @@ -129,6 +130,39 @@ return new Pair<>(dataverseName, datasetName); } + /** + * Checks whether the frame boundary expression is a monotonically non-decreasing function over a frame value variable + */ + public static boolean isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>> frameBoundaryExprList, + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList) { + if (frameValueExprList.size() != 1) { + return false; + } + ILogicalExpression frameValueExpr = frameValueExprList.get(0).second.getValue(); + if (frameValueExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + return false; + } + if (frameBoundaryExprList.size() != 1) { + return false; + } + ILogicalExpression frameStartExpr = frameBoundaryExprList.get(0).getValue(); + switch (frameStartExpr.getExpressionTag()) { + case CONSTANT: + return true; + case VARIABLE: + return frameStartExpr.equals(frameValueExpr); + case FUNCTION_CALL: + 
AbstractFunctionCallExpression frameStartCallExpr = (AbstractFunctionCallExpression) frameStartExpr; + FunctionIdentifier fi = frameStartCallExpr.getFunctionIdentifier(); + return (BuiltinFunctions.NUMERIC_ADD.equals(fi) || BuiltinFunctions.NUMERIC_SUBTRACT.equals(fi)) + && frameStartCallExpr.getArguments().get(0).getValue().equals(frameValueExpr) + && frameStartCallExpr.getArguments().get(1).getValue() + .getExpressionTag() == LogicalExpressionTag.CONSTANT; + default: + throw new IllegalStateException(String.valueOf(frameStartExpr.getExpressionTag())); + } + } + private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>(); static { @@ -136,5 +170,4 @@ fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE); fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF); } - } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index cd75c1e..ce9fd03 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -31,6 +31,7 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.optimizer.base.AnalysisUtil; import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams; import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams; import org.apache.commons.lang3.mutable.Mutable; @@ -380,7 +381,8 @@ } } } - - return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns); + boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), + winOp.getFrameValueExpressions()); + 
return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic); } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp new file mode 100644 index 0000000..47f2ce9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Tests runtime optimizations of window functions + * Expected Res : SUCCESS + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create function q1_sum_1_preceding_1_following(N) { + from + range(1, N) x + let + result_expected = 3 * x - (case x when N then x + 1 else 0 end), + result_actual = sum(x) over (order by x range between 1 preceding and 1 following), + result_delta = result_expected - result_actual + select + min(result_delta) min_delta, + max(result_delta) max_delta +}; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp new file mode 100644 index 0000000..2d561b9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with monotonic frame start expression + * : on a dataset that fits into one physical frame + * Expected Res : SUCCESS + */ + +use test; + +q1_sum_1_preceding_1_following(10); + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp new file mode 100644 index 0000000..fec6158 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with monotonic frame start expression + * : on a dataset that spans several physical frames + * Expected Res : SUCCESS + */ + +use test; + +q1_sum_1_preceding_1_following(10000); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp new file mode 100644 index 0000000..84b5234 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with monotonic frame start expression + * : on a dataset that spans several physical frames with a frame that spans several physical frames + * Expected Res : SUCCESS + */ + +use test; + +with N as 10000, W as 5000 + +from ( + from + range(1, N) x + select value + sum(x) over (order by x range between W preceding and W following) +) v +select value sum(v) + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp new file mode 100644 index 0000000..8a4374f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +/* + * Description : Test window operator with monotonic frame start expression + * : on a dataset that spans several physical frames + * : with a frame that starts before current physical frame + * Expected Res : SUCCESS + */ + +use test; + +with N as 10000, W as 5000 + +from ( + from + range(1, N) x + select value + sum(x) over (order by x range between W + 2 preceding and W preceding) +) v +select value sum(v) \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp new file mode 100644 index 0000000..91c0a31 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Test window operator with monotonic frame start expression + * : on a dataset that spans several physical frames + * : with a frame that starts after current physical frame + * Expected Res : SUCCESS + */ + +use test; + +with N as 10000, W as 5000 + +from ( + from + range(1, N) x + select value + sum(x) over (order by x range between W following and W + 2 following) +) v +select value sum(v) \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm new file mode 100644 index 0000000..6115ead --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm @@ -0,0 +1 @@ +{ "min_delta": 0, "max_delta": 0 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm new file mode 100644 index 0000000..6115ead --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm @@ -0,0 +1 @@ +{ "min_delta": 0, "max_delta": 0 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm new file mode 100644 index 0000000..76a77d9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm @@ -0,0 +1 @@ +375062502500 \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm new file mode 100644 index 0000000..77dda1e --- 
/dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm @@ -0,0 +1 @@ +37492501 \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm new file mode 100644 index 0000000..a25255a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm @@ -0,0 +1 @@ +112492496 \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 0c8ac6a..455b3ea 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -9311,6 +9311,11 @@ <output-dir compare="Text">win_opt_01</output-dir> </compilation-unit> </test-case> + <test-case FilePath="window"> + <compilation-unit name="win_opt_02"> + <output-dir compare="Text">win_opt_02</output-dir> + </compilation-unit> + </test-case> </test-group> <test-group name="writers"> <test-case FilePath="writers"> diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java index 8f563b3..bdfdac8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java @@ -40,9 +40,12 @@ /** * Window operator evaluates window functions. 
It has the following components: * <ul> - * <li>{@link #partitionExpressions} - define how input data must be partitioned</li> - * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered</li> - * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion</li> + * <li>{@link #partitionExpressions} - define how input data must be partitioned. + * Each must be a variable reference</li> + * <li>{@link #orderExpressions} - define how data inside these partitions must be ordered. + * Each must be a variable reference</li> + * <li>{@link #frameValueExpressions} - value expressions for comparing against frame start / end boundaries and frame exclusion. + * Each must be a variable reference</li> * <li>{@link #frameStartExpressions} - frame start boundary</li> * <li>{@link #frameEndExpressions} - frame end boundary</li> * <li>{@link #frameExcludeExpressions} - define values to be excluded from the frame</li> @@ -217,15 +220,27 @@ @Override public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { + return acceptExpressionTransform(visitor, true); + } + + /** + * Allows performing expression transformation only on a subset of this operator's expressions + * @param visitor transforming visitor + * @param visitVarRefRequiringExprs whether to visit variable reference requiring expressions, or not + */ + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor, + boolean visitVarRefRequiringExprs) throws AlgebricksException { boolean mod = false; - for (Mutable<ILogicalExpression> expr : partitionExpressions) { - mod |= visitor.transform(expr); - } - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { - mod |= visitor.transform(p.second); - } - for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) { - mod |= 
visitor.transform(p.second); + if (visitVarRefRequiringExprs) { + for (Mutable<ILogicalExpression> expr : partitionExpressions) { + mod |= visitor.transform(expr); + } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : orderExpressions) { + mod |= visitor.transform(p.second); + } + for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : frameValueExpressions) { + mod |= visitor.transform(p.second); + } } for (Mutable<ILogicalExpression> expr : frameStartExpressions) { mod |= visitor.transform(expr); @@ -305,4 +320,14 @@ expr.getValue().getUsedVariables(vars); } } + + /** + * Only the following expressions require variable references: {@link #partitionExpressions}, + * {@link #orderExpressions}, and {@link #frameValueExpressions}, others do not. + * Use {@link #acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)} + * to visit only non-requiring expressions. + */ + public boolean requiresVariableReferenceExpressions() { + return false; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java index c8168d1..2a8658d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -75,11 +75,14 @@ private final List<OrderColumn> orderColumns; + private final boolean frameStartIsMonotonic; + public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization, - List<OrderColumn> orderColumns) { + List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) { this.partitionColumns = partitionColumns; 
this.partitionMaterialization = partitionMaterialization; this.orderColumns = orderColumns; + this.frameStartIsMonotonic = frameStartIsMonotonic; } @Override @@ -217,10 +220,10 @@ } else { runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first, - frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameEndExprEvals, - frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(), - frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, - context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(), + frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartIsMonotonic, + frameEndExprEvals, frameExcludeExprEvalsAndComparators.first, + winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second, + frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(), projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index 9d5cdeb..d521831 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -120,9 +120,9 @@ public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException; - public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException; + public R 
visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) throws AlgebricksException; - public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException; + public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) throws AlgebricksException; public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java new file mode 100644 index 0000000..9350f95 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.algebricks.core.algebra.visitors; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; + +/** + * This visitor performs expression transformation on each operator by calling + * {@link ILogicalOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform)}. + * Subclasses can override individual {@code visit*} methods to customize which expressions must be transformed + * based on the operator kind. This functionality is required in cases when only a subset of operator's expressions + * must be transformed. 
+ * + * @see WindowOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean) + */ +public abstract class LogicalExpressionReferenceTransformVisitor + implements ILogicalOperatorVisitor<Boolean, ILogicalExpressionReferenceTransform> { + + protected boolean visitOperator(ILogicalOperator op, ILogicalExpressionReferenceTransform transform) + throws AlgebricksException { + return op.acceptExpressionTransform(transform); + } + + @Override + public Boolean visitAggregateOperator(AggregateOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitGroupByOperator(GroupByOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitLimitOperator(LimitOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op, + ILogicalExpressionReferenceTransform arg) throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public 
Boolean visitOrderOperator(OrderOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitAssignOperator(AssignOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitSelectOperator(SelectOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitDelegateOperator(DelegateOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitProjectOperator(ProjectOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitSplitOperator(SplitOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitScriptOperator(ScriptOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitSubplanOperator(SubplanOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitSinkOperator(SinkOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public 
Boolean visitUnionOperator(UnionAllOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitIntersectOperator(IntersectOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitUnnestOperator(UnnestOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitUnnestMapOperator(UnnestMapOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, + ILogicalExpressionReferenceTransform arg) throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitDataScanOperator(DataSourceScanOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitDistinctOperator(DistinctOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitExchangeOperator(ExchangeOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitWriteOperator(WriteOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalExpressionReferenceTransform arg) + throws 
AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, + ILogicalExpressionReferenceTransform arg) throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, + ILogicalExpressionReferenceTransform arg) throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitForwardOperator(ForwardOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } + + @Override + public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return visitOperator(op, arg); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java index dc9a11f..f072312 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java @@ -72,7 +72,7 @@ if (!op.requiresVariableReferenceExpressions()) { inlineVisitor.setOperator(op); 
inlineVisitor.setTargetVariable(entry.getKey()); - if (op.acceptExpressionTransform(inlineVisitor)) { + if (op.accept(inlineVisitor, inlineVisitor)) { modified = true; } inlineVisitor.setTargetVariable(null); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java index 729d6f9..2c95ce0 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java @@ -35,13 +35,14 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; -import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import org.apache.hyracks.algebricks.core.algebra.visitors.LogicalExpressionReferenceTransformVisitor; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; import 
org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -113,7 +114,7 @@ // Only inline variables in operators that can deal with arbitrary expressions. if (!op.requiresVariableReferenceExpressions()) { inlineVisitor.setOperator(op); - return op.acceptExpressionTransform(inlineVisitor); + return op.accept(inlineVisitor, inlineVisitor); } return false; } @@ -199,7 +200,8 @@ return modified; } - public static class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform { + public static class InlineVariablesVisitor extends LogicalExpressionReferenceTransformVisitor + implements ILogicalExpressionReferenceTransform { private final Map<LogicalVariable, ILogicalExpression> varAssignRhs; private final Set<LogicalVariable> liveVars = new HashSet<>(); @@ -227,9 +229,15 @@ } @Override + public Boolean visitWindowOperator(WindowOperator op, ILogicalExpressionReferenceTransform arg) + throws AlgebricksException { + return op.acceptExpressionTransform(arg, false); + } + + @Override public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException { ILogicalExpression e = exprRef.getValue(); - switch (((AbstractLogicalExpression) e).getExpressionTag()) { + switch (e.getExpressionTag()) { case VARIABLE: return transformVariableReferenceExpression(exprRef, ((VariableReferenceExpression) e).getVariableReference()); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java index 661bb8a..4e97d6c 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java +++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java @@ -99,9 +99,8 @@ boolean isFirstChunk = chunkEndIdx.isEmpty(); if (isFirstChunk) { if (frameId != curFrameId) { - int nBlocks = FrameHelper.deserializeNumOfMinFrame(frameBuffer); - curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks); int pos = frameBuffer.position(); + curFrame.ensureFrameSize(frameBuffer.capacity()); FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer()); frameBuffer.position(pos); curFrameId = frameId; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java index e7daf11..565cbe6 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java @@ -26,7 +26,6 @@ import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -34,15 +33,17 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.api.IValueReference; import 
org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.DataUtils; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference; import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; +import org.apache.hyracks.storage.common.MultiComparator; /** * Runtime for window operators that performs partition materialization and can evaluate running aggregates @@ -56,11 +57,11 @@ private IScalarEvaluator[] frameValueEvals; - private IPointable[] frameValuePointables; + private PointableTupleReference frameValuePointables; private final IBinaryComparatorFactory[] frameValueComparatorFactories; - private IBinaryComparator[] frameValueComparators; + private MultiComparator frameValueComparators; private final boolean frameStartExists; @@ -68,7 +69,9 @@ private IScalarEvaluator[] frameStartEvals; - private IPointable[] frameStartPointables; + private PointableTupleReference frameStartPointables; + + private final boolean frameStartIsMonotonic; private final boolean frameEndExists; @@ -76,7 +79,7 @@ private IScalarEvaluator[] frameEndEvals; - private IPointable[] frameEndPointables; + private PointableTupleReference frameEndPointables; private final boolean frameExcludeExists; @@ -86,7 +89,7 @@ private final int frameExcludeNegationStartIdx; - private IPointable[] frameExcludePointables; + private PointableTupleReference frameExcludePointables; private IPointable frameExcludePointable2; @@ -116,18 +119,28 @@ private IFrame runFrame; + private int runFrameChunkId; + + private long 
runFrameSize; + private FrameTupleAccessor tAccess2; private FrameTupleReference tRef2; private IBinaryIntegerInspector bii; + private int chunkIdxFrameStartGlobal; + + private int tBeginIdxFrameStartGlobal; + + private long readerPosFrameStartGlobal; + WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories, - IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories, - int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] frameExcludeComparatorFactories, - IScalarEvaluatorFactory frameOffsetEvalFactory, + boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories, + IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx, + IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory, IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { @@ -137,6 +150,7 @@ this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0; this.frameStartEvalFactories = frameStartEvalFactories; this.frameStartExists = frameStartEvalFactories != null && frameStartEvalFactories.length > 0; + this.frameStartIsMonotonic = frameStartExists && frameStartIsMonotonic; this.frameEndEvalFactories = frameEndEvalFactories; this.frameEndExists = frameEndEvalFactories != null && frameEndEvalFactories.length > 0; this.frameValueComparatorFactories = frameValueComparatorFactories; @@ -158,7 +172,7 @@ if (frameValueExists) 
{ frameValueEvals = createEvaluators(frameValueEvalFactories, ctx); - frameValueComparators = createBinaryComparators(frameValueComparatorFactories); + frameValueComparators = MultiComparator.create(frameValueComparatorFactories); frameValuePointables = createPointables(frameValueEvalFactories.length); } if (frameStartExists) { @@ -190,16 +204,26 @@ } @Override + protected void beginPartitionImpl() throws HyracksDataException { + super.beginPartitionImpl(); + chunkIdxFrameStartGlobal = -1; + tBeginIdxFrameStartGlobal = -1; + readerPosFrameStartGlobal = -1; + runFrameChunkId = -1; + } + + @Override protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException { + boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0; + long readerPos = -1; int nChunks = getPartitionChunkCount(); if (nChunks > 1) { readerPos = reader.position(); if (chunkIdx == 0) { ByteBuffer curFrameBuffer = curFrame.getBuffer(); - int nBlocks = FrameHelper.deserializeNumOfMinFrame(curFrameBuffer); - copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks); int pos = curFrameBuffer.position(); + copyFrame2.ensureFrameSize(curFrameBuffer.capacity()); FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer()); curFrameBuffer.position(pos); } @@ -216,19 +240,13 @@ // frame boundaries if (frameStartExists) { - for (int i = 0; i < frameStartEvals.length; i++) { - frameStartEvals[i].evaluate(tRef, frameStartPointables[i]); - } + evaluate(frameStartEvals, tRef, frameStartPointables); } if (frameEndExists) { - for (int i = 0; i < frameEndEvals.length; i++) { - frameEndEvals[i].evaluate(tRef, frameEndPointables[i]); - } + evaluate(frameEndEvals, tRef, frameEndPointables); } if (frameExcludeExists) { - for (int i = 0; i < frameExcludeEvals.length; i++) { - frameExcludeEvals[i].evaluate(tRef, frameExcludePointables[i]); - } + evaluate(frameExcludeEvals, tRef, frameExcludePointables); } int toSkip = 0; if (frameOffsetExists) 
{ @@ -241,37 +259,65 @@ // aggregator created by WindowAggregatorDescriptorFactory does not process argument tuple in init() nestedAgg.init(null, null, -1, null); + int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0; + int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1; if (nChunks > 1) { - reader.seek(0); + reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0); } - frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks; chunkIdx2++) { - IFrame innerFrame; - if (chunkIdx2 == 0) { - // first chunk's frame is always in memory - innerFrame = chunkIdx == 0 ? curFrame : copyFrame2; - } else { - reader.nextFrame(runFrame); - innerFrame = runFrame; - } - tAccess2.reset(innerFrame.getBuffer()); + int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1; + long readerPosFrameStartLocal = -1; - int tBeginIdx2 = getTupleBeginIdx(chunkIdx2); - int tEndIdx2 = getTupleEndIdx(chunkIdx2); - for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) { - tRef2.reset(tAccess2, tIdx2); + frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) { + long readerPosFrameInner; + IFrame frameInner; + if (chunkIdxInner == 0) { + // first chunk's frame is always in memory + frameInner = chunkIdx == 0 ? 
curFrame : copyFrame2; + readerPosFrameInner = 0; + } else { + readerPosFrameInner = reader.position(); + if (runFrameChunkId == chunkIdxInner) { + // runFrame has this chunk, so just advance the reader + reader.seek(readerPosFrameInner + runFrameSize); + } else { + reader.nextFrame(runFrame); + runFrameSize = reader.position() - readerPosFrameInner; + runFrameChunkId = chunkIdxInner; + } + frameInner = runFrame; + } + tAccess2.reset(frameInner.getBuffer()); + + int tBeginIdxInner; + if (tBeginIdxInnerStart < 0) { + tBeginIdxInner = getTupleBeginIdx(chunkIdxInner); + } else { + tBeginIdxInner = tBeginIdxInnerStart; + tBeginIdxInnerStart = -1; + } + int tEndIdxInner = getTupleEndIdx(chunkIdxInner); + + for (int tIdxInner = tBeginIdxInner; tIdxInner <= tEndIdxInner; tIdxInner++) { + tRef2.reset(tAccess2, tIdxInner); if (frameStartExists || frameEndExists) { - for (int frameValueIdx = 0; frameValueIdx < frameValueEvals.length; frameValueIdx++) { - frameValueEvals[frameValueIdx].evaluate(tRef2, frameValuePointables[frameValueIdx]); - } - if (frameStartExists - && compare(frameValuePointables, frameStartPointables, frameValueComparators) < 0) { - // skip if value < start - continue; + evaluate(frameValueEvals, tRef2, frameValuePointables); + if (frameStartExists) { + if (frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0) { + // skip if value < start + continue; + } + if (chunkIdxFrameStartLocal < 0) { + // save position of the first tuple that matches the frame start. 
+ // we'll continue from it in the next frame iteration + chunkIdxFrameStartLocal = chunkIdxInner; + tBeginIdxFrameStartLocal = tIdxInner; + readerPosFrameStartLocal = readerPosFrameInner; + } } if (frameEndExists - && compare(frameValuePointables, frameEndPointables, frameValueComparators) > 0) { + && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) { // skip and exit if value > end break frame_loop; } @@ -288,7 +334,7 @@ } if (toWrite != 0) { - nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null); + nestedAgg.aggregate(tAccess2, tIdxInner, null, -1, null); } if (toWrite > 0) { toWrite--; @@ -301,6 +347,19 @@ nestedAgg.outputFinalResult(tupleBuilder, null, -1, null); appendToFrameFromTupleBuilder(tupleBuilder); + + if (frameStartIsMonotonic) { + if (chunkIdxFrameStartLocal >= 0) { + chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal; + tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal; + readerPosFrameStartGlobal = readerPosFrameStartLocal; + } else { + // frame start not found, set start beyond the last chunk + chunkIdxFrameStartGlobal = nChunks; + tBeginIdxFrameStartGlobal = 0; + readerPosFrameStartGlobal = 0; + } + } } if (nChunks > 1) { @@ -311,7 +370,7 @@ private boolean isExcluded() throws HyracksDataException { for (int i = 0; i < frameExcludeEvals.length; i++) { frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2); - boolean b = DataUtils.compare(frameExcludePointables[i], frameExcludePointable2, + boolean b = DataUtils.compare(frameExcludePointables.getField(i), frameExcludePointable2, frameExcludeComparators[i]) != 0; if (i >= frameExcludeNegationStartIdx) { b = !b; @@ -337,22 +396,18 @@ return evals; } - private static IPointable[] createPointables(int ln) { + private static void evaluate(IScalarEvaluator[] evals, IFrameTupleReference inTuple, + PointableTupleReference outTuple) throws HyracksDataException { + for (int i = 0; i < evals.length; i++) { + evals[i].evaluate(inTuple, outTuple.getField(i)); + } + } + + 
private static PointableTupleReference createPointables(int ln) { IPointable[] pointables = new IPointable[ln]; for (int i = 0; i < ln; i++) { pointables[i] = VoidPointable.FACTORY.createPointable(); } - return pointables; - } - - private static int compare(IValueReference[] first, IValueReference[] second, IBinaryComparator[] comparators) - throws HyracksDataException { - for (int i = 0; i < first.length; i++) { - int c = DataUtils.compare(first[i], second[i], comparators[i]); - if (c != 0) { - return c; - } - } - return 0; + return new PointableTupleReference(pointables); } } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java index 640e260..16591d5 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java @@ -40,6 +40,8 @@ private final IScalarEvaluatorFactory[] frameStartEvalFactories; + private final boolean frameStartIsMonotonic; + private final IScalarEvaluatorFactory[] frameEndEvalFactories; private final IBinaryComparatorFactory[] frameValueComparatorFactories; @@ -64,9 +66,9 @@ IBinaryComparatorFactory[] partitionComparatorFactories, IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories, IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories, - IScalarEvaluatorFactory[] frameEndEvalFactories, IScalarEvaluatorFactory[] frameExcludeEvalFactories, - int frameExcludeNegationStartIdx, IBinaryComparatorFactory[] 
frameExcludeComparatorFactories, - IScalarEvaluatorFactory frameOffsetEvalFactory, + boolean frameStartIsMonotonic, IScalarEvaluatorFactory[] frameEndEvalFactories, + IScalarEvaluatorFactory[] frameExcludeEvalFactories, int frameExcludeNegationStartIdx, + IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory, IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, @@ -75,6 +77,7 @@ projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories); this.frameValueEvalFactories = frameValueEvalFactories; this.frameStartEvalFactories = frameStartEvalFactories; + this.frameStartIsMonotonic = frameStartIsMonotonic; this.frameEndEvalFactories = frameEndEvalFactories; this.frameValueComparatorFactories = frameValueComparatorFactories; this.frameExcludeEvalFactories = frameExcludeEvalFactories; @@ -91,10 +94,10 @@ public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, - frameStartEvalFactories, frameEndEvalFactories, frameExcludeEvalFactories, frameExcludeNegationStartIdx, - frameExcludeComparatorFactories, frameOffsetEvalFactory, binaryIntegerInspectorFactory, frameMaxObjects, - projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, - ctx); + frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories, + frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory, + binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns, + runningAggFactories, nestedAggOutSchemaSize, 
nestedAggFactory, ctx); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java new file mode 100644 index 0000000..1d947c1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.dataflow.common.data.accessors; + +import org.apache.hyracks.data.std.api.IPointable; + +/** + * A tuple reference implementation that holds its fields in an {@link IPointable} array + */ +public class PointableTupleReference implements ITupleReference { + + private final IPointable[] fields; + + public PointableTupleReference(IPointable[] fields) { + this.fields = fields; + } + + @Override + public int getFieldCount() { + return fields.length; + } + + @Override + public byte[] getFieldData(int fIdx) { + return getField(fIdx).getByteArray(); + } + + @Override + public int getFieldStart(int fIdx) { + return getField(fIdx).getStartOffset(); + } + + @Override + public int getFieldLength(int fIdx) { + return getField(fIdx).getLength(); + } + + public IPointable getField(int fIdx) { + return fields[fIdx]; + } +} \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/3135 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
