Repository: drill Updated Branches: refs/heads/master 9cf6faa7a -> 6446e56f2
DRILL-5538: Create TopProject with validatedNodeType after PHYSICAL phase closes #844 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6446e56f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6446e56f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6446e56f Branch: refs/heads/master Commit: 6446e56f292a5905d646462c618c056839ad5198 Parents: 9cf6faa Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> Authored: Thu Jun 15 16:01:54 2017 +0300 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Tue Jun 27 13:03:56 2017 +0300 ---------------------------------------------------------------------- .../physical/visitor/TopProjectVisitor.java | 142 +++++++++++++++++++ .../sql/handlers/CreateTableHandler.java | 14 +- .../planner/sql/handlers/DefaultSqlHandler.java | 71 ++++++---- .../planner/sql/handlers/ExplainHandler.java | 6 +- .../java/org/apache/drill/TestUnionAll.java | 1 - 5 files changed, 195 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java new file mode 100644 index 0000000..587b006 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical.visitor; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.ScreenPrel; +import org.apache.drill.exec.planner.physical.WriterPrel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Adds a top project to ensure the final output field names are preserved. + * Such a project is needed due to Calcite's behavior of ProjectRemoveRule. + * It is added under the Screen/Writer operator in the physical plan (e.g. for * column + * expansion or partition by column processing) unless the operator's input is already + * a Project and the new project would be trivial; Screen is skipped if a Writer is below it.
+ */ +public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> { + + private final RelDataType validatedRowType; + + public TopProjectVisitor(RelDataType validatedRowType) { + this.validatedRowType = validatedRowType; + } + + /** + * Traverses passed physical relational node and its children and checks if top project + * should be added under screen or writer to preserve final output field names. + * + * @param prel physical relational node + * @param validatedRowType final output row type + * @return physical relational node with added project if necessary + */ + public static Prel insertTopProject(Prel prel, RelDataType validatedRowType){ + return prel.accept(new TopProjectVisitor(validatedRowType), null); + } + + @Override + public Prel visitPrel(Prel prel, Void value) throws RuntimeException { /* default: visit every input, then copy this node over the visited inputs */ + List<RelNode> children = new ArrayList<>(); + for (Prel child : prel){ + child = child.accept(this, null); + children.add(child); + } + + return (Prel) prel.copy(prel.getTraitSet(), children); + } + + @Override + public Prel visitScreen(ScreenPrel prel, Void value) { + // insert project under screen only if we don't have writer underneath; with a writer below, the subtree is returned as is (not visited) + if (containsWriter(prel)) { + return prel; + } + + Prel newChild = ((Prel) prel.getInput()).accept(this, value); + return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType))); + } + + @Override + public Prel visitWriter(WriterPrel prel, Void value) { /* visit the input first, then try to add the top project directly under the writer */ + Prel newChild = ((Prel) prel.getInput()).accept(this, value); + return prel.copy(prel.getTraitSet(), Collections.singletonList((RelNode)addTopProjectPrel(newChild, validatedRowType))); + } + + /** + * Checks if any descendant (not only a direct child) of the passed physical relational node is a writer.
+ * + * @param prel physical relational node + * @return true if a writer operator was found anywhere below prel + */ + private boolean containsWriter(Prel prel) { + for (Prel child : prel){ + if (child instanceof WriterPrel || containsWriter(child)) { + return true; + } + } + return false; + } + + /** + * Adds top project to ensure final output field names are preserved. + * Duplicated column names are renamed to be unique (case sensitivity follows the type system). + * Top project is skipped only when prel is already a project and the new top project + * would be trivial; otherwise it is added (even if trivial), provided field counts match. + * + * @param prel physical relational node + * @param validatedRowType final output row type + * @return prel itself if field counts differ or the project is skipped, otherwise the new top project over prel + */ + private Prel addTopProjectPrel(Prel prel, RelDataType validatedRowType) { + RelDataType rowType = prel.getRowType(); + if (rowType.getFieldCount() != validatedRowType.getFieldCount()) { /* names are mapped to input fields by position, so the counts must match */ + return prel; + } + + RexBuilder rexBuilder = prel.getCluster().getRexBuilder(); + List<RexNode> projections = new ArrayList<>(); + int projectCount = rowType.getFieldList().size(); + + for (int i = 0; i < projectCount; i++) { + projections.add(rexBuilder.makeInputRef(prel, i)); + } + + List<String> fieldNames = SqlValidatorUtil.uniquify( + validatedRowType.getFieldNames(), + SqlValidatorUtil.F_SUGGESTER2, + prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive()); + + RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null); + ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType); + + return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ?
prel : topProject; + } + + +} http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java index 72444ca..d232a71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java @@ -121,7 +121,7 @@ public class CreateTableHandler extends DefaultSqlHandler { RelDataType queryRowType, StorageStrategy storageStrategy) throws RelConversionException, SqlUnsupportedException { - final DrillRel convertedRelNode = convertToDrel(relNode); + final DrillRel convertedRelNode = convertToRawDrel(relNode); // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary. // Only insert project when the field count from the child is same as that of the queryRowType.
@@ -136,7 +136,7 @@ public class CreateTableHandler extends DefaultSqlHandler { private Prel convertToPrel(RelNode drel, RelDataType inputRowType, List<String> partitionColumns) throws RelConversionException, SqlUnsupportedException { - Prel prel = convertToPrel(drel); + Prel prel = convertToPrel(drel, inputRowType); prel = prel.accept(new ProjectForWriterVisitor(inputRowType, partitionColumns), null); @@ -188,7 +188,7 @@ public class CreateTableHandler extends DefaultSqlHandler { final RelOptCluster cluster = prel.getCluster(); final List<RexNode> exprs = Lists.newArrayListWithExpectedSize(queryRowType.getFieldCount() + 1); - final List<String> fieldnames = new ArrayList<String>(queryRowType.getFieldNames()); + final List<String> fieldNames = new ArrayList<>(queryRowType.getFieldNames()); for (final RelDataTypeField field : queryRowType.getFieldList()) { exprs.add(RexInputRef.of(field.getIndex(), queryRowType)); @@ -199,7 +199,7 @@ public class CreateTableHandler extends DefaultSqlHandler { final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster, cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, queryRowType); - return (Prel) prel.copy(projectUnderWriter.getTraitSet(), + return prel.copy(projectUnderWriter.getTraitSet(), Collections.singletonList( (RelNode) projectUnderWriter)); } else { // find list of partition columns. @@ -217,19 +217,19 @@ public class CreateTableHandler extends DefaultSqlHandler { } // Add partition column comparator to Project's field name list. - fieldnames.add(WriterPrel.PARTITION_COMPARATOR_FIELD); + fieldNames.add(WriterPrel.PARTITION_COMPARATOR_FIELD); // Add partition column comparator to Project's expression list. 
final RexNode partionColComp = createPartitionColComparator(prel.getCluster().getRexBuilder(), partitionColumnExprs); exprs.add(partionColComp); - final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldnames); + final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldNames, null); final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster, cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, rowTypeWithPCComp); - return (Prel) prel.copy(projectUnderWriter.getTraitSet(), + return prel.copy(projectUnderWriter.getTraitSet(), Collections.singletonList( (RelNode) projectUnderWriter)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index ce6cedf..e03a40c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -90,6 +90,7 @@ import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions; import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter; import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor; +import org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor; import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValue; @@ -165,8 +166,8 @@ 
public class DefaultSqlHandler extends AbstractSqlHandler { final RelDataType validatedRowType = convertedRelNode.getValidatedRowType(); final RelNode queryRelNode = convertedRelNode.getConvertedNode(); - final DrillRel drel = convertToDrel(queryRelNode, validatedRowType); - final Prel prel = convertToPrel(drel); + final DrillRel drel = convertToDrel(queryRelNode); + final Prel prel = convertToPrel(drel, validatedRowType); logAndSetTextPlan("Drill Physical", prel, logger); final PhysicalOperator pop = convertToPop(prel); final PhysicalPlan plan = convertToPlan(pop); @@ -199,13 +200,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } /** - * Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree. - * @param relNode - * @return + * Given a relNode tree for SELECT statement, convert to Drill Logical RelNode tree. + * + * @param relNode relational node + * @return Drill Logical RelNode tree * @throws SqlUnsupportedException * @throws RelConversionException */ - protected DrillRel convertToDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException { + protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException { if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) && context.getPlannerSettings().isTypeInferenceEnabled() && FindLimit0Visitor.containsLimit0(relNode)) { @@ -279,20 +281,18 @@ public class DefaultSqlHandler extends AbstractSqlHandler { /** * Return Drill Logical RelNode tree for a SELECT statement, when it is executed / explained directly. + * Adds screen operator on top of converted node. * - * @param relNode : root RelNode corresponds to Calcite Logical RelNode. - * @param validatedRowType : the rowType for the final field names. A rename project may be placed on top of the root. - * @return + * @param relNode root RelNode corresponds to Calcite Logical RelNode. 
+ * @return Drill Logical RelNode tree * @throws RelConversionException * @throws SqlUnsupportedException */ - protected DrillRel convertToDrel(RelNode relNode, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException { - final DrillRel convertedRelNode = convertToDrel(relNode); + protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException, SqlUnsupportedException { + final DrillRel convertedRelNode = convertToRawDrel(relNode); - // Put a non-trivial topProject to ensure the final output field name is preserved, when necessary. - DrillRel topPreservedNameProj = addRenamedProject(convertedRelNode, validatedRowType); - return new DrillScreenRel(topPreservedNameProj.getCluster(), topPreservedNameProj.getTraitSet(), - topPreservedNameProj); + return new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), + convertedRelNode); } /** @@ -411,7 +411,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler { return output; } - protected Prel convertToPrel(RelNode drel) throws RelConversionException, SqlUnsupportedException { + /** + * Applies physical rules and certain transformations to convert drill relational node into physical one. + * + * @param drel relational node + * @param validatedRowType final output row type + * @return physical relational node + * @throws RelConversionException + * @throws SqlUnsupportedException + */ + protected Prel convertToPrel(RelNode drel, RelDataType validatedRowType) throws RelConversionException, SqlUnsupportedException { Preconditions.checkArgument(drel.getConvention() == DrillRel.DRILL_LOGICAL); final RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON); @@ -459,7 +468,13 @@ public class DefaultSqlHandler extends AbstractSqlHandler { /* The order of the following transformations is important */ /* - * 0.) For select * from join query, we need insert project on top of scan and a top project just + * 0.) 
+ * Add top project before screen operator or writer to ensure that final output column names are preserved. + */ + phyRelNode = TopProjectVisitor.insertTopProject(phyRelNode, validatedRowType); + + /* + * 1.) For select * from join query, we need insert project on top of scan and a top project just * under screen operator. The project on top of scan will rename from * to T1*, while the top project * will rename T1* to *, before it output the final result. Only the top project will allow * duplicate columns, since user could "explicitly" ask for duplicate columns ( select *, col, *). @@ -468,14 +483,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode); /* - * 1.) + * 2.) * Join might cause naming conflicts from its left and right child. * In such case, we have to insert Project to rename the conflicting names. */ phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode); /* - * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count. + * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count. * We want to have smaller dataset on the right side, since hash table builds on right side. 
*/ if (context.getPlannerSettings().isHashJoinSwapEnabled()) { @@ -490,20 +505,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } /* - * 1.2) Break up all expressions with complex outputs into their own project operations + * 2.2) Break up all expressions with complex outputs into their own project operations */ phyRelNode = phyRelNode.accept( new SplitUpComplexExpressions(config.getConverter().getTypeFactory(), context.getDrillOperatorTable(), context .getPlannerSettings().functionImplementationRegistry), null); /* - * 1.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project + * 2.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project */ phyRelNode = phyRelNode.accept( new RewriteProjectToFlatten(config.getConverter().getTypeFactory(), context.getDrillOperatorTable()), null); /* - * 2.) + * 3.) * Since our operators work via names rather than indices, we have to make to reorder any * output before we return data to the user as we may have accidentally shuffled things. * This adds a trivial project to reorder columns prior to output. @@ -511,14 +526,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode); /* - * 3.) + * 4.) * If two fragments are both estimated to be parallelization one, remove the exchange * separating them */ phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, targetSliceSize); - /* 4.) + /* 5.) * Add ProducerConsumer after each scan if the option is set * Use the configured queueSize */ @@ -530,7 +545,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { */ - /* 5.) + /* 6.) * if the client does not support complex types (Map, Repeated) * insert a project which which would convert */ @@ -540,20 +555,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } - /* 6.) + /* 7.) 
* Insert LocalExchange (mux and/or demux) nodes */ phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions); - /* 7.) + /* 8.) * Next, we add any required selection vector removers given the supported encodings of each * operator. This will ultimately move to a new trait but we're managing here for now to avoid * introducing new issues in planning before the next release */ phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode); - /* 8.) + /* 9.) * Finally, Make sure that the no rels are repeats. * This could happen in the case of querying the same table twice as Optiq may canonicalize these. */ http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index b5b5f73..d62fb4a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -58,14 +58,14 @@ public class ExplainHandler extends DefaultSqlHandler { final RelNode queryRelNode = convertedRelNode.getConvertedNode(); log("Calcite", queryRelNode, logger, null); - DrillRel drel = convertToDrel(queryRelNode, validatedRowType); + DrillRel drel = convertToDrel(queryRelNode); if (mode == ResultMode.LOGICAL) { LogicalExplain logicalResult = new LogicalExplain(drel, level, context); return DirectPlan.createDirectPlan(context, logicalResult); } - Prel prel = convertToPrel(drel); + Prel prel = convertToPrel(drel, validatedRowType); logAndSetTextPlan("Drill Physical", prel, logger); PhysicalOperator pop = convertToPop(prel); PhysicalPlan plan = convertToPlan(pop); http://git-wip-us.apache.org/repos/asf/drill/blob/6446e56f/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java index 924486f..7700a1e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java @@ -1031,7 +1031,6 @@ public class TestUnionAll extends BaseTestQuery{ // Validate the plan final String[] expectedPlan = {"UnionExchange.*\n", - ".*Project.*\n" + ".*UnionAll"}; final String[] excludedPlan = {};