Till Westmann has submitted this change and it was merged. Change subject: [ASTERIXDB-2244][RT] Implement micro union-all operator ......................................................................
[ASTERIXDB-2244][RT] Implement micro union-all operator - user model changes: no - storage format changes: no - interface changes: no Details: - implement support for binary micro operators in subplans - implement micro union-all operator - fix free variables visitor Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2277 Reviewed-by: Till Westmann <[email protected]> Contrib: Till Westmann <[email protected]> Integration-Tests: Till Westmann <[email protected]> Tested-by: Till Westmann <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java M hyracks-fullstack/algebricks/algebricks-runtime/pom.xml M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java M hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java 42 files changed, 854 insertions(+), 273 deletions(-) Approvals: Till Westmann: Looks good to me, approved; Verified; No violations found; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index 464476b..4afccb0 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -80,20 +80,20 @@ return false; } - computeDefaultPhysicalOp(op, context); + computeDefaultPhysicalOp(op, true, context); context.addToDontApplySet(this, op); return true; } - private static void setPhysicalOperators(ILogicalPlan plan, IOptimizationContext context) + private static void setPhysicalOperators(ILogicalPlan plan, boolean topLevelOp, IOptimizationContext context) throws AlgebricksException { for (Mutable<ILogicalOperator> root : plan.getRoots()) { - computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), context); + computeDefaultPhysicalOp((AbstractLogicalOperator) root.getValue(), topLevelOp, context); } } - private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, IOptimizationContext context) - throws AlgebricksException { + private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp, + IOptimizationContext context) throws AlgebricksException { PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig(); if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) { GroupByOperator gby = (GroupByOperator) op; @@ -207,11 +207,11 @@ if (op.getPhysicalOperator() == null) { switch (op.getOperatorTag()) { case INNERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context); break; } case LEFTOUTERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context); break; } case UNNEST_MAP: @@ -277,11 +277,11 @@ if (op.hasNestedPlans()) { AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op; for (ILogicalPlan p : nested.getNestedPlans()) { - setPhysicalOperators(p, context); + setPhysicalOperators(p, false, context); } } for (Mutable<ILogicalOperator> opRef : op.getInputs()) { - computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), context); + computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context); } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java index a9cd806..3edccec 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java @@ -19,6 +19,7 @@ package org.apache.asterix.optimizer.rules.subplan; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -56,21 +57,21 @@ import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; -import com.google.common.collect.ImmutableSet; - /* -This rule is to remove SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called S1, -Let's call its input operator O1. +This rule is to remove SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin. +Given a qualified Subplan operator called S1, Let's call its input operator O1. General Cases We have the following rewritings for general cases: R1. Replace all NestedTupleSourceOperators in S1 with deep-copies (with new variables) of the query plan rooted at O1; -R2. Add a LeftOuterOperatorJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input (let's call it SO1), - where O1 is the left branch and SO1 is the right branch; -R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator to SO1. - The join condition of LJ is the equality between the primary key variables in O1 and its deep copied version at SO1; +R2. Add a LeftOuterOperatorJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input + (let's call it SO1),where O1 is the left branch and SO1 is the right branch; +R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator + to SO1. The join condition of LJ is the equality between the primary key variables in O1 and its deep copied + version at SO1; R4. A variable v indicating non-match tuples is assigned to TRUE between LJ and SO1; -R5. On top of the LJ, add a GroupByOperaptor in which the nested plan consists of the S1's root operator, i.e., an aggregate operator. +R5. On top of the LJ, add a GroupByOperaptor in which the nested plan consists of the S1's root operator, + i.e., an aggregate operator. Below the aggregate, there is a not-null-filter on variable v. The group key is the primary key variables in O1. This is an abstract example for the rewriting mechanism described above: @@ -102,10 +103,12 @@ ..... --Deepcopy_The_Plan_Rooted_At_InputOp_With_New_Variables(InputOp) -In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp. -"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables -at InputOp except the covering variables v_lc_1, ..., v_lc_n. In the current implementation, we use "covering" variables as primary key variables. In the next version, we -will use the real primary key variables, which will fix ASTERIXDB-1168. +In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their +corresponding variables populated from the deepcopy of InputOp. +"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part +of the added group-by operator are all live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n. +In the current implementation, we use "covering" variables as primary key variables. +In the next version, we will use the real primary key variables, which will fix ASTERIXDB-1168. Here is a concrete example of the general case rewriting (optimizerts/queries/nested_loj4.aql). Before plan: @@ -157,14 +160,19 @@ c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1), d. there is no tuple dropping from N1 to J1 -Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are needed: +Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are +needed: R1'. Replace N1 by the O1 (no additional deep copy); R2'. All inner joins on the path from N1 to J1 (including J1) become left-outer joins with the same join conditions; -R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, switch the left and right branches of J2; -R4'. For every left join from N1 to J1 transformed from an inner join, a variable vi indicating non-match tuples is assigned to TRUE in its right branch; -R5'. On top of J1, a GroupByOperaptor G1 is added where the group-by key is the primary key of O1 and the nested query plan for aggregation is the nested pipeline - on top of J1 with an added not-null-filter to check all vi are not null. -R6'. All other NestedTupleSourceOperators in the subplan is inlined with deep copies (with new variables) of the query plan rooted at O1. +R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, + switch the left and right branches of J2; +R4'. For every left join from N1 to J1 transformed from an inner join, a variable vi indicating non-match tuples + is assigned to TRUE in its right branch; +R5'. On top of J1, a GroupByOperaptor G1 is added where the group-by key is the primary key of O1 and + the nested query plan for aggregation is the nested pipeline on top of J1 with an added not-null-filter + to check all vi are not null. +R6'. All other NestedTupleSourceOperators in the subplan is inlined with deep copies (with new variables) + of the query plan rooted at O1. This is an abstract example for the special rewriting mechanism described above: Before rewriting: @@ -197,9 +205,10 @@ – Assign v_new=TRUE – ..... (L1) -In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables -at InputOp except the covering variables v_lc_1, ..., v_lc_n. In the current implementation, we use "covering" variables as primary key variables. In the next version, -we will use the real primary key variables, which will fix ASTERIXDB-1168. +In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part +of the added group-by operator are all live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n. +In the current implementation, we use "covering" variables as primary key variables. +In the next version, we will use the real primary key variables, which will fix ASTERIXDB-1168. Here is a concrete example (optimizerts/queries/nested_loj2.aql). . Before plan: @@ -343,10 +352,8 @@ private Pair<Boolean, LinkedHashMap<LogicalVariable, LogicalVariable>> applyGeneralFlattening( Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { SubplanOperator subplanOp = (SubplanOperator) opRef.getValue(); - if (!SubplanFlatteningUtil.containsOperators(subplanOp, - ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, LogicalOperatorTag.INNERJOIN, - // We don't have nested runtime for union-all and distinct hence we have to include them here. - LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) { + if (!SubplanFlatteningUtil.containsOperators(subplanOp, EnumSet.of(LogicalOperatorTag.DATASOURCESCAN, + LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { return new Pair<>(false, new LinkedHashMap<>()); } Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0); @@ -402,10 +409,8 @@ context.computeAndSetTypeEnvironmentForOperator(leftOuterJoinOp); // Creates group-by operator. - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>(); List<ILogicalPlan> nestedPlans = new ArrayList<>(); GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans); @@ -435,8 +440,8 @@ lowestAggregateRefInSubplan.getValue().getInputs().add(currentOpRef); } - // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e., - // subplan input tuples that are filtered out within a subplan. + // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, + // i.e., subplan input tuples that are filtered out within a subplan. Mutable<ILogicalExpression> filterVarExpr = new MutableObject<>(new VariableReferenceExpression(assignVar)); List<Mutable<ILogicalExpression>> args = new ArrayList<>(); args.add(filterVarExpr); @@ -500,10 +505,8 @@ Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second; // Creates a group-by operator. - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); - List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = - new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<>(); + List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<>(); GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, subplanOp.getNestedPlans()); for (LogicalVariable coverVar : primaryKeyVars) { @@ -521,8 +524,8 @@ groupbyOp.getInputs().add(new MutableObject<>(topJoinRef.getValue())); if (!notNullVars.isEmpty()) { - // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e., - // subplan input tuples that are filtered out within a subplan. + // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, + // i.e., subplan input tuples that are filtered out within a subplan. List<Mutable<ILogicalExpression>> nullCheckExprRefs = new ArrayList<>(); for (LogicalVariable notNullVar : notNullVars) { Mutable<ILogicalExpression> filterVarExpr = diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java index dba5d47..894097d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.optimizer.rules.subplan; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -208,7 +209,7 @@ if (!OperatorManipulationUtil.ancestorOfOperators( subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(), // we don't need to check recursively for this special rewriting. - ImmutableSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { + EnumSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) { return new Pair<Boolean, ILogicalOperator>(false, null); } SubplanSpecialFlatteningCheckVisitor visitor = new SubplanSpecialFlatteningCheckVisitor(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 69ce2ea..4d26d25 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -235,7 +235,7 @@ } IPushRuntime assignOp = new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true) - .createPushRuntime(ctx); + .createPushRuntime(ctx)[0]; insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc); assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); @@ -270,7 +270,7 @@ NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider) throws HyracksDataException, AlgebricksException { - IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx); + IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0]; JobSpecification spec = new JobSpecification(); PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp new file mode 100644 index 0000000..c1ce4a3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +create type TType as open +{ id: bigint }; + +create dataset TData (TType) primary key id; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp new file mode 100644 index 0000000..3ce9554 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +insert into TData ( [ +{'id':1, 'x':1, 'f':19}, +{'id':2, 'x':2, 'f':12}, +{'id':3, 'x':1, 'f':10}, +{'id':4, 'x':2, 'f':17}, +{'id':5, 'x':1, 'f':12} +]); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp new file mode 100644 index 0000000..b825157 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +select x, +array_sum(( + select value a.f + from g as p + union all + select value a.f + from g as w +)) s +from TData as a +group by a.x as x group as g +order by x +; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm new file mode 100644 index 0000000..5071183 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm @@ -0,0 +1,2 @@ +{ "x": 1, "s": 82 } +{ "x": 2, "s": 58 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 16fa334..3a535cc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -6430,6 +6430,11 @@ </compilation-unit> </test-case> <test-case FilePath="subquery"> + <compilation-unit name="non_unary_subplan_01"> + <output-dir compare="Text">non_unary_subplan_01</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="subquery"> <compilation-unit name="query-ASTERIXDB-1571"> <output-dir compare="Text">query-ASTERIXDB-1571</output-dir> </compilation-unit> diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 3ad7fd5..8341a33 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -125,7 +125,7 @@ 1060 = Field \"%1$s\" in the with clause must be of type %2$s 1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\" 1062 = Merge policy parameters cannot be of type %1$s -1063 = There is no dataverse with name %1$s +1063 = There is no dataverse with name \"%1$s\" # Feed Errors 3001 = Illegal state. diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java index 1a54b92..b754533 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java +++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java @@ -410,7 +410,7 @@ public Void visit(IndexAccessor ia, Collection<VariableExpr> freeVars) throws CompilationException { ia.getExpr().accept(this, freeVars); if (ia.getIndexExpr() != null) { - ia.getIndexExpr(); + ia.getIndexExpr().accept(this, freeVars); } return null; } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 445ad4a..bbfde38 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -51,10 +51,10 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); - return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId, - primaryKeyFields, isWriteTransaction, - datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + return new IPushRuntime[] { new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), + datasetId, primaryKeyFields, isWriteTransaction, + datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) }; } } diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 79b8f38..2d6123e 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -104,7 +104,7 @@ clusterLocations); PlanCompiler pc = new PlanCompiler(context); - return pc.compilePlan(plan, null, jobEventListenerFactory); + return pc.compilePlan(plan, jobEventListenerFactory); } }; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index 75b63f1..db9728b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -44,6 +44,7 @@ MATERIALIZE, MICRO_PRE_CLUSTERED_GROUP_BY, MICRO_PRE_SORTED_DISTINCT_BY, + MICRO_UNION_ALL, NESTED_LOOP, NESTED_TUPLE_SOURCE, ONE_TO_ONE_EXCHANGE, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java index 43cde22..29d6037 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.physical; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; @@ -97,7 +99,7 @@ /** * @return labels (0 or 1) for each input and output indicating the dependency between them. - * The edges labeled as 1 must wait for the edges with label 0. + * The edges labeled as 1 must wait for the edges with label 0. */ @Override public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { @@ -118,47 +120,61 @@ protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema, AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException { - AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()]; + List<List<AlgebricksPipeline>> subplans = compileSubplansImpl(outerPlanSchema, npOp, opSchema, context); + int n = subplans.size(); + AlgebricksPipeline[] result = new AlgebricksPipeline[n]; + for (int i = 0; i < n; i++) { + List<AlgebricksPipeline> subplanOps = subplans.get(i); + if (subplanOps.size() != 1) { + throw new AlgebricksException("Attempting to construct a nested plan with " + subplanOps.size() + + " operator descriptors. Currently, nested plans can only consist in linear pipelines of " + + "micro operators."); + } + result[i] = subplanOps.get(0); + } + return result; + } + + protected List<List<AlgebricksPipeline>> compileSubplansImpl(IOperatorSchema outerPlanSchema, + AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context) + throws AlgebricksException { + List<List<AlgebricksPipeline>> subplans = new ArrayList<>(npOp.getNestedPlans().size()); PlanCompiler pc = new PlanCompiler(context); - int i = 0; for (ILogicalPlan p : npOp.getNestedPlans()) { - subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc); + subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc)); } return subplans; } - private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema, + private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema, AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException { if (p.getRoots().size() > 1) { throw new NotImplementedException("Nested plans with several roots are not supported."); } - JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null); + JobSpecification nestedJob = pc.compileNestedPlan(p, outerPlanSchema); ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue(); JobGenContext context = pc.getContext(); IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan); opSchema.addAllVariables(topOpInSubplanScm); Map<OperatorDescriptorId, IOperatorDescriptor> opMap = nestedJob.getOperatorMap(); - if (opMap.size() != 1) { - throw new AlgebricksException("Attempting to construct a nested plan with " + opMap.size() - + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators."); - } - - for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> opEntry : opMap.entrySet()) { - IOperatorDescriptor opd = opEntry.getValue(); - if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) { - throw new AlgebricksException( - "Can only generate Hyracks jobs for pipelinable Asterix nested plans, not for " - + opd.getClass().getName()); + List<? extends IOperatorDescriptor> metaOps = nestedJob.getMetaOps(); + if (opMap.size() != metaOps.size()) { + for (IOperatorDescriptor opd : opMap.values()) { + if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) { + throw new AlgebricksException( + "Can only generate jobs for pipelinable nested plans, not for " + opd.getClass().getName()); + } } - AlgebricksMetaOperatorDescriptor amod = (AlgebricksMetaOperatorDescriptor) opd; - - return amod.getPipeline(); - // we suppose that the top operator in the subplan already does the - // projection for us + throw new IllegalStateException("Unexpected nested plan"); } - throw new IllegalStateException(); + List<AlgebricksPipeline> result = new ArrayList<>(metaOps.size()); + for (IOperatorDescriptor opd : metaOps) { + AlgebricksMetaOperatorDescriptor amod = (AlgebricksMetaOperatorDescriptor) opd; + result.add(amod.getPipeline()); + } + return result; } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java new file mode 100644 index 0000000..a4d9576 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; + +public abstract class AbstractUnionAllPOperator extends AbstractPhysicalOperator { + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty(); + this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<>(0)); + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + StructuralPropertiesVector pv0 = + OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); + StructuralPropertiesVector pv1 = + OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( + new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); + return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 }, + IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + List<Mutable<ILogicalOperator>> inputs = op.getInputs(); + for (int i = 0; i < inputs.size(); i++) { + ILogicalOperator src = inputs.get(i).getValue(); + builder.contributeGraphEdge(src, 0, op, i); + } + } +} \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java new file mode 100644 index 0000000..f5e992e --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.operators.union.MicroUnionAllRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +public class MicroUnionAllPOperator extends AbstractUnionAllPOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.MICRO_UNION_ALL; + } + + @Override + public boolean isMicroOperator() { + return true; + } + + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + RecordDescriptor recordDescriptor = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + + MicroUnionAllRuntimeFactory runtime = new MicroUnionAllRuntimeFactory(op.getInputs().size()); + builder.contributeMicroOperator(op, runtime, recordDescriptor); + + super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema); + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java index d43ddab..95efbac 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java @@ -89,18 +89,18 @@ if (subplan.getNestedPlans().size() != 1) { throw new NotImplementedException("Subplan currently works only for one nested plan with one root."); } - AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema, context); - assert subplans.length == 1; - AlgebricksPipeline np = subplans[0]; + List<List<AlgebricksPipeline>> subplans = compileSubplansImpl(inputSchemas[0], subplan, opSchema, context); + assert subplans.size() == 1; + List<AlgebricksPipeline> np = subplans.get(0); RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor( context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context); - IMissingWriterFactory[] missingWriterFactories = new IMissingWriterFactory[np.getOutputWidth()]; + IMissingWriterFactory[] missingWriterFactories = new IMissingWriterFactory[np.get(0).getOutputWidth()]; for (int i = 0; i < missingWriterFactories.length; i++) { missingWriterFactories[i] = context.getMissingWriterFactory(); } - SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, missingWriterFactories, inputRecordDesc, null); - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); + SubplanRuntimeFactory runtime = + new SubplanRuntimeFactory(np, missingWriterFactories, inputRecordDesc, recDesc, null); builder.contributeMicroOperator(subplan, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java index a617064..4ccce92 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java @@ -18,29 +18,18 @@ */ package org.apache.hyracks.algebricks.core.algebra.operators.physical; -import java.util.ArrayList; - import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; -import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor; -public class UnionAllPOperator extends AbstractPhysicalOperator { +public class UnionAllPOperator extends AbstractUnionAllPOperator { @Override public PhysicalOperatorTag getOperatorTag() { @@ -53,48 +42,16 @@ } @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); - IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty(); - this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<>(0)); - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - StructuralPropertiesVector pv0 = - OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( - new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); - StructuralPropertiesVector pv1 = - OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op, new StructuralPropertiesVector( - new RandomPartitioningProperty(context.getComputationNodeDomain()), null)); - return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 }, - IPartitioningRequirementsCoordinator.NO_COORDINATION); - } - - @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) throws AlgebricksException { - - IOperatorDescriptorRegistry spec = builder.getJobSpec(); RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - // at algebricks level, union all only accepts two inputs, although at - // hyracks - // level, there is no restrictions - UnionAllOperatorDescriptor opDesc = new UnionAllOperatorDescriptor(spec, 2, recordDescriptor); + UnionAllOperatorDescriptor opDesc = + new UnionAllOperatorDescriptor(builder.getJobSpec(), op.getInputs().size(), recordDescriptor); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); - ILogicalOperator src1 = op.getInputs().get(0).getValue(); - builder.contributeGraphEdge(src1, 0, op, 0); - ILogicalOperator src2 = op.getInputs().get(1).getValue(); - builder.contributeGraphEdge(src2, 0, op, 1); - } - @Override - public boolean expensiveThanMaterialization() { - return false; + super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema); } - } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index 249e66f..c574cd8 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -18,7 +18,10 @@ */ package org.apache.hyracks.algebricks.core.algebra.util; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Set; @@ -202,7 +205,7 @@ public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars( ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException { LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = - new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, true); + new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, true); ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root); return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping()); } @@ -327,4 +330,45 @@ return false; } + /** + * Returns all descendants of an operator that are leaf operators + * + * @param opRef given operator + * @return list containing all leaf descendants + */ + public static List<Mutable<ILogicalOperator>> findLeafDescendantsOrSelf(Mutable<ILogicalOperator> opRef) { + List<Mutable<ILogicalOperator>> result = Collections.emptyList(); + + Deque<Mutable<ILogicalOperator>> queue = new ArrayDeque<>(); + queue.add(opRef); + Mutable<ILogicalOperator> currentOpRef; + while ((currentOpRef = queue.pollLast()) != null) { + List<Mutable<ILogicalOperator>> inputs = currentOpRef.getValue().getInputs(); + if (inputs.isEmpty()) { + if (result.isEmpty()) { + result = new ArrayList<>(); + } + result.add(currentOpRef); + } else { + queue.addAll(inputs); + } + } + return result; + } + + /** + * Find operator in a given list of operator references + * + * @param list list to search in + * @param op operator to find + * @return operator position in the given list or {@code -1} if not found + */ + public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) { + for (int i = 0, ln = list.size(); i < ln; i++) { + if (list.get(i).getValue() == op) { + return i; + } + } + return -1; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java index 13eef09..16992e7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -34,6 +35,8 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; @@ -96,7 +99,7 @@ @Override public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc, AlgebricksPartitionConstraint pc) { - microOps.put(op, new Pair<IPushRuntimeFactory, RecordDescriptor>(runtime, recDesc)); + microOps.put(op, new Pair<>(runtime, recDesc)); revMicroOpMap.put(runtime, op); if (pc != null) { pcForMicroOps.put(op, pc); @@ -169,6 +172,17 @@ jobSpec.addRoot(opDesc); } setAllPartitionConstraints(tgtConstraints); + } + + public List<IOperatorDescriptor> getGeneratedMetaOps() { + List<IOperatorDescriptor> resultOps = new ArrayList<>(); + for (IOperatorDescriptor opd : jobSpec.getOperatorMap().values()) { + if (opd instanceof AlgebricksMetaOperatorDescriptor) { + resultOps.add(opd); + } + } + resultOps.sort((op1, op2) -> sendsOutput(op1, op2) ? 1 : sendsOutput(op2, op1) ? -1 : 0); + return resultOps; } private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) { @@ -317,20 +331,30 @@ int n = opContents.size(); IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n]; RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n]; - int i = 0; - for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) { + for (int i = 0, ln = opContents.size(); i < ln; i++) { + Pair<IPushRuntimeFactory, RecordDescriptor> p = opContents.get(i); runtimeFactories[i] = p.first; internalRecordDescriptors[i] = p.second; - i++; } ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n - 1]); ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp); - int outArity = (outOps == null) ? 0 : outOps.size(); + int outArity = outOps == null ? 0 : outOps.size(); + int[] outPositions = new int[outArity]; + IPushRuntimeFactory[] outRuntimeFactories = new IPushRuntimeFactory[outArity]; + if (outOps != null) { + for (int i = 0, ln = outOps.size(); i < ln; i++) { + ILogicalOperator outOp = outOps.get(i); + outPositions[i] = OperatorManipulationUtil.indexOf(outOp.getInputs(), lastLogicalOp); + Pair<IPushRuntimeFactory, RecordDescriptor> microOpPair = microOps.get(outOp); + outRuntimeFactories[i] = microOpPair != null ? microOpPair.first : null; + } + } + ILogicalOperator firstLogicalOp = revMicroOpMap.get(runtimeFactories[0]); ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp); int inArity = (inOps == null) ? 0 : inOps.size(); return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, outArity, runtimeFactories, - internalRecordDescriptors); + internalRecordDescriptors, outRuntimeFactories, outPositions); } private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) { @@ -344,7 +368,12 @@ return; } ILogicalOperator dest = destList.get(0); + int destInputPos = OperatorManipulationUtil.indexOf(dest.getInputs(), aop); Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest); + if (destInputPos != 0) { + return; + } + if (j == null && microOps.get(dest) != null) { algebraicOpBelongingToMetaAsterixOp.put(dest, k); List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k); @@ -362,7 +391,6 @@ } } } - } private int createNewMetaOpInfo(ILogicalOperator aop) { @@ -387,4 +415,28 @@ } } + private boolean sendsOutput(IOperatorDescriptor src, IOperatorDescriptor trg) { + AlgebricksPipeline srcPipeline = ((AlgebricksMetaOperatorDescriptor) src).getPipeline(); + IPushRuntimeFactory[] srcOutRts = srcPipeline.getOutputRuntimeFactories(); + if (srcOutRts == null) { + return false; + } + IPushRuntimeFactory[] trgRts = ((AlgebricksMetaOperatorDescriptor) trg).getPipeline().getRuntimeFactories(); + for (IPushRuntimeFactory srcOutRt : srcOutRts) { + if (ArrayUtils.contains(trgRts, srcOutRt)) { + return true; + } + ILogicalOperator srcOutOp = revMicroOpMap.get(srcOutRt); + if (srcOutOp != null) { + Integer k = algebraicOpBelongingToMetaAsterixOp.get(srcOutOp); + if (k != null) { + AlgebricksMetaOperatorDescriptor srcOutMetaOp = metaAsterixOps.get(k); + if (srcOutMetaOp != null && sendsOutput(srcOutMetaOp, trg)) { + return true; + } + } + } + } + return false; + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java index 7409247..ddda258 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java @@ -48,14 +48,24 @@ return context; } - public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, + public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory) + throws AlgebricksException { + return compilePlanImpl(plan, false, null, jobEventListenerFactory); + } + + public JobSpecification compileNestedPlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + return compilePlanImpl(plan, true, outerPlanSchema, null); + } + + private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean isNestedPlan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException { JobSpecification spec = new JobSpecification(context.getFrameSize()); if (jobEventListenerFactory != null) { spec.setJobletEventListenerFactory(jobEventListenerFactory); } - List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>(); - IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations()); + List<ILogicalOperator> rootOps = new ArrayList<>(); + JobBuilder builder = new JobBuilder(spec, context.getClusterLocations()); for (Mutable<ILogicalOperator> opRef : plan.getRoots()) { compileOpRef(opRef, spec, builder, outerPlanSchema); rootOps.add(opRef.getValue()); @@ -66,6 +76,9 @@ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); // Do not do activity cluster planning because it is slow on large clusters spec.setUseConnectorPolicyForScheduling(false); + if (isNestedPlan) { + spec.setMetaOps(builder.getGeneratedMetaOps()); + } return spec; } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 5b6285a..d277043 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -67,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator; @@ -200,11 +201,11 @@ break; } case INNERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, context); break; } case LEFTOUTERJOIN: { - JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context); + JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, topLevelOp, context); break; } case LIMIT: { @@ -259,11 +260,19 @@ break; } case UNIONALL: { - op.setPhysicalOperator(new UnionAllPOperator()); + if (topLevelOp) { + op.setPhysicalOperator(new UnionAllPOperator()); + } else { + op.setPhysicalOperator(new MicroUnionAllPOperator()); + } break; } case INTERSECT: { - op.setPhysicalOperator(new IntersectPOperator()); + if (topLevelOp) { + op.setPhysicalOperator(new IntersectPOperator()); + } else { + throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag()); + } break; } case UNNEST: { diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java index 9d3b311..3efa46b 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java @@ -139,6 +139,10 @@ while (upperSubplanRootRefIterator.hasNext()) { Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next(); + if (downToNts(rootOpRef) == null) { + continue; + } + // Collects free variables in the root operator of a nested plan and its descent. Set<LogicalVariable> freeVars = new ListSet<>(); OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(), @@ -154,6 +158,9 @@ // Sets the nts for a original subplan. Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex); Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef); + if (originalGbyNtsRef == null) { + continue; + } NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef.getValue(); originalNts.setDataSourceReference(new MutableObject<>(gby)); @@ -265,11 +272,13 @@ } private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) { - Mutable<ILogicalOperator> currentOpRef = opRef; - while (currentOpRef.getValue().getInputs().size() > 0) { - currentOpRef = currentOpRef.getValue().getInputs().get(0); + List<Mutable<ILogicalOperator>> leafOps = OperatorManipulationUtil.findLeafDescendantsOrSelf(opRef); + if (leafOps.size() == 1) { + Mutable<ILogicalOperator> leafOp = leafOps.get(0); + if (leafOp.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) { + return leafOp; + } } - return currentOpRef; + return null; } - } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java index 6efda52..0bc2a5e 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java @@ -51,8 +51,11 @@ private JoinUtils() { } - public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context) - throws AlgebricksException { + public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean topLevelOp, + IOptimizationContext context) throws AlgebricksException { + if (!topLevelOp) { + throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag()); + } List<LogicalVariable> sideLeft = new LinkedList<>(); List<LogicalVariable> sideRight = new LinkedList<>(); List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml index bbea2ab..dafb6ab 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml @@ -73,6 +73,10 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java index 379944b..f24d38d 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java @@ -27,10 +27,15 @@ private static final long serialVersionUID = 1L; private final IPushRuntimeFactory[] runtimeFactories; private final RecordDescriptor[] recordDescriptors; + private final IPushRuntimeFactory[] outputRuntimeFactories; + private final int[] outputPositions; - public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors) { + public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] recordDescriptors, + IPushRuntimeFactory[] outputRuntimeFactories, int[] outputPositions) { this.runtimeFactories = runtimeFactories; this.recordDescriptors = recordDescriptors; + this.outputRuntimeFactories = outputRuntimeFactories; + this.outputPositions = outputPositions; // this.projectedColumns = projectedColumns; } @@ -46,8 +51,15 @@ return recordDescriptors[recordDescriptors.length - 1].getFieldCount(); } + public IPushRuntimeFactory[] getOutputRuntimeFactories() { + return outputRuntimeFactories; + } + + public int[] getOutputPositions() { + return outputPositions; + } + // public int[] getProjectedColumns() { // return projectedColumns; // } - } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java index de6cddd..f90de81 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java @@ -24,5 +24,5 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IPushRuntimeFactory extends Serializable { - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException; + IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException; } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index 94af04f..0a578f6 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -168,7 +168,7 @@ // should enforce protocol boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); + IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; start = enforce ? EnforcePushRuntime.enforce(start) : start; newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index c261df8..75b2fb2 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -149,7 +149,7 @@ IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories(); RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors(); for (int i = runtimeFactories.length - 1; i >= 0; i--) { - IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx); + IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx)[0]; newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; start = enforce ? EnforceFrameWriter.enforce(start) : start; newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java index 32eff3a..7b3fb46 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java @@ -34,8 +34,8 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return createOneOutputPushRuntime(ctx); + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { createOneOutputPushRuntime(ctx) }; } public abstract AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java index a838557..f0e9406 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java @@ -38,8 +38,8 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new AbstractOneInputSinkPushRuntime() { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new AbstractOneInputSinkPushRuntime() { @Override public void open() throws HyracksDataException { @@ -61,7 +61,6 @@ public void flush() throws HyracksDataException { // flush() is meaningless for sink operators } - }; + } }; } - } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index b4d23fc..07365db 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -45,11 +45,18 @@ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity, IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) { + this(spec, inputArity, outputArity, runtimeFactories, internalRecordDescriptors, null, null); + } + + public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity, + IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors, + IPushRuntimeFactory[] outputRuntimeFactories, int[] outputPositions) { super(spec, inputArity, outputArity); if (outputArity == 1) { this.outRecDescs[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1]; } - this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors); + this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors, outputRuntimeFactories, + outputPositions); } public AlgebricksPipeline getPipeline() { @@ -81,7 +88,7 @@ private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable { private final IHyracksTaskContext ctx; - public SourcePushRuntime(IHyracksTaskContext ctx) { + SourcePushRuntime(IHyracksTaskContext ctx) { this.ctx = ctx; } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index e1081e0..a717794 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -18,9 +18,13 @@ */ package org.apache.hyracks.algebricks.runtime.operators.meta; +import java.util.HashMap; +import java.util.Map; + import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.EnforceFrameWriter; @@ -37,6 +41,7 @@ private final int inputArity; private final int outputArity; private final AlgebricksPipeline pipeline; + private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap; public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity, RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) { @@ -45,6 +50,7 @@ this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor; this.inputArity = inputArity; this.outputArity = outputArity; + this.runtimeMap = new HashMap<>(); } public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException { @@ -52,19 +58,30 @@ boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT); // plug the operators IFrameWriter start = writer;// this.writer; - for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) { - IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx); - newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime; + IPushRuntimeFactory[] runtimeFactories = pipeline.getRuntimeFactories(); + RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors(); + for (int i = runtimeFactories.length - 1; i >= 0; i--) { start = enforce ? EnforceFrameWriter.enforce(start) : start; - if (i == pipeline.getRuntimeFactories().length - 1) { - if (outputArity == 1) { - newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); + + IPushRuntimeFactory runtimeFactory = runtimeFactories[i]; + IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx); + for (int j = 0; j < newRuntimes.length; j++) { + if (enforce) { + newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]); } - } else { - newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]); + if (i == runtimeFactories.length - 1) { + if (outputArity == 1) { + newRuntimes[j].setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor); + } + } else { + newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]); + } } + runtimeMap.put(runtimeFactory, newRuntimes); + + IPushRuntime newRuntime = newRuntimes[0]; if (i > 0) { - newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]); + newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]); } else if (inputArity > 0) { newRuntime.setInputRecordDescriptor(0, pipelineInputRecordDescriptor); } @@ -72,4 +89,8 @@ } return start; } + + public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) { + return runtimeMap.get(runtimeFactory); + } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 7e04750..159fde7 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -20,9 +20,11 @@ import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime; @@ -41,16 +43,21 @@ private static final long serialVersionUID = 1L; - private final AlgebricksPipeline pipeline; + private final List<AlgebricksPipeline> pipelines; + private final RecordDescriptor inputRecordDesc; + + private final RecordDescriptor outputRecordDesc; + private final IMissingWriterFactory[] missingWriterFactories; - public SubplanRuntimeFactory(AlgebricksPipeline pipeline, IMissingWriterFactory[] missingWriterFactories, - RecordDescriptor inputRecordDesc, int[] projectionList) { + public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories, + RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) { super(projectionList); - this.pipeline = pipeline; + this.pipelines = pipelines; this.missingWriterFactories = missingWriterFactories; this.inputRecordDesc = inputRecordDesc; + this.outputRecordDesc = outputRecordDesc; if (projectionList != null) { throw new NotImplementedException(); } @@ -60,8 +67,12 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Subplan { \n"); - for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) { - sb.append(" " + f.toString() + ";\n"); + for (AlgebricksPipeline pipeline : pipelines) { + sb.append('{'); + for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) { + sb.append(" ").append(f).append(";\n"); + } + sb.append('}'); } sb.append("}"); return sb.toString(); @@ -70,110 +81,177 @@ @Override public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { + return new SubplanPushRuntime(ctx); + } - RecordDescriptor pipelineOutputRecordDescriptor = null; + private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { - final PipelineAssembler pa = - new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, pipelineOutputRecordDescriptor); - final IMissingWriter[] nullWriters = new IMissingWriter[missingWriterFactories.length]; - for (int i = 0; i < missingWriterFactories.length; i++) { - nullWriters[i] = missingWriterFactories[i].createMissingWriter(); + final IHyracksTaskContext ctx; + + final NestedTupleSourceRuntime[] startOfPipelines; + + boolean first; + + SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + this.ctx = ctx; + this.first = true; + + IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length]; + for (int i = 0; i < missingWriterFactories.length; i++) { + missingWriters[i] = missingWriterFactories[i].createMissingWriter(); + } + + int pipelineCount = pipelines.size(); + startOfPipelines = new NestedTupleSourceRuntime[pipelineCount]; + PipelineAssembler[] pipelineAssemblers = new PipelineAssembler[pipelineCount]; + for (int i = 0; i < pipelineCount; i++) { + AlgebricksPipeline pipeline = pipelines.get(i); + RecordDescriptor pipelineLastRecordDescriptor = + pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]; + + RecordDescriptor outputRecordDescriptor; + IFrameWriter outputWriter; + if (i == 0) { + // primary pipeline + outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters); + outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc; + } else { + // secondary pipeline + IPushRuntime outputPushRuntime = linkSecondaryPipeline(pipeline, pipelineAssemblers, i); + if (outputPushRuntime == null) { + throw new IllegalStateException("Invalid pipeline"); + } + outputPushRuntime.setInputRecordDescriptor(0, pipelineLastRecordDescriptor); + outputWriter = outputPushRuntime; + outputRecordDescriptor = pipelineLastRecordDescriptor; + } + + PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor); + startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx); + pipelineAssemblers[i] = pa; + } } - return new AbstractOneInputOneOutputOneFramePushRuntime() { + IPushRuntime linkSecondaryPipeline(AlgebricksPipeline pipeline, PipelineAssembler[] pipelineAssemblers, + int pipelineAssemblersCount) { + IPushRuntimeFactory[] outputRuntimeFactories = pipeline.getOutputRuntimeFactories(); + if (outputRuntimeFactories == null || outputRuntimeFactories.length != 1) { + throw new IllegalStateException(); + } + IPushRuntimeFactory outRuntimeFactory = outputRuntimeFactories[0]; + int outputPosition = pipeline.getOutputPositions()[0]; + for (int i = 0; i < pipelineAssemblersCount; i++) { + IPushRuntime[] p = pipelineAssemblers[i].getPushRuntime(outRuntimeFactory); + if (p != null) { + return p[outputPosition]; + } + } + return null; + } - /** - * Computes the outer product between a given tuple and the frames - * passed. - */ - class TupleOuterProduct implements IFrameWriter { + @Override + public void open() throws HyracksDataException { + writer.open(); + if (first) { + first = false; + initAccessAppendRef(ctx); + } + } - private boolean smthWasWritten = false; - private FrameTupleAccessor ta = new FrameTupleAccessor( - pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]); - private ArrayTupleBuilder tb = new ArrayTupleBuilder( - nullWriters.length + SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount()); + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); - @Override - public void open() throws HyracksDataException { - smthWasWritten = false; + for (NestedTupleSourceRuntime nts : startOfPipelines) { + nts.writeTuple(buffer, t); } - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - ta.reset(buffer); - int nTuple = ta.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t); + int n = 0; + try { + for (; n < startOfPipelines.length; n++) { + NestedTupleSourceRuntime nts = startOfPipelines[n]; + try { + nts.open(); + } catch (Exception e) { + nts.fail(); + throw e; + } } - smthWasWritten = true; - } - - @Override - public void close() throws HyracksDataException { - if (!smthWasWritten && !failed) { - // the case when we need to write nulls - appendNullsToTuple(); - appendToFrameFromTupleBuilder(tb); - } - } - - @Override - public void fail() throws HyracksDataException { - // writer.fail() is called by the outer class' writer.fail(). - } - - private void appendNullsToTuple() throws HyracksDataException { - tb.reset(); - int n0 = tRef.getFieldCount(); - for (int f = 0; f < n0; f++) { - tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f); - } - DataOutput dos = tb.getDataOutput(); - for (int i = 0; i < nullWriters.length; i++) { - nullWriters[i].writeMissing(dos); - tb.addFieldEndOffset(); + } finally { + for (int i = n - 1; i >= 0; i--) { + startOfPipelines[i].close(); } } } + } - IFrameWriter endPipe = new TupleOuterProduct(); + @Override + public void flush() throws HyracksDataException { + writer.flush(); + } - NestedTupleSourceRuntime startOfPipeline = (NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx); + /** + * Computes the outer product between a given tuple and the frames + * passed. + */ + class TupleOuterProduct implements IFrameWriter { - boolean first = true; + private boolean smthWasWritten; + private final FrameTupleAccessor ta; + private final ArrayTupleBuilder tb; + private final IMissingWriter[] missingWriters; + + private TupleOuterProduct(RecordDescriptor recordDescriptor, IMissingWriter[] missingWriters) { + ta = new FrameTupleAccessor(recordDescriptor); + tb = new ArrayTupleBuilder( + missingWriters.length + SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount()); + this.missingWriters = missingWriters; + } @Override public void open() throws HyracksDataException { - writer.open(); - if (first) { - first = false; - initAccessAppendRef(ctx); - } + smthWasWritten = false; } @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); + ta.reset(buffer); + int nTuple = ta.getTupleCount(); for (int t = 0; t < nTuple; t++) { - tRef.reset(tAccess, t); - startOfPipeline.writeTuple(buffer, t); - try { - startOfPipeline.open(); - } catch (Exception e) { - startOfPipeline.fail(); - throw e; - } finally { - startOfPipeline.close(); - } + appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t); + } + smthWasWritten = true; + } + + @Override + public void close() throws HyracksDataException { + if (!smthWasWritten && !failed) { + // the case when we need to write nulls + appendNullsToTuple(); + appendToFrameFromTupleBuilder(tb); } } @Override - public void flush() throws HyracksDataException { - writer.flush(); + public void fail() throws HyracksDataException { + // writer.fail() is called by the outer class' writer.fail(). } - }; + + private void appendNullsToTuple() throws HyracksDataException { + tb.reset(); + int n0 = tRef.getFieldCount(); + for (int f = 0; f < n0; f++) { + tb.addField(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), f); + } + DataOutput dos = tb.getDataOutput(); + for (IMissingWriter missingWriter : missingWriters) { + missingWriter.writeMissing(dos); + tb.addFieldEndOffset(); + } + } + } } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java index 3ccceed..67f4a77 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java @@ -40,8 +40,8 @@ } @Override - public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { - return new AbstractOneInputSourcePushRuntime() { + public IPushRuntime[] createPushRuntime(final IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new AbstractOneInputSourcePushRuntime() { private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0); private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx)); @@ -69,6 +69,6 @@ public void flush() throws HyracksDataException { appender.flush(writer); } - }; + } }; } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java index 496679f..8e64092 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java @@ -39,8 +39,8 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - return new NestedTupleSourceRuntime(ctx); + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new IPushRuntime[] { new NestedTupleSourceRuntime(ctx) }; } public static class NestedTupleSourceRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java index 021784a..8a06ecf 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java @@ -56,9 +56,9 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) { IAWriter w = PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, printerFactories, inputRecordDesc); - return new SinkWriterRuntime(w, System.out, inputRecordDesc); + return new IPushRuntime[] { new SinkWriterRuntime(w, System.out, inputRecordDesc) }; } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java index d52ceee..536a769 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java @@ -67,11 +67,11 @@ } @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { try { PrintStream filePrintStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile))); IAWriter w = writerFactory.createWriter(fields, filePrintStream, printerFactories, inputRecordDesc); - return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, true); + return new IPushRuntime[] { new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, true) }; } catch (IOException e) { throw new HyracksDataException(e); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java new file mode 100644 index 0000000..1706e59 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.union; + +import java.nio.ByteBuffer; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final int inputArity; + + public MicroUnionAllRuntimeFactory(int inputArity) { + this.inputArity = inputArity; + } + + @Override + public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) { + Mutable<Boolean> failedShared = new MutableObject<>(Boolean.FALSE); + IPushRuntime[] result = new IPushRuntime[inputArity]; + for (int i = 0; i < inputArity; i++) { + result[i] = new MicroUnionAllPushRuntime(i, failedShared); + } + return result; + } + + @Override + public String toString() { + return "union-all"; + } + + private final class MicroUnionAllPushRuntime implements IPushRuntime { + + private final int idx; + + private final Mutable<Boolean> failedShared; + + private IFrameWriter writer; + + MicroUnionAllPushRuntime(int idx, Mutable<Boolean> failedShared) { + this.idx = idx; + this.failedShared = failedShared; + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + if (index != 0) { + throw new IllegalArgumentException(String.valueOf(index)); + } + this.writer = writer; + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + // input is not accessed + } + + @Override + public void open() throws HyracksDataException { + if (idx == 0) { + writer.open(); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + writer.nextFrame(buffer); + } + + @Override + public void fail() throws HyracksDataException { + boolean failed = failedShared.getValue(); + failedShared.setValue(Boolean.TRUE); + if (!failed) { + writer.fail(); + } + } + + @Override + public void close() throws HyracksDataException { + if (idx == 0) { + writer.close(); + } + } + } +} diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java index 40e2ec6..a7621ec 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; @@ -439,7 +440,7 @@ RecordDescriptor aggDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg }, - new RecordDescriptor[] { ntsDesc, aggDesc }); + new RecordDescriptor[] { ntsDesc, aggDesc }, null, null); NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory( new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {}); RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] { @@ -780,10 +781,10 @@ new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, assign2, project1 }, - new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc }); + new RecordDescriptor[] { assign1Desc, assign2Desc, project1Desc }, null, null); - SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline, - new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null); + SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(Collections.singletonList(pipeline), + new IMissingWriterFactory[] { NoopMissingWriterFactory.INSTANCE }, assign1Desc, null, null); RecordDescriptor subplanDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); @@ -851,7 +852,7 @@ RecordDescriptor aggDesc = new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE }); AlgebricksPipeline pipeline = new AlgebricksPipeline(new IPushRuntimeFactory[] { nts, agg }, - new RecordDescriptor[] { ntsDesc, aggDesc }); + new RecordDescriptor[] { ntsDesc, aggDesc }, null, null); NestedPlansAccumulatingAggregatorFactory npaaf = new NestedPlansAccumulatingAggregatorFactory( new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new int[] {}); RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index 9f66080..c4c7320 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -89,6 +89,8 @@ private transient int connectorIdCounter; + private transient List<IOperatorDescriptor> metaOps; + // This constructor uses the default frame size. It is for test purposes only. // For other use cases, use the one which sets the frame size. public JobSpecification() { @@ -308,6 +310,14 @@ return requiredClusterCapacity; } + public void setMetaOps(List<IOperatorDescriptor> metaOps) { + this.metaOps = metaOps; + } + + public List<IOperatorDescriptor> getMetaOps() { + return metaOps; + } + private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) { List<V> vList = map.computeIfAbsent(key, k -> new ArrayList<>()); extend(vList, index); -- To view, visit https://asterix-gerrit.ics.uci.edu/2277 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d Gerrit-PatchSet: 11 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
