Dmitry Lychagin has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3135
Change subject: [NO ISSUE][RT] Window operator runtime optimization
......................................................................
[NO ISSUE][RT] Window operator runtime optimization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Runtime optimization for window operator with monotonic
frame start expression. In this case continue scanning
from the beginning of the frame that was found in the
previous iteration
- Allow inlining variables into window operator expressions
except PARTITION BY, ORDER BY and frame value expressions
Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
24 files changed, 787 insertions(+), 95 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/35/3135/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 6749bf9..d1ce865 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
public class AnalysisUtil {
@@ -129,6 +130,39 @@
return new Pair<>(dataverseName, datasetName);
}
+ /**
+ * Checks whether frame boundary expression is a monotonically
non-decreasing function over a frame value variable
+ */
+ public static boolean
isWindowFrameBoundaryMonotonic(List<Mutable<ILogicalExpression>>
frameBoundaryExprList,
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
frameValueExprList) {
+ if (frameValueExprList.size() != 1) {
+ return false;
+ }
+ ILogicalExpression frameValueExpr =
frameValueExprList.get(0).second.getValue();
+ if (frameValueExpr.getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
+ return false;
+ }
+ if (frameBoundaryExprList.size() != 1) {
+ return false;
+ }
+ ILogicalExpression frameStartExpr =
frameBoundaryExprList.get(0).getValue();
+ switch (frameStartExpr.getExpressionTag()) {
+ case CONSTANT:
+ return true;
+ case VARIABLE:
+ return frameStartExpr.equals(frameValueExpr);
+ case FUNCTION_CALL:
+ AbstractFunctionCallExpression frameStartCallExpr =
(AbstractFunctionCallExpression) frameStartExpr;
+ FunctionIdentifier fi =
frameStartCallExpr.getFunctionIdentifier();
+ return (BuiltinFunctions.NUMERIC_ADD.equals(fi) ||
BuiltinFunctions.NUMERIC_SUBTRACT.equals(fi))
+ &&
frameStartCallExpr.getArguments().get(0).getValue().equals(frameValueExpr)
+ && frameStartCallExpr.getArguments().get(1).getValue()
+ .getExpressionTag() ==
LogicalExpressionTag.CONSTANT;
+ default:
+ throw new
IllegalStateException(String.valueOf(frameStartExpr.getExpressionTag()));
+ }
+ }
+
private static List<FunctionIdentifier> fieldAccessFunctions = new
ArrayList<>();
static {
@@ -136,5 +170,4 @@
fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE);
fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF);
}
-
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index cd75c1e..ce9fd03 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -31,6 +31,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.optimizer.base.AnalysisUtil;
import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
import org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
import org.apache.commons.lang3.mutable.Mutable;
@@ -380,7 +381,8 @@
}
}
}
-
- return new WindowPOperator(partitionColumns, partitionMaterialization,
orderColumns);
+ boolean frameStartIsMonotonic =
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
+ winOp.getFrameValueExpressions());
+ return new WindowPOperator(partitionColumns, partitionMaterialization,
orderColumns, frameStartIsMonotonic);
}
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
new file mode 100644
index 0000000..47f2ce9
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.1.ddl.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Tests runtime optimizations of window functions
+ * Expected Res : SUCCESS
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create function q1_sum_1_preceding_1_following(N) {
+ from
+ range(1, N) x
+ let
+ result_expected = 3 * x - (case x when N then x + 1 else 0 end),
+ result_actual = sum(x) over (order by x range between 1 preceding and 1
following),
+ result_delta = result_expected - result_actual
+ select
+ min(result_delta) min_delta,
+ max(result_delta) max_delta
+};
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
new file mode 100644
index 0000000..2d561b9
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that fits into one physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10);
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
new file mode 100644
index 0000000..fec6158
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+q1_sum_1_preceding_1_following(10000);
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
new file mode 100644
index 0000000..84b5234
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames with a frame
that spans several physical frames
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W preceding and W following)
+) v
+select value sum(v)
+
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
new file mode 100644
index 0000000..8a4374f
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * : with a frame that starts before current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W + 2 preceding and W preceding)
+) v
+select value sum(v)
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
new file mode 100644
index 0000000..91c0a31
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Test window operator with monotonic frame start expression
+ * : on a dataset that spans several physical frames
+ * : with a frame that starts after current physical frame
+ * Expected Res : SUCCESS
+ */
+
+use test;
+
+with N as 10000, W as 5000
+
+from (
+ from
+ range(1, N) x
+ select value
+ sum(x) over (order by x range between W following and W + 2 following)
+) v
+select value sum(v)
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.2.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
new file mode 100644
index 0000000..6115ead
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.3.adm
@@ -0,0 +1 @@
+{ "min_delta": 0, "max_delta": 0 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
new file mode 100644
index 0000000..76a77d9
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.4.adm
@@ -0,0 +1 @@
+375062502500
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
new file mode 100644
index 0000000..77dda1e
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.5.adm
@@ -0,0 +1 @@
+37492501
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
new file mode 100644
index 0000000..a25255a
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/window/win_opt_02/win_opt_02.6.adm
@@ -0,0 +1 @@
+112492496
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c8ac6a..455b3ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9311,6 +9311,11 @@
<output-dir compare="Text">win_opt_01</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="window">
+ <compilation-unit name="win_opt_02">
+ <output-dir compare="Text">win_opt_02</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="writers">
<test-case FilePath="writers">
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 8f563b3..bdfdac8 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -40,9 +40,12 @@
/**
* Window operator evaluates window functions. It has the following components:
* <ul>
- * <li>{@link #partitionExpressions} - define how input data must be
partitioned</li>
- * <li>{@link #orderExpressions} - define how data inside these partitions
must be ordered</li>
- * <li>{@link #frameValueExpressions} - value expressions for comparing
against frame start / end boundaries and frame exclusion</li>
+ * <li>{@link #partitionExpressions} - define how input data must be
partitioned.
+ * Each must be a variable reference</li>
+ * <li>{@link #orderExpressions} - define how data inside these partitions
must be ordered.
+ * Each must be a variable reference</li>
+ * <li>{@link #frameValueExpressions} - value expressions for comparing
against frame start / end boundaries and frame exclusion.
+ * Each must be a variable reference</li>
* <li>{@link #frameStartExpressions} - frame start boundary</li>
* <li>{@link #frameEndExpressions} - frame end boundary</li>
* <li>{@link #frameExcludeExpressions} - define values to be excluded from
the frame</li>
@@ -217,15 +220,27 @@
@Override
public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws
AlgebricksException {
+ return acceptExpressionTransform(visitor, true);
+ }
+
+ /**
+ * Allows performing expression transformation only on a subset of this
operator's expressions
+ * @param visitor transforming visitor
+ * @param visitVarRefRequiringExprs whether to visit variable reference
requiring expressions, or not
+ */
+ public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor,
+ boolean visitVarRefRequiringExprs) throws AlgebricksException {
boolean mod = false;
- for (Mutable<ILogicalExpression> expr : partitionExpressions) {
- mod |= visitor.transform(expr);
- }
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
orderExpressions) {
- mod |= visitor.transform(p.second);
- }
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
frameValueExpressions) {
- mod |= visitor.transform(p.second);
+ if (visitVarRefRequiringExprs) {
+ for (Mutable<ILogicalExpression> expr : partitionExpressions) {
+ mod |= visitor.transform(expr);
+ }
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
orderExpressions) {
+ mod |= visitor.transform(p.second);
+ }
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
frameValueExpressions) {
+ mod |= visitor.transform(p.second);
+ }
}
for (Mutable<ILogicalExpression> expr : frameStartExpressions) {
mod |= visitor.transform(expr);
@@ -305,4 +320,14 @@
expr.getValue().getUsedVariables(vars);
}
}
+
+ /**
+ * Only the following expressions require variable references: {@link
#partitionExpressions},
+ * {@link #orderExpressions}, and {@link #frameValueExpressions}, others
do not.
+ * Use {@link
#acceptExpressionTransform(ILogicalExpressionReferenceTransform, boolean)}
+ * to visit only non-requiring expressions.
+ */
+ public boolean requiresVariableReferenceExpressions() {
+ return false;
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index c8168d1..2a8658d 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -75,11 +75,14 @@
private final List<OrderColumn> orderColumns;
+ private final boolean frameStartIsMonotonic;
+
public WindowPOperator(List<LogicalVariable> partitionColumns, boolean
partitionMaterialization,
- List<OrderColumn> orderColumns) {
+ List<OrderColumn> orderColumns, boolean frameStartIsMonotonic) {
this.partitionColumns = partitionColumns;
this.partitionMaterialization = partitionMaterialization;
this.orderColumns = orderColumns;
+ this.frameStartIsMonotonic = frameStartIsMonotonic;
}
@Override
@@ -217,10 +220,10 @@
} else {
runtime = new
WindowNestedPlansRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
orderComparatorFactories,
frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second,
frameStartExprEvals, frameEndExprEvals,
- frameExcludeExprEvalsAndComparators.first,
winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second,
frameOffsetExprEval,
- context.getBinaryIntegerInspectorFactory(),
winOp.getFrameMaxObjects(),
+ frameValueExprEvalsAndComparators.second,
frameStartExprEvals, frameStartIsMonotonic,
+ frameEndExprEvals,
frameExcludeExprEvalsAndComparators.first,
+ winOp.getFrameExcludeNegationStartIdx(),
frameExcludeExprEvalsAndComparators.second,
+ frameOffsetExprEval,
context.getBinaryIntegerInspectorFactory(), winOp.getFrameMaxObjects(),
projectionColumnsExcludingSubplans,
runningAggOutColumns, runningAggFactories,
aggregatorOutputSchemaSize, nestedAggFactory);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 9d5cdeb..d521831 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -120,9 +120,9 @@
public R visitWriteResultOperator(WriteResultOperator op, T arg) throws
AlgebricksException;
- public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T
tag) throws AlgebricksException;
+ public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T
arg) throws AlgebricksException;
- public R
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag)
throws AlgebricksException;
+ public R
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg)
throws AlgebricksException;
public R visitTokenizeOperator(TokenizeOperator op, T arg) throws
AlgebricksException;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
new file mode 100644
index 0000000..9350f95
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.visitors;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+
+/**
+ * This visitor performs expression transformation on each operator by calling
+ * {@link
ILogicalOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform)}.
+ * Subclasses can override individual {@code visit*} methods to customize
which expressions must be transformed
+ * based on the operator kind. This functionality is required in cases when
only a subset of an operator's expressions
+ * must be transformed.
+ *
+ * @see
WindowOperator#acceptExpressionTransform(ILogicalExpressionReferenceTransform,
boolean)
+ */
+public abstract class LogicalExpressionReferenceTransformVisitor
+ implements ILogicalOperatorVisitor<Boolean,
ILogicalExpressionReferenceTransform> {
+
+ protected boolean visitOperator(ILogicalOperator op,
ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ return op.acceptExpressionTransform(transform);
+ }
+
+ @Override
+ public Boolean visitAggregateOperator(AggregateOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitRunningAggregateOperator(RunningAggregateOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitGroupByOperator(GroupByOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLimitOperator(LimitOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitInnerJoinOperator(InnerJoinOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+ ILogicalExpressionReferenceTransform arg) throws
AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitOrderOperator(OrderOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitAssignOperator(AssignOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSelectOperator(SelectOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDelegateOperator(DelegateOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitProjectOperator(ProjectOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitReplicateOperator(ReplicateOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSplitOperator(SplitOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitMaterializeOperator(MaterializeOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitScriptOperator(ScriptOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSubplanOperator(SubplanOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitSinkOperator(SinkOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnionOperator(UnionAllOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitIntersectOperator(IntersectOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnnestOperator(UnnestOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitUnnestMapOperator(UnnestMapOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator
op,
+ ILogicalExpressionReferenceTransform arg) throws
AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDataScanOperator(DataSourceScanOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDistinctOperator(DistinctOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitExchangeOperator(ExchangeOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWriteOperator(WriteOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitDistributeResultOperator(DistributeResultOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWriteResultOperator(WriteResultOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator
op,
+ ILogicalExpressionReferenceTransform arg) throws
AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
+ ILogicalExpressionReferenceTransform arg) throws
AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitTokenizeOperator(TokenizeOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitForwardOperator(ForwardOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+
+ @Override
+ public Boolean visitWindowOperator(WindowOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return visitOperator(op, arg);
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
index dc9a11f..f072312 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
@@ -72,7 +72,7 @@
if (!op.requiresVariableReferenceExpressions()) {
inlineVisitor.setOperator(op);
inlineVisitor.setTargetVariable(entry.getKey());
- if (op.acceptExpressionTransform(inlineVisitor)) {
+ if (op.accept(inlineVisitor, inlineVisitor)) {
modified = true;
}
inlineVisitor.setTargetVariable(null);
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 729d6f9..2c95ce0 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -35,13 +35,14 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.LogicalExpressionReferenceTransformVisitor;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -113,7 +114,7 @@
// Only inline variables in operators that can deal with arbitrary
expressions.
if (!op.requiresVariableReferenceExpressions()) {
inlineVisitor.setOperator(op);
- return op.acceptExpressionTransform(inlineVisitor);
+ return op.accept(inlineVisitor, inlineVisitor);
}
return false;
}
@@ -199,7 +200,8 @@
return modified;
}
- public static class InlineVariablesVisitor implements
ILogicalExpressionReferenceTransform {
+ public static class InlineVariablesVisitor extends
LogicalExpressionReferenceTransformVisitor
+ implements ILogicalExpressionReferenceTransform {
private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
private final Set<LogicalVariable> liveVars = new HashSet<>();
@@ -227,9 +229,15 @@
}
@Override
+ public Boolean visitWindowOperator(WindowOperator op,
ILogicalExpressionReferenceTransform arg)
+ throws AlgebricksException {
+ return op.acceptExpressionTransform(arg, false);
+ }
+
+ @Override
public boolean transform(Mutable<ILogicalExpression> exprRef) throws
AlgebricksException {
ILogicalExpression e = exprRef.getValue();
- switch (((AbstractLogicalExpression) e).getExpressionTag()) {
+ switch (e.getExpressionTag()) {
case VARIABLE:
return transformVariableReferenceExpression(exprRef,
((VariableReferenceExpression)
e).getVariableReference());
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 661bb8a..4e97d6c 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -99,9 +99,8 @@
boolean isFirstChunk = chunkEndIdx.isEmpty();
if (isFirstChunk) {
if (frameId != curFrameId) {
- int nBlocks =
FrameHelper.deserializeNumOfMinFrame(frameBuffer);
- curFrame.ensureFrameSize(curFrame.getMinSize() * nBlocks);
int pos = frameBuffer.position();
+ curFrame.ensureFrameSize(frameBuffer.capacity());
FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
frameBuffer.position(pos);
curFrameId = frameId;
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index e7daf11..565cbe6 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -26,7 +26,6 @@
import
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,15 +33,17 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.DataUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import org.apache.hyracks.storage.common.MultiComparator;
/**
* Runtime for window operators that performs partition materialization and
can evaluate running aggregates
@@ -56,11 +57,11 @@
private IScalarEvaluator[] frameValueEvals;
- private IPointable[] frameValuePointables;
+ private PointableTupleReference frameValuePointables;
private final IBinaryComparatorFactory[] frameValueComparatorFactories;
- private IBinaryComparator[] frameValueComparators;
+ private MultiComparator frameValueComparators;
private final boolean frameStartExists;
@@ -68,7 +69,9 @@
private IScalarEvaluator[] frameStartEvals;
- private IPointable[] frameStartPointables;
+ private PointableTupleReference frameStartPointables;
+
+ private final boolean frameStartIsMonotonic;
private final boolean frameEndExists;
@@ -76,7 +79,7 @@
private IScalarEvaluator[] frameEndEvals;
- private IPointable[] frameEndPointables;
+ private PointableTupleReference frameEndPointables;
private final boolean frameExcludeExists;
@@ -86,7 +89,7 @@
private final int frameExcludeNegationStartIdx;
- private IPointable[] frameExcludePointables;
+ private PointableTupleReference frameExcludePointables;
private IPointable frameExcludePointable2;
@@ -116,18 +119,28 @@
private IFrame runFrame;
+ private int runFrameChunkId;
+
+ private long runFrameSize;
+
private FrameTupleAccessor tAccess2;
private FrameTupleReference tRef2;
private IBinaryIntegerInspector bii;
+ private int chunkIdxFrameStartGlobal;
+
+ private int tBeginIdxFrameStartGlobal;
+
+ private long readerPosFrameStartGlobal;
+
WindowNestedPlansPushRuntime(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories,
IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories,
IScalarEvaluatorFactory[] frameStartEvalFactories,
- IScalarEvaluatorFactory[] frameEndEvalFactories,
IScalarEvaluatorFactory[] frameExcludeEvalFactories,
- int frameExcludeNegationStartIdx, IBinaryComparatorFactory[]
frameExcludeComparatorFactories,
- IScalarEvaluatorFactory frameOffsetEvalFactory,
+ boolean frameStartIsMonotonic, IScalarEvaluatorFactory[]
frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameExcludeEvalFactories, int
frameExcludeNegationStartIdx,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories,
IScalarEvaluatorFactory frameOffsetEvalFactory,
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int
frameMaxObjects, int[] projectionColumns,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[]
runningAggFactories,
int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory
nestedAggFactory, IHyracksTaskContext ctx) {
@@ -137,6 +150,7 @@
this.frameValueExists = frameValueEvalFactories != null &&
frameValueEvalFactories.length > 0;
this.frameStartEvalFactories = frameStartEvalFactories;
this.frameStartExists = frameStartEvalFactories != null &&
frameStartEvalFactories.length > 0;
+ this.frameStartIsMonotonic = frameStartExists && frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameEndExists = frameEndEvalFactories != null &&
frameEndEvalFactories.length > 0;
this.frameValueComparatorFactories = frameValueComparatorFactories;
@@ -158,7 +172,7 @@
if (frameValueExists) {
frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
- frameValueComparators =
createBinaryComparators(frameValueComparatorFactories);
+ frameValueComparators =
MultiComparator.create(frameValueComparatorFactories);
frameValuePointables =
createPointables(frameValueEvalFactories.length);
}
if (frameStartExists) {
@@ -190,16 +204,26 @@
}
@Override
+ protected void beginPartitionImpl() throws HyracksDataException {
+ super.beginPartitionImpl();
+ chunkIdxFrameStartGlobal = -1;
+ tBeginIdxFrameStartGlobal = -1;
+ readerPosFrameStartGlobal = -1;
+ runFrameChunkId = -1;
+ }
+
+ @Override
protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader
reader) throws HyracksDataException {
+ boolean frameStartForward = frameStartIsMonotonic &&
chunkIdxFrameStartGlobal >= 0;
+
long readerPos = -1;
int nChunks = getPartitionChunkCount();
if (nChunks > 1) {
readerPos = reader.position();
if (chunkIdx == 0) {
ByteBuffer curFrameBuffer = curFrame.getBuffer();
- int nBlocks =
FrameHelper.deserializeNumOfMinFrame(curFrameBuffer);
- copyFrame2.ensureFrameSize(copyFrame2.getMinSize() * nBlocks);
int pos = curFrameBuffer.position();
+ copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
curFrameBuffer.position(pos);
}
@@ -216,19 +240,13 @@
// frame boundaries
if (frameStartExists) {
- for (int i = 0; i < frameStartEvals.length; i++) {
- frameStartEvals[i].evaluate(tRef, frameStartPointables[i]);
- }
+ evaluate(frameStartEvals, tRef, frameStartPointables);
}
if (frameEndExists) {
- for (int i = 0; i < frameEndEvals.length; i++) {
- frameEndEvals[i].evaluate(tRef, frameEndPointables[i]);
- }
+ evaluate(frameEndEvals, tRef, frameEndPointables);
}
if (frameExcludeExists) {
- for (int i = 0; i < frameExcludeEvals.length; i++) {
- frameExcludeEvals[i].evaluate(tRef,
frameExcludePointables[i]);
- }
+ evaluate(frameExcludeEvals, tRef, frameExcludePointables);
}
int toSkip = 0;
if (frameOffsetExists) {
@@ -241,37 +259,65 @@
// aggregator created by WindowAggregatorDescriptorFactory does
not process argument tuple in init()
nestedAgg.init(null, null, -1, null);
+ int chunkIdxInnerStart = frameStartForward ?
chunkIdxFrameStartGlobal : 0;
+ int tBeginIdxInnerStart = frameStartForward ?
tBeginIdxFrameStartGlobal : -1;
if (nChunks > 1) {
- reader.seek(0);
+ reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
}
- frame_loop: for (int chunkIdx2 = 0; chunkIdx2 < nChunks;
chunkIdx2++) {
- IFrame innerFrame;
- if (chunkIdx2 == 0) {
- // first chunk's frame is always in memory
- innerFrame = chunkIdx == 0 ? curFrame : copyFrame2;
- } else {
- reader.nextFrame(runFrame);
- innerFrame = runFrame;
- }
- tAccess2.reset(innerFrame.getBuffer());
+ int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
+ long readerPosFrameStartLocal = -1;
- int tBeginIdx2 = getTupleBeginIdx(chunkIdx2);
- int tEndIdx2 = getTupleEndIdx(chunkIdx2);
- for (int tIdx2 = tBeginIdx2; tIdx2 <= tEndIdx2; tIdx2++) {
- tRef2.reset(tAccess2, tIdx2);
+ frame_loop: for (int chunkIdxInner = chunkIdxInnerStart;
chunkIdxInner < nChunks; chunkIdxInner++) {
+ long readerPosFrameInner;
+ IFrame frameInner;
+ if (chunkIdxInner == 0) {
+ // first chunk's frame is always in memory
+ frameInner = chunkIdx == 0 ? curFrame : copyFrame2;
+ readerPosFrameInner = 0;
+ } else {
+ readerPosFrameInner = reader.position();
+ if (runFrameChunkId == chunkIdxInner) {
+ // runFrame has this chunk, so just advance the reader
+ reader.seek(readerPosFrameInner + runFrameSize);
+ } else {
+ reader.nextFrame(runFrame);
+ runFrameSize = reader.position() - readerPosFrameInner;
+ runFrameChunkId = chunkIdxInner;
+ }
+ frameInner = runFrame;
+ }
+ tAccess2.reset(frameInner.getBuffer());
+
+ int tBeginIdxInner;
+ if (tBeginIdxInnerStart < 0) {
+ tBeginIdxInner = getTupleBeginIdx(chunkIdxInner);
+ } else {
+ tBeginIdxInner = tBeginIdxInnerStart;
+ tBeginIdxInnerStart = -1;
+ }
+ int tEndIdxInner = getTupleEndIdx(chunkIdxInner);
+
+ for (int tIdxInner = tBeginIdxInner; tIdxInner <=
tEndIdxInner; tIdxInner++) {
+ tRef2.reset(tAccess2, tIdxInner);
if (frameStartExists || frameEndExists) {
- for (int frameValueIdx = 0; frameValueIdx <
frameValueEvals.length; frameValueIdx++) {
- frameValueEvals[frameValueIdx].evaluate(tRef2,
frameValuePointables[frameValueIdx]);
- }
- if (frameStartExists
- && compare(frameValuePointables,
frameStartPointables, frameValueComparators) < 0) {
- // skip if value < start
- continue;
+ evaluate(frameValueEvals, tRef2, frameValuePointables);
+ if (frameStartExists) {
+ if
(frameValueComparators.compare(frameValuePointables, frameStartPointables) < 0)
{
+ // skip if value < start
+ continue;
+ }
+ if (chunkIdxFrameStartLocal < 0) {
+ // save position of the first tuple that
matches the frame start.
+ // we'll continue from it in the next frame
iteration
+ chunkIdxFrameStartLocal = chunkIdxInner;
+ tBeginIdxFrameStartLocal = tIdxInner;
+ readerPosFrameStartLocal = readerPosFrameInner;
+ }
}
if (frameEndExists
- && compare(frameValuePointables,
frameEndPointables, frameValueComparators) > 0) {
+ &&
frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) {
// skip and exit if value > end
break frame_loop;
}
@@ -288,7 +334,7 @@
}
if (toWrite != 0) {
- nestedAgg.aggregate(tAccess2, tIdx2, null, -1, null);
+ nestedAgg.aggregate(tAccess2, tIdxInner, null, -1,
null);
}
if (toWrite > 0) {
toWrite--;
@@ -301,6 +347,19 @@
nestedAgg.outputFinalResult(tupleBuilder, null, -1, null);
appendToFrameFromTupleBuilder(tupleBuilder);
+
+ if (frameStartIsMonotonic) {
+ if (chunkIdxFrameStartLocal >= 0) {
+ chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal;
+ tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal;
+ readerPosFrameStartGlobal = readerPosFrameStartLocal;
+ } else {
+ // frame start not found, set start beyond the last chunk
+ chunkIdxFrameStartGlobal = nChunks;
+ tBeginIdxFrameStartGlobal = 0;
+ readerPosFrameStartGlobal = 0;
+ }
+ }
}
if (nChunks > 1) {
@@ -311,7 +370,7 @@
private boolean isExcluded() throws HyracksDataException {
for (int i = 0; i < frameExcludeEvals.length; i++) {
frameExcludeEvals[i].evaluate(tRef2, frameExcludePointable2);
- boolean b = DataUtils.compare(frameExcludePointables[i],
frameExcludePointable2,
+ boolean b = DataUtils.compare(frameExcludePointables.getField(i),
frameExcludePointable2,
frameExcludeComparators[i]) != 0;
if (i >= frameExcludeNegationStartIdx) {
b = !b;
@@ -337,22 +396,18 @@
return evals;
}
- private static IPointable[] createPointables(int ln) {
+ private static void evaluate(IScalarEvaluator[] evals,
IFrameTupleReference inTuple,
+ PointableTupleReference outTuple) throws HyracksDataException {
+ for (int i = 0; i < evals.length; i++) {
+ evals[i].evaluate(inTuple, outTuple.getField(i));
+ }
+ }
+
+ private static PointableTupleReference createPointables(int ln) {
IPointable[] pointables = new IPointable[ln];
for (int i = 0; i < ln; i++) {
pointables[i] = VoidPointable.FACTORY.createPointable();
}
- return pointables;
- }
-
- private static int compare(IValueReference[] first, IValueReference[]
second, IBinaryComparator[] comparators)
- throws HyracksDataException {
- for (int i = 0; i < first.length; i++) {
- int c = DataUtils.compare(first[i], second[i], comparators[i]);
- if (c != 0) {
- return c;
- }
- }
- return 0;
+ return new PointableTupleReference(pointables);
}
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index 640e260..16591d5 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -40,6 +40,8 @@
private final IScalarEvaluatorFactory[] frameStartEvalFactories;
+ private final boolean frameStartIsMonotonic;
+
private final IScalarEvaluatorFactory[] frameEndEvalFactories;
private final IBinaryComparatorFactory[] frameValueComparatorFactories;
@@ -64,9 +66,9 @@
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories,
IScalarEvaluatorFactory[] frameValueEvalFactories,
IBinaryComparatorFactory[] frameValueComparatorFactories,
IScalarEvaluatorFactory[] frameStartEvalFactories,
- IScalarEvaluatorFactory[] frameEndEvalFactories,
IScalarEvaluatorFactory[] frameExcludeEvalFactories,
- int frameExcludeNegationStartIdx, IBinaryComparatorFactory[]
frameExcludeComparatorFactories,
- IScalarEvaluatorFactory frameOffsetEvalFactory,
+ boolean frameStartIsMonotonic, IScalarEvaluatorFactory[]
frameEndEvalFactories,
+ IScalarEvaluatorFactory[] frameExcludeEvalFactories, int
frameExcludeNegationStartIdx,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories,
IScalarEvaluatorFactory frameOffsetEvalFactory,
IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int
frameMaxObjects,
int[] projectionColumnsExcludingSubplans, int[]
runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories, int
nestedAggOutSchemaSize,
@@ -75,6 +77,7 @@
projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
this.frameValueEvalFactories = frameValueEvalFactories;
this.frameStartEvalFactories = frameStartEvalFactories;
+ this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndEvalFactories = frameEndEvalFactories;
this.frameValueComparatorFactories = frameValueComparatorFactories;
this.frameExcludeEvalFactories = frameExcludeEvalFactories;
@@ -91,10 +94,10 @@
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
return new WindowNestedPlansPushRuntime(partitionColumns, partitionComparatorFactories,
orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories,
- frameStartEvalFactories, frameEndEvalFactories, frameExcludeEvalFactories, frameExcludeNegationStartIdx,
- frameExcludeComparatorFactories, frameOffsetEvalFactory, binaryIntegerInspectorFactory, frameMaxObjects,
- projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
- ctx);
+ frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
+ frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
+ binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
new file mode 100644
index 0000000..1d947c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.accessors;
+
+import org.apache.hyracks.data.std.api.IPointable;
+
+/**
+ * A tuple reference implementation that holds fields in a {@link IPointable} array
+ */
+public class PointableTupleReference implements ITupleReference {
+
+ private final IPointable[] fields;
+
+ public PointableTupleReference(IPointable[] fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fields.length;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return getField(fIdx).getByteArray();
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return getField(fIdx).getStartOffset();
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return getField(fIdx).getLength();
+ }
+
+ public IPointable getField(int fIdx) {
+ return fields[fIdx];
+ }
+}
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/3135
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I65bed4092f4fd3622f1525b26ce25e2ac07d7538
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Dmitry Lychagin <[email protected]>