Dmitry Lychagin has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3369 )
Change subject: [NO ISSUE][COMP] Refactor physical window operator
......................................................................
[NO ISSUE][COMP] Refactor physical window operator
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Create a new physical operator (WindowStreamPOperator)
for window operators that do not require partition materialization
- Create AbstractWindowPOperator which is now a base
class for both physical window operators
- Rename WindowSimpleRuntime* to WindowStreamRuntime*
Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
R
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
R
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
13 files changed, 439 insertions(+), 309 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/69/3369/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index b26eaca..69eecfd 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -56,8 +56,10 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
import
org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
@@ -241,19 +243,27 @@
}
@Override
- public WindowPOperator createWindowPOperator(WindowOperator winOp)
throws AlgebricksException {
- boolean partitionMaterialization = winOp.hasNestedPlans() ||
AnalysisUtil.hasFunctionWithProperty(winOp,
-
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
- boolean frameStartIsMonotonic = AnalysisUtil
-
.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
winOp.getFrameValueExpressions());
- boolean frameEndIsMonotonic =
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
- winOp.getFrameValueExpressions());
- boolean nestedTrivialAggregates = winOp.hasNestedPlans()
- &&
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
-
- return new WindowPOperator(winOp.getPartitionVarList(),
partitionMaterialization,
- winOp.getOrderColumnList(), frameStartIsMonotonic,
frameEndIsMonotonic, nestedTrivialAggregates,
-
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ public AbstractWindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException {
+ // No nested plans: stream unless a window function has the MATERIALIZE_PARTITION property
+ if (!winOp.hasNestedPlans()) {
+ boolean materializePartition = AnalysisUtil.hasFunctionWithProperty(winOp,
+ BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
+ if (!materializePartition) {
+ return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
+ }
+ // frame-boundary monotonicity and trivial-nested-aggregate flags are all passed as false here
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
+ context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ }
+ // Nested plans: materializing operator, with frame-monotonicity and trivial-aggregate flags computed
+ boolean startMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
+ winOp.getFrameValueExpressions());
+ boolean endMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
+ winOp.getFrameValueExpressions());
+ boolean trivialNestedAggregates =
+ winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+ return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), startMonotonic,
+ endMonotonic, trivialNestedAggregates, context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
}
}
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 6cba1b1..a2c1c33 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -25,7 +25,6 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
public class OperatorResourcesComputer {
@@ -146,10 +145,12 @@
}
private long getWindowRequiredMemory(WindowOperator op) {
- WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
// memory budget configuration only applies to window operators that
materialize partitions (non-streaming)
// streaming window operators only need 2 frames: output +
(conservative estimate) last frame partition columns
- long memorySize = physOp.isPartitionMaterialization() ?
windowMemorySize : 2 * frameSize;
+ long memorySize = windowMemorySize;
+ if (op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM) {
+ memorySize = 2 * frameSize;
+ }
return getOperatorRequiredMemory(op, memorySize);
}
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index c0fca94..024a13e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -318,7 +318,7 @@
- WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
visitInternal(op, true);
addOutputBuffer(op); // + previous frame
- if (physOp.isPartitionMaterialization()) {
+ // no cast: WindowPOperator is final, so WINDOW_STREAM ops are not WindowPOperators; only WINDOW adds a run frame
+ if (op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW) {
addOutputBuffer(op); // + run frame
}
return null;
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
index e452d03..c12faf5 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan
@@ -11,7 +11,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |PARTITIONED|
+ -- WINDOW_STREAM |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$34(ASC), $$48(ASC)] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
index ab78ecc..a1e04ad 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |LOCAL|
-- ASSIGN |LOCAL|
-- WINDOW |LOCAL|
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
index 5b3d480..b111336 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan
@@ -8,7 +8,7 @@
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- WINDOW |LOCAL|
+ -- WINDOW_STREAM |LOCAL|
-- ONE_TO_ONE_EXCHANGE |LOCAL|
-- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL|
-- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 404a8dc..9c7b1d0 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -78,5 +78,6 @@
UPDATE,
WRITE_RESULT,
INTERSECT,
- WINDOW
+ WINDOW,
+ WINDOW_STREAM
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
new file mode 100644
index 0000000..7065b70
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+/** Base physical operator for window operators: shared child requirements and job generation; subclasses supply the runtime factory. */
+public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator
{
+ // columns the input must be partitioned on; prepended (ASC) to the required local order when not already ordered
+ private final List<LogicalVariable> partitionColumns;
+ // window order columns; they follow the partition columns in the required local order
+ private final List<OrderColumn> orderColumns;
+ // package-private: subclasses are expected to live in this package
+ AbstractWindowPOperator(List<LogicalVariable> partitionColumns,
List<OrderColumn> orderColumns) {
+ this.partitionColumns = partitionColumns;
+ this.orderColumns = orderColumns;
+ }
+ /** Requires input partitioned per execution mode (none for LOCAL) and locally ordered by [partition cols, order cols]. */
+ @Override
+ public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) throws AlgebricksException {
+ IPartitioningProperty pp;
+ switch (op.getExecutionMode()) {
+ case PARTITIONED:
+ pp = new UnorderedPartitionedProperty(new
ListSet<>(partitionColumns),
+ context.getComputationNodeDomain());
+ break;
+ case UNPARTITIONED:
+ pp = IPartitioningProperty.UNPARTITIONED;
+ break;
+ case LOCAL:
+ pp = null;
+ break;
+ default:
+ throw new IllegalStateException(op.getExecutionMode().name());
+ }
+ // an order column that is not a partition column must not precede a partition column (else UNSUPPORTED_WINDOW_SPEC)
+ // require local order property [pc1, ... pcN, oc1, ... ocN]
+ // accounting for cases where there's an overlap between order and
partition columns
+ // TODO replace with required local grouping on partition columns +
local order on order columns
+ List<OrderColumn> lopColumns = new ArrayList<>();
+ ListSet<LogicalVariable> pcVars = new ListSet<>();
+ pcVars.addAll(partitionColumns);
+ for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+ OrderColumn oc = orderColumns.get(oIdx);
+ LogicalVariable ocVar = oc.getColumn();
+ if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1,
pcVars)) {
+ throw new AlgebricksException(ErrorCode.HYRACKS,
ErrorCode.UNSUPPORTED_WINDOW_SPEC,
+ op.getSourceLocation(),
String.valueOf(partitionColumns), String.valueOf(orderColumns));
+ }
+ lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
+ }
+ int pIdx = 0;
+ for (LogicalVariable pColumn : pcVars) {
+ lopColumns.add(pIdx++, new OrderColumn(pColumn,
OrderOperator.IOrder.OrderKind.ASC));
+ }
+ List<ILocalStructuralProperty> localProps =
+ lopColumns.isEmpty() ? null : Collections.singletonList(new
LocalOrderProperty(lopColumns));
+
+ return new PhysicalRequirements(
+ new StructuralPropertiesVector[] { new
StructuralPropertiesVector(pp, localProps) },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+ /** Delivers a clone of the input operator's delivered physical properties. */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+ /** Compiles frame/aggregate expressions and subplans, then contributes the runtime from createRuntimeFactory() plus one input edge. */
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ WindowOperator winOp = (WindowOperator) op;
+
+ int[] partitionColumnsList =
JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
+
+ IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
+ IBinaryComparatorFactory[] partitionComparatorFactories =
+
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns,
opTypeEnv, context);
+
+ //TODO not all functions need order comparators
+ IBinaryComparatorFactory[] orderComparatorFactories =
+
JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv,
context);
+
+ IVariableTypeEnvironment inputTypeEnv =
context.getTypeEnvironment(op.getInputs().get(0).getValue());
+ IExpressionRuntimeProvider exprRuntimeProvider =
context.getExpressionRuntimeProvider();
+ IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider =
context.getBinaryComparatorFactoryProvider();
+
+ List<Mutable<ILogicalExpression>> frameStartExprList =
winOp.getFrameStartExpressions();
+ IScalarEvaluatorFactory[] frameStartExprEvals =
+ createEvaluatorFactories(frameStartExprList, inputSchemas,
inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameStartValidationExprList =
winOp.getFrameStartValidationExpressions();
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals =
createEvaluatorFactories(frameStartValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameEndExprList =
winOp.getFrameEndExpressions();
+ IScalarEvaluatorFactory[] frameEndExprEvals =
+ createEvaluatorFactories(frameEndExprList, inputSchemas,
inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameEndValidationExprList =
winOp.getFrameEndValidationExpressions();
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals =
createEvaluatorFactories(frameEndValidationExprList,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
+
+ List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
frameValueExprList =
+ winOp.getFrameValueExpressions();
+ Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
frameValueExprEvalsAndComparators =
+ createEvaluatorAndComparatorFactories(frameValueExprList,
Pair::getSecond, Pair::getFirst, inputSchemas,
+ inputTypeEnv, exprRuntimeProvider,
binaryComparatorFactoryProvider, context);
+
+ List<Mutable<ILogicalExpression>> frameExcludeExprList =
winOp.getFrameExcludeExpressions();
+ Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
frameExcludeExprEvalsAndComparators =
+ createEvaluatorAndComparatorFactories(frameExcludeExprList, v
-> v, v -> OrderOperator.ASC_ORDER,
+ inputSchemas, inputTypeEnv, exprRuntimeProvider,
binaryComparatorFactoryProvider, context);
+
+ IScalarEvaluatorFactory frameOffsetExprEval = null;
+ ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
+ if (frameOffsetExpr != null) {
+ frameOffsetExprEval =
+
exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv,
inputSchemas, context);
+ }
+
+ int[] projectionColumnsExcludingSubplans =
JobGenHelper.projectAllVariables(opSchema);
+
+ int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema,
winOp.getVariables());
+
+ List<Mutable<ILogicalExpression>> runningAggExprs =
winOp.getExpressions();
+ int runningAggExprCount = runningAggExprs.size();
+ IRunningAggregateEvaluatorFactory[] runningAggFactories =
+ new IRunningAggregateEvaluatorFactory[runningAggExprCount];
+ for (int i = 0; i < runningAggExprCount; i++) {
+ StatefulFunctionCallExpression expr =
(StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
+ runningAggFactories[i] =
exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
+ inputSchemas, context);
+ }
+ // compiling subplans appends to opSchema; the size delta is the nested aggregate output width
+ int nestedAggOutSchemaSize = 0;
+ WindowAggregatorDescriptorFactory nestedAggFactory = null;
+ if (winOp.hasNestedPlans()) {
+ int opSchemaSizePreSubplans = opSchema.getSize();
+ AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0],
winOp, opSchema, context);
+ nestedAggOutSchemaSize = opSchema.getSize() -
opSchemaSizePreSubplans;
+ nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans);
+ nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
+ }
+
+ AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp,
partitionColumnsList,
+ partitionComparatorFactories, orderComparatorFactories,
frameValueExprEvalsAndComparators.first,
+ frameValueExprEvalsAndComparators.second, frameStartExprEvals,
frameStartValidationExprEvals,
+ frameEndExprEvals, frameEndValidationExprEvals,
frameExcludeExprEvalsAndComparators.first,
+ frameExcludeExprEvalsAndComparators.second,
frameOffsetExprEval, projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories,
nestedAggOutSchemaSize, nestedAggFactory, context);
+ runtime.setSourceLocation(winOp.getSourceLocation());
+
+ // contribute one Asterix framewriter
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv,
opSchema, context);
+ builder.contributeMicroOperator(winOp, runtime, recDesc);
+ // and contribute one edge from its child
+ ILogicalOperator src = winOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, winOp, 0);
+ }
+ /** Builds the subclass-specific runtime factory; evaluator/comparator arrays are null when their expression list is empty. */
+ protected abstract AbstractWindowRuntimeFactory
createRuntimeFactory(WindowOperator winOp,
+ int[] partitionColumnsList, IBinaryComparatorFactory[]
partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories,
IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories,
IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals,
IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals,
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories,
IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[]
runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int
nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext
context);
+ /** Window operators are contributed via builder.contributeMicroOperator() in contributeRuntimeOperator(). */
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+ // NOTE(review): also applies to the streaming subclass unless it overrides this — confirm intended
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
+ /** Compiles each expression to an evaluator factory; returns null (not an empty array) for an empty list. */
+ private IScalarEvaluatorFactory[]
createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment
inputTypeEnv,
+ IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext
context) throws AlgebricksException {
+ if (exprList.isEmpty()) {
+ return null;
+ }
+ int ln = exprList.size();
+ IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+ for (int i = 0; i < ln; i++) {
+ ILogicalExpression expr = exprList.get(i).getValue();
+ evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr,
inputTypeEnv, inputSchemas, context);
+ }
+ return evals;
+ }
+ /** Compiles evaluators plus comparators (ascending iff orderGetter yields ASC); returns (null, null) for an empty list. */
+ private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
createEvaluatorAndComparatorFactories(
+ List<T> exprList, Function<T, Mutable<ILogicalExpression>>
exprGetter,
+ Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[]
inputSchemas,
+ IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider
exprRuntimeProvider,
+ IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider,
JobGenContext context)
+ throws AlgebricksException {
+ if (exprList.isEmpty()) {
+ return new Pair<>(null, null);
+ }
+ int ln = exprList.size();
+ IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
+ IBinaryComparatorFactory[] comparators = new
IBinaryComparatorFactory[ln];
+ for (int i = 0; i < ln; i++) {
+ T exprObj = exprList.get(i);
+ ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
+ OrderOperator.IOrder order = orderGetter.apply(exprObj);
+ evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr,
inputTypeEnv, inputSchemas, context);
+ comparators[i] =
binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
+ order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
+ }
+ return new Pair<>(evals, comparators);
+ }
+ /** Returns true if any order column at index >= startIdx refers to a variable in varSet. */
+ private static boolean containsAny(List<OrderColumn> ocList, int startIdx,
Set<LogicalVariable> varSet) {
+ for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
+ if (varSet.contains(ocList.get(i).getColumn())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 8bd4610..23853e8 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -19,42 +19,13 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
-import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import
org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import
org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -63,18 +34,9 @@
import
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory;
-import
org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-public class WindowPOperator extends AbstractPhysicalOperator {
-
- private final List<LogicalVariable> partitionColumns;
-
- private final boolean partitionMaterialization;
-
- private final List<OrderColumn> orderColumns;
+public final class WindowPOperator extends AbstractWindowPOperator {
private final boolean frameStartIsMonotonic;
@@ -85,12 +47,10 @@
// The maximum number of in-memory frames that this operator can use.
private final int memSizeInFrames;
- public WindowPOperator(List<LogicalVariable> partitionColumns, boolean
partitionMaterialization,
- List<OrderColumn> orderColumns, boolean frameStartIsMonotonic,
boolean frameEndIsMonotonic,
- boolean nestedTrivialAggregates, int memSizeInFrames) {
- this.partitionColumns = partitionColumns;
- this.partitionMaterialization = partitionMaterialization;
- this.orderColumns = orderColumns;
+ public WindowPOperator(List<LogicalVariable> partitionColumns,
List<OrderColumn> orderColumns,
+ boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
boolean nestedTrivialAggregates,
+ int memSizeInFrames) {
+ super(partitionColumns, orderColumns);
this.frameStartIsMonotonic = frameStartIsMonotonic;
this.frameEndIsMonotonic = frameEndIsMonotonic;
this.nestedTrivialAggregates = nestedTrivialAggregates;
@@ -103,245 +63,55 @@
}
@Override
- public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) throws AlgebricksException {
- IPartitioningProperty pp;
- switch (op.getExecutionMode()) {
- case PARTITIONED:
- pp = new UnorderedPartitionedProperty(new
ListSet<>(partitionColumns),
- context.getComputationNodeDomain());
- break;
- case UNPARTITIONED:
- pp = IPartitioningProperty.UNPARTITIONED;
- break;
- case LOCAL:
- pp = null;
- break;
- default:
- throw new IllegalStateException(op.getExecutionMode().name());
- }
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator
winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories,
IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories,
IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals,
IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals,
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories,
IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[]
runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int
nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext
context) {
- // require local order property [pc1, ... pcN, oc1, ... ocN]
- // accounting for cases where there's an overlap between order and
partition columns
- // TODO replace with required local grouping on partition columns +
local order on order columns
- List<OrderColumn> lopColumns = new ArrayList<>();
- ListSet<LogicalVariable> pcVars = new ListSet<>();
- pcVars.addAll(partitionColumns);
- for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
- OrderColumn oc = orderColumns.get(oIdx);
- LogicalVariable ocVar = oc.getColumn();
- if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1,
pcVars)) {
- throw new AlgebricksException(ErrorCode.HYRACKS,
ErrorCode.UNSUPPORTED_WINDOW_SPEC,
- op.getSourceLocation(),
String.valueOf(partitionColumns), String.valueOf(orderColumns));
- }
- lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
- }
- int pIdx = 0;
- for (LogicalVariable pColumn : pcVars) {
- lopColumns.add(pIdx++, new OrderColumn(pColumn,
OrderOperator.IOrder.OrderKind.ASC));
- }
- List<ILocalStructuralProperty> localProps =
- lopColumns.isEmpty() ? null : Collections.singletonList(new
LocalOrderProperty(lopColumns));
-
- return new PhysicalRequirements(
- new StructuralPropertiesVector[] { new
StructuralPropertiesVector(pp, localProps) },
- IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
- IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- WindowOperator winOp = (WindowOperator) op;
-
- int[] partitionColumnsList =
JobGenHelper.projectVariables(inputSchemas[0], partitionColumns);
-
- IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op);
- IBinaryComparatorFactory[] partitionComparatorFactories =
-
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns,
opTypeEnv, context);
-
- //TODO not all functions need order comparators
- IBinaryComparatorFactory[] orderComparatorFactories =
-
JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv,
context);
-
- IVariableTypeEnvironment inputTypeEnv =
context.getTypeEnvironment(op.getInputs().get(0).getValue());
- IExpressionRuntimeProvider exprRuntimeProvider =
context.getExpressionRuntimeProvider();
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider =
context.getBinaryComparatorFactoryProvider();
-
- List<Mutable<ILogicalExpression>> frameStartExprList =
winOp.getFrameStartExpressions();
- IScalarEvaluatorFactory[] frameStartExprEvals =
- createEvaluatorFactories(frameStartExprList, inputSchemas,
inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameStartValidationExprList =
winOp.getFrameStartValidationExpressions();
- IScalarEvaluatorFactory[] frameStartValidationExprEvals =
createEvaluatorFactories(frameStartValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndExprList =
winOp.getFrameEndExpressions();
- IScalarEvaluatorFactory[] frameEndExprEvals =
- createEvaluatorFactories(frameEndExprList, inputSchemas,
inputTypeEnv, exprRuntimeProvider, context);
-
- List<Mutable<ILogicalExpression>> frameEndValidationExprList =
winOp.getFrameEndValidationExpressions();
- IScalarEvaluatorFactory[] frameEndValidationExprEvals =
createEvaluatorFactories(frameEndValidationExprList,
- inputSchemas, inputTypeEnv, exprRuntimeProvider, context);
-
- List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
frameValueExprList =
- winOp.getFrameValueExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
frameValueExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameValueExprList,
Pair::getSecond, Pair::getFirst, inputSchemas,
- inputTypeEnv, exprRuntimeProvider,
binaryComparatorFactoryProvider, context);
-
- List<Mutable<ILogicalExpression>> frameExcludeExprList =
winOp.getFrameExcludeExpressions();
- Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
frameExcludeExprEvalsAndComparators =
- createEvaluatorAndComparatorFactories(frameExcludeExprList, v
-> v, v -> OrderOperator.ASC_ORDER,
- inputSchemas, inputTypeEnv, exprRuntimeProvider,
binaryComparatorFactoryProvider, context);
-
- IScalarEvaluatorFactory frameOffsetExprEval = null;
- ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue();
- if (frameOffsetExpr != null) {
- frameOffsetExprEval =
-
exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv,
inputSchemas, context);
- }
-
- int[] projectionColumnsExcludingSubplans =
JobGenHelper.projectAllVariables(opSchema);
-
- int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema,
winOp.getVariables());
-
- List<Mutable<ILogicalExpression>> runningAggExprs =
winOp.getExpressions();
- int runningAggExprCount = runningAggExprs.size();
- IRunningAggregateEvaluatorFactory[] runningAggFactories =
- new IRunningAggregateEvaluatorFactory[runningAggExprCount];
- for (int i = 0; i < runningAggExprCount; i++) {
- StatefulFunctionCallExpression expr =
(StatefulFunctionCallExpression) runningAggExprs.get(i).getValue();
- runningAggFactories[i] =
exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv,
- inputSchemas, context);
- }
-
- AbstractWindowRuntimeFactory runtime = null;
- if (winOp.hasNestedPlans()) {
- int opSchemaSizePreSubplans = opSchema.getSize();
- AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0],
winOp, opSchema, context);
- int aggregatorOutputSchemaSize = opSchema.getSize() -
opSchemaSizePreSubplans;
- WindowAggregatorDescriptorFactory nestedAggFactory = new
WindowAggregatorDescriptorFactory(subplans);
- nestedAggFactory.setSourceLocation(winOp.getSourceLocation());
-
- int frameMaxObjects = winOp.getFrameMaxObjects();
-
- // special cases
- if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty()
&& frameOffsetExpr == null) {
- if (frameEndExprList.isEmpty()) {
- // special case #1: frame == whole partition, no
exclusions, no offset
- runtime = new
WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories,
orderComparatorFactories, frameMaxObjects,
- projectionColumnsExcludingSubplans,
runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory,
memSizeInFrames);
- } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
- // special case #2: accumulating frame from beginning of
the partition, no exclusions, no offset,
- // trivial aggregate subplan ( aggregate
+ nts )
- nestedAggFactory.setPartialOutputEnabled(true);
- runtime = new
WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
- partitionComparatorFactories,
orderComparatorFactories,
- frameValueExprEvalsAndComparators.first,
frameValueExprEvalsAndComparators.second,
- frameEndExprEvals, frameEndValidationExprEvals,
frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(),
projectionColumnsExcludingSubplans,
- runningAggOutColumns, runningAggFactories,
aggregatorOutputSchemaSize, nestedAggFactory,
- memSizeInFrames);
- }
- }
- // default case
- if (runtime == null) {
- runtime = new
WindowNestedPlansRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
- orderComparatorFactories,
frameValueExprEvalsAndComparators.first,
- frameValueExprEvalsAndComparators.second,
frameStartExprEvals, frameStartValidationExprEvals,
- frameStartIsMonotonic, frameEndExprEvals,
frameEndValidationExprEvals,
- frameExcludeExprEvalsAndComparators.first,
winOp.getFrameExcludeNegationStartIdx(),
- frameExcludeExprEvalsAndComparators.second,
frameOffsetExprEval, frameMaxObjects,
- context.getBinaryBooleanInspectorFactory(),
context.getBinaryIntegerInspectorFactory(),
- projectionColumnsExcludingSubplans,
runningAggOutColumns, runningAggFactories,
- aggregatorOutputSchemaSize, nestedAggFactory,
memSizeInFrames);
- }
- } else if (partitionMaterialization) {
- runtime = new
WindowMaterializingRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
+ // special cases
+ if (!winOp.hasNestedPlans()) {
+ return new WindowMaterializingRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns,
runningAggFactories, memSizeInFrames);
- } else {
- runtime = new WindowSimpleRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
- orderComparatorFactories,
projectionColumnsExcludingSubplans, runningAggOutColumns,
- runningAggFactories);
}
- runtime.setSourceLocation(winOp.getSourceLocation());
- // contribute one Asterix framewriter
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv,
opSchema, context);
- builder.contributeMicroOperator(winOp, runtime, recDesc);
- // and contribute one edge from its child
- ILogicalOperator src = winOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, winOp, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return true;
- }
-
- public boolean isPartitionMaterialization() {
- return partitionMaterialization;
- }
-
- private IScalarEvaluatorFactory[]
createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment
inputTypeEnv,
- IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext
context) throws AlgebricksException {
- if (exprList.isEmpty()) {
- return null;
- }
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- ILogicalExpression expr = exprList.get(i).getValue();
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr,
inputTypeEnv, inputSchemas, context);
- }
- return evals;
- }
-
- private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]>
createEvaluatorAndComparatorFactories(
- List<T> exprList, Function<T, Mutable<ILogicalExpression>>
exprGetter,
- Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[]
inputSchemas,
- IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider
exprRuntimeProvider,
- IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider,
JobGenContext context)
- throws AlgebricksException {
- if (exprList.isEmpty()) {
- return new Pair<>(null, null);
- }
- int ln = exprList.size();
- IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln];
- IBinaryComparatorFactory[] comparators = new
IBinaryComparatorFactory[ln];
- for (int i = 0; i < ln; i++) {
- T exprObj = exprList.get(i);
- ILogicalExpression expr = exprGetter.apply(exprObj).getValue();
- OrderOperator.IOrder order = orderGetter.apply(exprObj);
- evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr,
inputTypeEnv, inputSchemas, context);
- comparators[i] =
binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr),
- order.getKind() == OrderOperator.IOrder.OrderKind.ASC);
- }
- return new Pair<>(evals, comparators);
- }
-
- private boolean containsAny(List<OrderColumn> ocList, int startIdx,
Set<LogicalVariable> varSet) {
- for (int i = startIdx, ln = ocList.size(); i < ln; i++) {
- if (varSet.contains(ocList.get(i).getColumn())) {
- return true;
+ boolean hasFrameStart = frameStartExprEvals != null &&
frameStartExprEvals.length > 0;
+ boolean hasFrameEnd = frameEndExprEvals != null &&
frameEndExprEvals.length > 0;
+ boolean hasFrameExclude = frameExcludeExprEvals != null &&
frameExcludeExprEvals.length > 0;
+ boolean hasFrameOffset = frameOffsetExprEval != null;
+ if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) {
+ if (!hasFrameEnd) {
+ // special case #1: frame == whole partition, no exclusions,
no offset
+ return new
WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
+ orderComparatorFactories, winOp.getFrameMaxObjects(),
projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories,
nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
+ } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
+ // special case #2: accumulating frame from beginning of the
partition, no exclusions, no offset,
+ // trivial aggregate subplan ( aggregate +
nts )
+ nestedAggFactory.setPartialOutputEnabled(true);
+ return new
WindowNestedPlansRunningRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals,
frameValueComparatorFactories, frameEndExprEvals,
+ frameEndValidationExprEvals,
winOp.getFrameMaxObjects(),
+ context.getBinaryBooleanInspectorFactory(),
projectionColumnsExcludingSubplans,
+ runningAggOutColumns, runningAggFactories,
nestedAggOutSchemaSize, nestedAggFactory,
+ memSizeInFrames);
}
}
- return false;
+
+ // default case
+ return new WindowNestedPlansRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
+ orderComparatorFactories, frameValueExprEvals,
frameValueComparatorFactories, frameStartExprEvals,
+ frameStartValidationExprEvals, frameStartIsMonotonic,
frameEndExprEvals, frameEndValidationExprEvals,
+ frameExcludeExprEvals,
winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories,
+ frameOffsetExprEval, winOp.getFrameMaxObjects(),
context.getBinaryBooleanInspectorFactory(),
+ context.getBinaryIntegerInspectorFactory(),
projectionColumnsExcludingSubplans, runningAggOutColumns,
+ runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory,
memSizeInFrames);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
new file mode 100644
index 0000000..33b47ec
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+/**
+ * Physical window operator that evaluates running aggregates in a streaming fashion,
+ * i.e. without partition materialization.
+ * <p>
+ * Unlike {@link WindowPOperator}, its constructor takes only partition and order columns (no frame
+ * monotonicity flags, no nested-aggregate flag, no memory budget), and it always produces a
+ * {@link WindowStreamRuntimeFactory}.
+ */
+public final class WindowStreamPOperator extends AbstractWindowPOperator {
+
+ /**
+ * @param partitionColumns window partition-by variables, passed unchanged to the base class
+ * @param orderColumns window order-by columns, passed unchanged to the base class
+ */
+ public WindowStreamPOperator(List<LogicalVariable> partitionColumns,
List<OrderColumn> orderColumns) {
+ super(partitionColumns, orderColumns);
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.WINDOW_STREAM;
+ }
+
+ /**
+ * Creates a {@link WindowStreamRuntimeFactory}. Only these arguments are used:
+ * <ul>
+ * <li>the partition columns and partition comparators;</li>
+ * <li>the order comparators;</li>
+ * <li>the projection columns;</li>
+ * <li>the running-aggregate output columns and factories.</li>
+ * </ul>
+ * All frame-related arguments, {@code nestedAggOutSchemaSize}, {@code nestedAggFactory} and
+ * {@code context} are ignored.
+ * NOTE(review): this presumes the operator is only selected for window operators without frame
+ * specifications or nested plans; TODO confirm against the rule that chooses this operator.
+ */
+ @Override
+ protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator
winOp, int[] partitionColumnsList,
+ IBinaryComparatorFactory[] partitionComparatorFactories,
+ IBinaryComparatorFactory[] orderComparatorFactories,
IScalarEvaluatorFactory[] frameValueExprEvals,
+ IBinaryComparatorFactory[] frameValueComparatorFactories,
IScalarEvaluatorFactory[] frameStartExprEvals,
+ IScalarEvaluatorFactory[] frameStartValidationExprEvals,
IScalarEvaluatorFactory[] frameEndExprEvals,
+ IScalarEvaluatorFactory[] frameEndValidationExprEvals,
IScalarEvaluatorFactory[] frameExcludeExprEvals,
+ IBinaryComparatorFactory[] frameExcludeComparatorFactories,
IScalarEvaluatorFactory frameOffsetExprEval,
+ int[] projectionColumnsExcludingSubplans, int[]
runningAggOutColumns,
+ IRunningAggregateEvaluatorFactory[] runningAggFactories, int
nestedAggOutSchemaSize,
+ WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext
context) {
+ return new WindowStreamRuntimeFactory(partitionColumnsList,
partitionComparatorFactories,
+ orderComparatorFactories, projectionColumnsExcludingSubplans,
runningAggOutColumns,
+ runningAggFactories);
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 49e5a0b..3a99458 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -79,6 +79,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
@@ -468,8 +469,8 @@
return createWindowPOperator(op);
}
- protected WindowPOperator createWindowPOperator(WindowOperator op)
throws AlgebricksException {
- return new WindowPOperator(op.getPartitionVarList(), true,
op.getOrderColumnList(), false, false, false,
+ /**
+ * Creates the physical operator for the given logical window operator.
+ * <p>
+ * This implementation always returns a {@link WindowPOperator} with:
+ * <ul>
+ * <li>frame-start monotonicity, frame-end monotonicity and trivial nested aggregates all set to
+ * {@code false};</li>
+ * <li>the memory budget taken from {@code getMaxFramesForWindow()}.</li>
+ * </ul>
+ * The declared return type is {@link AbstractWindowPOperator}, presumably so that overriding
+ * rules can return another window variant; TODO confirm.
+ */
+ protected AbstractWindowPOperator createWindowPOperator(WindowOperator
op) throws AlgebricksException {
+ return new WindowPOperator(op.getPartitionVarList(),
op.getOrderColumnList(), false, false, false,
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
similarity index 94%
rename from
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
rename to
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
index f7f1a25..d23d4e7 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java
@@ -30,9 +30,9 @@
/**
* Runtime for window operators that evaluates running aggregates without
partition materialization.
*/
-class WindowSimplePushRuntime extends AbstractWindowPushRuntime {
+class WindowStreamPushRuntime extends AbstractWindowPushRuntime {
- WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[]
partitionComparatorFactories,
+ WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[]
partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[]
projectionColumns, int[] runningAggOutColumns,
IRunningAggregateEvaluatorFactory[] runningAggFactories,
IHyracksTaskContext ctx,
SourceLocation sourceLoc) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
similarity index 82%
rename from
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
rename to
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
index 2d1cdde..be368a9 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java
@@ -27,13 +27,14 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
/**
- * Runtime factory for window operators that evaluates running aggregates
without partition materialization.
+ * Runtime factory for window operators that evaluate running aggregates in a
streaming fashion
+ * (without partition materialization).
*/
-public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory {
+public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory {
private static final long serialVersionUID = 1L;
- public WindowSimpleRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
+ public WindowStreamRuntimeFactory(int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories,
IBinaryComparatorFactory[] orderComparatorFactories, int[]
projectionColumnsExcludingSubplans,
int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[]
runningAggFactories) {
super(partitionColumns, partitionComparatorFactories,
orderComparatorFactories,
@@ -42,13 +43,13 @@
+ /**
+ * Creates a {@link WindowStreamPushRuntime} for the given task context. It is built from this
+ * factory's partition columns and comparators, order comparators, projection list, running
+ * aggregates and source location.
+ */
@Override
public AbstractOneInputOneOutputOneFramePushRuntime
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
- return new WindowSimplePushRuntime(partitionColumns,
partitionComparatorFactories, orderComparatorFactories,
+ return new WindowStreamPushRuntime(partitionColumns,
partitionComparatorFactories, orderComparatorFactories,
projectionList, runningAggOutColumns, runningAggFactories,
ctx, sourceLoc);
}
+ /**
+ * Returns a description of the form
+ * {@code window-stream (<partition columns>) <running agg output columns> := <running agg factories>}.
+ */
@Override
public String toString() {
- return "window (" + Arrays.toString(partitionColumns) + ") " +
Arrays.toString(runningAggOutColumns) + " := "
- + Arrays.toString(runningAggFactories);
+ return "window-stream (" + Arrays.toString(partitionColumns) + ") " +
Arrays.toString(runningAggOutColumns)
+ + " := " + Arrays.toString(runningAggFactories);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3369
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e
Gerrit-Change-Number: 3369
Gerrit-PatchSet: 1
Gerrit-Owner: Dmitry Lychagin <[email protected]>