Dmitry Lychagin has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3369 )
Change subject: [NO ISSUE][COMP] Refactor physical window operator ...................................................................... [NO ISSUE][COMP] Refactor physical window operator - user model changes: no - storage format changes: no - interface changes: no Details: - Create a new physical operator (WindowStreamPOperator) for window operators that do not require partition materialization - Create AbstractWindowPOperator which is now a base class for both physical window operators - Rename WindowSimpleRuntime* to WindowStreamRuntime* Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e Reviewed-on: https://asterix-gerrit.ics.uci.edu/3369 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java M asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java 13 files changed, 439 insertions(+), 309 deletions(-) Approvals: Jenkins: Verified; Verified Anon. E. Moose (1000171): Ali Alsuliman: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index b26eaca..69eecfd 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -56,8 +56,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowStreamPOperator; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule; @@ -241,19 +243,24 @@ } @Override - public WindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException { - boolean partitionMaterialization = winOp.hasNestedPlans() || AnalysisUtil.hasFunctionWithProperty(winOp, - BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION); - boolean frameStartIsMonotonic = AnalysisUtil - .isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions()); - boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(), - winOp.getFrameValueExpressions()); - boolean nestedTrivialAggregates = winOp.hasNestedPlans() - && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan); - - return new WindowPOperator(winOp.getPartitionVarList(), partitionMaterialization, - winOp.getOrderColumnList(), frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates, - context.getPhysicalOptimizationConfig().getMaxFramesForWindow()); + public AbstractWindowPOperator createWindowPOperator(WindowOperator winOp) throws AlgebricksException { + if (winOp.hasNestedPlans()) { + boolean frameStartIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic( + winOp.getFrameStartExpressions(), winOp.getFrameValueExpressions()); + boolean frameEndIsMonotonic = AnalysisUtil.isWindowFrameBoundaryMonotonic( + winOp.getFrameEndExpressions(), winOp.getFrameValueExpressions()); + boolean nestedTrivialAggregates = + winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan); + return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), + frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates, + context.getPhysicalOptimizationConfig().getMaxFramesForWindow()); + } else if (AnalysisUtil.hasFunctionWithProperty(winOp, + BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) { + return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false, + context.getPhysicalOptimizationConfig().getMaxFramesForWindow()); + } else { + return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList()); + } } } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java index 6cba1b1..a2c1c33 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java @@ -25,7 +25,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator; public class OperatorResourcesComputer { @@ -146,10 +145,10 @@ } private long getWindowRequiredMemory(WindowOperator op) { - WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator(); // memory budget configuration only applies to window operators that materialize partitions (non-streaming) // streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns - long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize; + long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize + : windowMemorySize; return getOperatorRequiredMemory(op, memorySize); } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java index c0fca94..024a13e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java @@ -318,7 +318,7 @@ WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator(); visitInternal(op, true); addOutputBuffer(op); // + previous frame - if (physOp.isPartitionMaterialization()) { + if (physOp.getOperatorTag() == PhysicalOperatorTag.WINDOW) { addOutputBuffer(op); // + run frame } return null; diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan index e452d03..c12faf5 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_misc/win_misc_02.plan @@ -11,7 +11,7 @@ -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- WINDOW |PARTITIONED| + -- WINDOW_STREAM |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STABLE_SORT [$$34(ASC), $$48(ASC)] |PARTITIONED| -- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan index ab78ecc..a1e04ad 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_6.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |LOCAL| -- ASSIGN |LOCAL| -- WINDOW |LOCAL| - -- WINDOW |LOCAL| + -- WINDOW_STREAM |LOCAL| -- ONE_TO_ONE_EXCHANGE |LOCAL| -- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL| -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan index 5b3d480..b111336 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/window/win_opt_01/win_opt_01_7.plan @@ -8,7 +8,7 @@ -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- WINDOW |LOCAL| + -- WINDOW_STREAM |LOCAL| -- ONE_TO_ONE_EXCHANGE |LOCAL| -- STABLE_SORT [$$m(ASC), $$t(ASC)] |LOCAL| -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED| diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index 8e1f77f..84d19c1 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -78,5 +78,6 @@ UPDATE, WRITE_RESULT, INTERSECT, - WINDOW + WINDOW, + WINDOW_STREAM } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java new file mode 100644 index 0000000..7065b70 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; + +public abstract class AbstractWindowPOperator extends AbstractPhysicalOperator { + + private final List<LogicalVariable> partitionColumns; + + private final List<OrderColumn> orderColumns; + + AbstractWindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) { + this.partitionColumns = partitionColumns; + this.orderColumns = orderColumns; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException { + IPartitioningProperty pp; + switch (op.getExecutionMode()) { + case PARTITIONED: + pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns), + context.getComputationNodeDomain()); + break; + case UNPARTITIONED: + pp = IPartitioningProperty.UNPARTITIONED; + break; + case LOCAL: + pp = null; + break; + default: + throw new IllegalStateException(op.getExecutionMode().name()); + } + + // require local order property [pc1, ... pcN, oc1, ... ocN] + // accounting for cases where there's an overlap between order and partition columns + // TODO replace with required local grouping on partition columns + local order on order columns + List<OrderColumn> lopColumns = new ArrayList<>(); + ListSet<LogicalVariable> pcVars = new ListSet<>(); + pcVars.addAll(partitionColumns); + for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) { + OrderColumn oc = orderColumns.get(oIdx); + LogicalVariable ocVar = oc.getColumn(); + if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) { + throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC, + op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns)); + } + lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder())); + } + int pIdx = 0; + for (LogicalVariable pColumn : pcVars) { + lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC)); + } + List<ILocalStructuralProperty> localProps = + lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns)); + + return new PhysicalRequirements( + new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) }, + IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + WindowOperator winOp = (WindowOperator) op; + + int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns); + + IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op); + IBinaryComparatorFactory[] partitionComparatorFactories = + JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context); + + //TODO not all functions need order comparators + IBinaryComparatorFactory[] orderComparatorFactories = + JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context); + + IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue()); + IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider(); + IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider(); + + List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions(); + IScalarEvaluatorFactory[] frameStartExprEvals = + createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions(); + IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList, + inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions(); + IScalarEvaluatorFactory[] frameEndExprEvals = + createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions(); + IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList, + inputSchemas, inputTypeEnv, exprRuntimeProvider, context); + + List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList = + winOp.getFrameValueExpressions(); + Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators = + createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas, + inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context); + + List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions(); + Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators = + createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER, + inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context); + + IScalarEvaluatorFactory frameOffsetExprEval = null; + ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue(); + if (frameOffsetExpr != null) { + frameOffsetExprEval = + exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context); + } + + int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema); + + int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); + + List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions(); + int runningAggExprCount = runningAggExprs.size(); + IRunningAggregateEvaluatorFactory[] runningAggFactories = + new IRunningAggregateEvaluatorFactory[runningAggExprCount]; + for (int i = 0; i < runningAggExprCount; i++) { + StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue(); + runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv, + inputSchemas, context); + } + + int nestedAggOutSchemaSize = 0; + WindowAggregatorDescriptorFactory nestedAggFactory = null; + if (winOp.hasNestedPlans()) { + int opSchemaSizePreSubplans = opSchema.getSize(); + AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context); + nestedAggOutSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans; + nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans); + nestedAggFactory.setSourceLocation(winOp.getSourceLocation()); + } + + AbstractWindowRuntimeFactory runtime = createRuntimeFactory(winOp, partitionColumnsList, + partitionComparatorFactories, orderComparatorFactories, frameValueExprEvalsAndComparators.first, + frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals, + frameEndExprEvals, frameEndValidationExprEvals, frameExcludeExprEvalsAndComparators.first, + frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, projectionColumnsExcludingSubplans, + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, context); + runtime.setSourceLocation(winOp.getSourceLocation()); + + // contribute one Asterix framewriter + RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context); + builder.contributeMicroOperator(winOp, runtime, recDesc); + // and contribute one edge from its child + ILogicalOperator src = winOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, winOp, 0); + } + + protected abstract AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, + int[] partitionColumnsList, IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals, + IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals, + IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals, + IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval, + int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context); + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public boolean expensiveThanMaterialization() { + return true; + } + + private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList, + IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv, + IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException { + if (exprList.isEmpty()) { + return null; + } + int ln = exprList.size(); + IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; + for (int i = 0; i < ln; i++) { + ILogicalExpression expr = exprList.get(i).getValue(); + evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); + } + return evals; + } + + private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories( + List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter, + Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas, + IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider, + IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context) + throws AlgebricksException { + if (exprList.isEmpty()) { + return new Pair<>(null, null); + } + int ln = exprList.size(); + IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; + IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln]; + for (int i = 0; i < ln; i++) { + T exprObj = exprList.get(i); + ILogicalExpression expr = exprGetter.apply(exprObj).getValue(); + OrderOperator.IOrder order = orderGetter.apply(exprObj); + evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); + comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr), + order.getKind() == OrderOperator.IOrder.OrderKind.ASC); + } + return new Pair<>(evals, comparators); + } + + private static boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) { + for (int i = startIdx, ln = ocList.size(); i < ln; i++) { + if (varSet.contains(ocList.get(i).getColumn())) { + return true; + } + } + return false; + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java index 8bd4610..23853e8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java @@ -19,42 +19,13 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.function.Function; -import org.apache.commons.lang3.mutable.Mutable; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.ListSet; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; -import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; -import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; -import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; @@ -63,18 +34,9 @@ import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRunningRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.win.WindowNestedPlansUnboundedRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.win.WindowSimpleRuntimeFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.ErrorCode; -public class WindowPOperator extends AbstractPhysicalOperator { - - private final List<LogicalVariable> partitionColumns; - - private final boolean partitionMaterialization; - - private final List<OrderColumn> orderColumns; +public final class WindowPOperator extends AbstractWindowPOperator { private final boolean frameStartIsMonotonic; @@ -85,12 +47,10 @@ // The maximum number of in-memory frames that this operator can use. private final int memSizeInFrames; - public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization, - List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, - boolean nestedTrivialAggregates, int memSizeInFrames) { - this.partitionColumns = partitionColumns; - this.partitionMaterialization = partitionMaterialization; - this.orderColumns = orderColumns; + public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns, + boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates, + int memSizeInFrames) { + super(partitionColumns, orderColumns); this.frameStartIsMonotonic = frameStartIsMonotonic; this.frameEndIsMonotonic = frameEndIsMonotonic; this.nestedTrivialAggregates = nestedTrivialAggregates; @@ -103,245 +63,55 @@ } @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException { - IPartitioningProperty pp; - switch (op.getExecutionMode()) { - case PARTITIONED: - pp = new UnorderedPartitionedProperty(new ListSet<>(partitionColumns), - context.getComputationNodeDomain()); - break; - case UNPARTITIONED: - pp = IPartitioningProperty.UNPARTITIONED; - break; - case LOCAL: - pp = null; - break; - default: - throw new IllegalStateException(op.getExecutionMode().name()); - } + protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals, + IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals, + IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals, + IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval, + int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) { - // require local order property [pc1, ... pcN, oc1, ... ocN] - // accounting for cases where there's an overlap between order and partition columns - // TODO replace with required local grouping on partition columns + local order on order columns - List<OrderColumn> lopColumns = new ArrayList<>(); - ListSet<LogicalVariable> pcVars = new ListSet<>(); - pcVars.addAll(partitionColumns); - for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) { - OrderColumn oc = orderColumns.get(oIdx); - LogicalVariable ocVar = oc.getColumn(); - if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) { - throw new AlgebricksException(ErrorCode.HYRACKS, ErrorCode.UNSUPPORTED_WINDOW_SPEC, - op.getSourceLocation(), String.valueOf(partitionColumns), String.valueOf(orderColumns)); - } - lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder())); - } - int pIdx = 0; - for (LogicalVariable pColumn : pcVars) { - lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC)); - } - List<ILocalStructuralProperty> localProps = - lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns)); - - return new PhysicalRequirements( - new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) }, - IPartitioningRequirementsCoordinator.NO_COORDINATION); - } - - @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); - } - - @Override - public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, - IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { - WindowOperator winOp = (WindowOperator) op; - - int[] partitionColumnsList = JobGenHelper.projectVariables(inputSchemas[0], partitionColumns); - - IVariableTypeEnvironment opTypeEnv = context.getTypeEnvironment(op); - IBinaryComparatorFactory[] partitionComparatorFactories = - JobGenHelper.variablesToAscBinaryComparatorFactories(partitionColumns, opTypeEnv, context); - - //TODO not all functions need order comparators - IBinaryComparatorFactory[] orderComparatorFactories = - JobGenHelper.variablesToBinaryComparatorFactories(orderColumns, opTypeEnv, context); - - IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue()); - IExpressionRuntimeProvider exprRuntimeProvider = context.getExpressionRuntimeProvider(); - IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider = context.getBinaryComparatorFactoryProvider(); - - List<Mutable<ILogicalExpression>> frameStartExprList = winOp.getFrameStartExpressions(); - IScalarEvaluatorFactory[] frameStartExprEvals = - createEvaluatorFactories(frameStartExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context); - - List<Mutable<ILogicalExpression>> frameStartValidationExprList = winOp.getFrameStartValidationExpressions(); - IScalarEvaluatorFactory[] frameStartValidationExprEvals = createEvaluatorFactories(frameStartValidationExprList, - inputSchemas, inputTypeEnv, exprRuntimeProvider, context); - - List<Mutable<ILogicalExpression>> frameEndExprList = winOp.getFrameEndExpressions(); - IScalarEvaluatorFactory[] frameEndExprEvals = - createEvaluatorFactories(frameEndExprList, inputSchemas, inputTypeEnv, exprRuntimeProvider, context); - - List<Mutable<ILogicalExpression>> frameEndValidationExprList = winOp.getFrameEndValidationExpressions(); - IScalarEvaluatorFactory[] frameEndValidationExprEvals = createEvaluatorFactories(frameEndValidationExprList, - inputSchemas, inputTypeEnv, exprRuntimeProvider, context); - - List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> frameValueExprList = - winOp.getFrameValueExpressions(); - Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameValueExprEvalsAndComparators = - createEvaluatorAndComparatorFactories(frameValueExprList, Pair::getSecond, Pair::getFirst, inputSchemas, - inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context); - - List<Mutable<ILogicalExpression>> frameExcludeExprList = winOp.getFrameExcludeExpressions(); - Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> frameExcludeExprEvalsAndComparators = - createEvaluatorAndComparatorFactories(frameExcludeExprList, v -> v, v -> OrderOperator.ASC_ORDER, - inputSchemas, inputTypeEnv, exprRuntimeProvider, binaryComparatorFactoryProvider, context); - - IScalarEvaluatorFactory frameOffsetExprEval = null; - ILogicalExpression frameOffsetExpr = winOp.getFrameOffset().getValue(); - if (frameOffsetExpr != null) { - frameOffsetExprEval = - exprRuntimeProvider.createEvaluatorFactory(frameOffsetExpr, inputTypeEnv, inputSchemas, context); - } - - int[] projectionColumnsExcludingSubplans = JobGenHelper.projectAllVariables(opSchema); - - int[] runningAggOutColumns = JobGenHelper.projectVariables(opSchema, winOp.getVariables()); - - List<Mutable<ILogicalExpression>> runningAggExprs = winOp.getExpressions(); - int runningAggExprCount = runningAggExprs.size(); - IRunningAggregateEvaluatorFactory[] runningAggFactories = - new IRunningAggregateEvaluatorFactory[runningAggExprCount]; - for (int i = 0; i < runningAggExprCount; i++) { - StatefulFunctionCallExpression expr = (StatefulFunctionCallExpression) runningAggExprs.get(i).getValue(); - runningAggFactories[i] = exprRuntimeProvider.createRunningAggregateFunctionFactory(expr, inputTypeEnv, - inputSchemas, context); - } - - AbstractWindowRuntimeFactory runtime = null; - if (winOp.hasNestedPlans()) { - int opSchemaSizePreSubplans = opSchema.getSize(); - AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], winOp, opSchema, context); - int aggregatorOutputSchemaSize = opSchema.getSize() - opSchemaSizePreSubplans; - WindowAggregatorDescriptorFactory nestedAggFactory = new WindowAggregatorDescriptorFactory(subplans); - nestedAggFactory.setSourceLocation(winOp.getSourceLocation()); - - int frameMaxObjects = winOp.getFrameMaxObjects(); - - // special cases - if (frameStartExprList.isEmpty() && frameExcludeExprList.isEmpty() && frameOffsetExpr == null) { - if (frameEndExprList.isEmpty()) { - // special case #1: frame == whole partition, no exclusions, no offset - runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, - partitionComparatorFactories, orderComparatorFactories, frameMaxObjects, - projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, - aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames); - } else if (frameEndIsMonotonic && nestedTrivialAggregates) { - // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset, - // trivial aggregate subplan ( aggregate + nts ) - nestedAggFactory.setPartialOutputEnabled(true); - runtime = new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, - partitionComparatorFactories, orderComparatorFactories, - frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second, - frameEndExprEvals, frameEndValidationExprEvals, frameMaxObjects, - context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans, - runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory, - memSizeInFrames); - } - } - // default case - if (runtime == null) { - runtime = new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories, - orderComparatorFactories, frameValueExprEvalsAndComparators.first, - frameValueExprEvalsAndComparators.second, frameStartExprEvals, frameStartValidationExprEvals, - frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals, - frameExcludeExprEvalsAndComparators.first, winOp.getFrameExcludeNegationStartIdx(), - frameExcludeExprEvalsAndComparators.second, frameOffsetExprEval, frameMaxObjects, - context.getBinaryBooleanInspectorFactory(), context.getBinaryIntegerInspectorFactory(), - projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, - aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames); - } - } else if (partitionMaterialization) { - runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + // special cases + if (!winOp.hasNestedPlans()) { + return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories, orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, memSizeInFrames); - } else { - runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories, - orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns, - runningAggFactories); } - runtime.setSourceLocation(winOp.getSourceLocation()); - // contribute one Asterix framewriter - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(opTypeEnv, opSchema, context); - builder.contributeMicroOperator(winOp, runtime, recDesc); - // and contribute one edge from its child - ILogicalOperator src = winOp.getInputs().get(0).getValue(); - builder.contributeGraphEdge(src, 0, winOp, 0); - } - - @Override - public boolean isMicroOperator() { - return true; - } - - @Override - public boolean expensiveThanMaterialization() { - return true; - } - - public boolean isPartitionMaterialization() { - return partitionMaterialization; - } - - private IScalarEvaluatorFactory[] createEvaluatorFactories(List<Mutable<ILogicalExpression>> exprList, - IOperatorSchema[] inputSchemas, IVariableTypeEnvironment inputTypeEnv, - IExpressionRuntimeProvider exprRuntimeProvider, JobGenContext context) throws AlgebricksException { - if (exprList.isEmpty()) { - return null; - } - int ln = exprList.size(); - IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; - for (int i = 0; i < ln; i++) { - ILogicalExpression expr = exprList.get(i).getValue(); - evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); - } - return evals; - } - - private <T> Pair<IScalarEvaluatorFactory[], IBinaryComparatorFactory[]> createEvaluatorAndComparatorFactories( - List<T> exprList, Function<T, Mutable<ILogicalExpression>> exprGetter, - Function<T, OrderOperator.IOrder> orderGetter, IOperatorSchema[] inputSchemas, - IVariableTypeEnvironment inputTypeEnv, IExpressionRuntimeProvider exprRuntimeProvider, - IBinaryComparatorFactoryProvider binaryComparatorFactoryProvider, JobGenContext context) - throws AlgebricksException { - if (exprList.isEmpty()) { - return new Pair<>(null, null); - } - int ln = exprList.size(); - IScalarEvaluatorFactory[] evals = new IScalarEvaluatorFactory[ln]; - IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[ln]; - for (int i = 0; i < ln; i++) { - T exprObj = exprList.get(i); - ILogicalExpression expr = exprGetter.apply(exprObj).getValue(); - OrderOperator.IOrder order = orderGetter.apply(exprObj); - evals[i] = exprRuntimeProvider.createEvaluatorFactory(expr, inputTypeEnv, inputSchemas, context); - comparators[i] = binaryComparatorFactoryProvider.getBinaryComparatorFactory(inputTypeEnv.getType(expr), - order.getKind() == OrderOperator.IOrder.OrderKind.ASC); - } - return new Pair<>(evals, comparators); - } - - private boolean containsAny(List<OrderColumn> ocList, int startIdx, Set<LogicalVariable> varSet) { - for (int i = startIdx, ln = ocList.size(); i < ln; i++) { - if (varSet.contains(ocList.get(i).getColumn())) { - return true; + boolean hasFrameStart = frameStartExprEvals != null && frameStartExprEvals.length > 0; + boolean hasFrameEnd = frameEndExprEvals != null && frameEndExprEvals.length > 0; + boolean hasFrameExclude = frameExcludeExprEvals != null && frameExcludeExprEvals.length > 0; + boolean hasFrameOffset = frameOffsetExprEval != null; + if (!hasFrameStart && !hasFrameExclude && !hasFrameOffset) { + if (!hasFrameEnd) { + // special case #1: frame == whole partition, no exclusions, no offset + return new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, winOp.getFrameMaxObjects(), projectionColumnsExcludingSubplans, + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, + memSizeInFrames); + } else if (frameEndIsMonotonic && nestedTrivialAggregates) { + // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset, + // trivial aggregate subplan ( aggregate + nts ) + nestedAggFactory.setPartialOutputEnabled(true); + return new WindowNestedPlansRunningRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameEndExprEvals, + frameEndValidationExprEvals, winOp.getFrameMaxObjects(), + context.getBinaryBooleanInspectorFactory(), projectionColumnsExcludingSubplans, + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, + memSizeInFrames); } } - return false; + + // default case + return new WindowNestedPlansRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, frameValueExprEvals, frameValueComparatorFactories, frameStartExprEvals, + frameStartValidationExprEvals, frameStartIsMonotonic, frameEndExprEvals, frameEndValidationExprEvals, + frameExcludeExprEvals, winOp.getFrameExcludeNegationStartIdx(), frameExcludeComparatorFactories, + frameOffsetExprEval, winOp.getFrameMaxObjects(), context.getBinaryBooleanInspectorFactory(), + context.getBinaryIntegerInspectorFactory(), projectionColumnsExcludingSubplans, runningAggOutColumns, + runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, memSizeInFrames); } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java new file mode 100644 index 0000000..33b47ec --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.List; + +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowAggregatorDescriptorFactory; +import org.apache.hyracks.algebricks.runtime.operators.win.WindowStreamRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; + +public final class WindowStreamPOperator extends AbstractWindowPOperator { + + public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) { + super(partitionColumns, orderColumns); + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.WINDOW_STREAM; + } + + @Override + protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList, + IBinaryComparatorFactory[] partitionComparatorFactories, + IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals, + IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartExprEvals, + IScalarEvaluatorFactory[] frameStartValidationExprEvals, IScalarEvaluatorFactory[] frameEndExprEvals, + IScalarEvaluatorFactory[] frameEndValidationExprEvals, IScalarEvaluatorFactory[] frameExcludeExprEvals, + IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetExprEval, + int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, + IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, + WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) { + return new WindowStreamRuntimeFactory(partitionColumnsList, partitionComparatorFactories, + orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns, + runningAggFactories); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index f127898..e6cdc28 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -79,6 +79,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator; @@ -468,8 +469,8 @@ return createWindowPOperator(op); } - protected WindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException { - return new WindowPOperator(op.getPartitionVarList(), true, op.getOrderColumnList(), false, false, false, + protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException { + return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false, context.getPhysicalOptimizationConfig().getMaxFramesForWindow()); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java similarity index 94% rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java index f7f1a25..d23d4e7 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamPushRuntime.java @@ -30,9 +30,9 @@ /** * Runtime for window operators that evaluates running aggregates without partition materialization. */ -class WindowSimplePushRuntime extends AbstractWindowPushRuntime { +class WindowStreamPushRuntime extends AbstractWindowPushRuntime { - WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + WindowStreamPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx, SourceLocation sourceLoc) { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java similarity index 82% rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java index 2d1cdde..be368a9 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowStreamRuntimeFactory.java @@ -27,13 +27,14 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; /** - * Runtime factory for window operators that evaluates running aggregates without partition materialization. + * Runtime factory for window operators that evaluates running aggregates in a streaming fashion + * (without partition materialization). */ -public class WindowSimpleRuntimeFactory extends AbstractWindowRuntimeFactory { +public class WindowStreamRuntimeFactory extends AbstractWindowRuntimeFactory { private static final long serialVersionUID = 1L; - public WindowSimpleRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, + public WindowStreamRuntimeFactory(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) { super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, @@ -42,13 +43,13 @@ @Override public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) { - return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories, + return new WindowStreamPushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc); } @Override public String toString() { - return "window (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + " := " - + Arrays.toString(runningAggFactories); + return "window-stream (" + Arrays.toString(partitionColumns) + ") " + Arrays.toString(runningAggOutColumns) + + " := " + Arrays.toString(runningAggFactories); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/3369 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I3863fa3d298aef53d4098be9fc17b0451eb2c23e Gerrit-Change-Number: 3369 Gerrit-PatchSet: 3 Gerrit-Owner: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose (1000171) Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
