spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
Repository: spark Updated Branches: refs/heads/branch-2.0 47c2a265f -> 55a837246 [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter ## What changes were proposed in this pull request? This patch moves code that belongs in the `Analyzer` out of `DataFrameWriter.insertInto` and into the `Analyzer`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #13496 from viirya/move-analyzer-stuff. (cherry picked from commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55a83724 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55a83724 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55a83724 Branch: refs/heads/branch-2.0 Commit: 55a83724632aa54e49aedbab8ddd21d010eca26d Parents: 47c2a26 Author: Liang-Chi Hsieh Authored: Fri Jun 10 11:05:04 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 11:05:14 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++--- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +--- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4446140..a081357 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,6 +452,17 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +// A partitioned relation's schema can be different from 
the input logicalPlan, since +// partition columns are all moved after data columns. We Project to adjust the ordering. +val input = if (parts.nonEmpty) { + val (inputPartCols, inputDataCols) = child.output.partition { attr => +parts.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, child) +} else { + child +} + val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -467,8 +478,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Assume partition columns are correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table)) + // Partition columns are already correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table), child = input) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. 
@@ -486,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => -i.copy(table = EliminateSubqueryAliases(table)) +i.copy(table = EliminateSubqueryAliases(table), child = input) } case u: UnresolvedRelation => val table = u.tableIdentifier http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 32e2fdc..6ce59e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) val overwrite = mode == SaveMode.Overwrite -// A partitioned relation's schema can be different from the input logicalPlan, since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map {
spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter
Repository: spark Updated Branches: refs/heads/master abdb5d42c -> 0ec279ffd [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter ## What changes were proposed in this pull request? This patch moves code that belongs in the `Analyzer` out of `DataFrameWriter.insertInto` and into the `Analyzer`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #13496 from viirya/move-analyzer-stuff. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec279ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec279ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec279ff Branch: refs/heads/master Commit: 0ec279ffdf92853965e327a9f0f6956cacb7a23e Parents: abdb5d4 Author: Liang-Chi Hsieh Authored: Fri Jun 10 11:05:04 2016 -0700 Committer: Cheng Lian Committed: Fri Jun 10 11:05:04 2016 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++--- .../org/apache/spark/sql/DataFrameWriter.scala | 12 +--- .../spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d1ca99f..58f3904 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -452,6 +452,17 @@ class Analyzer( def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => +// A partitioned relation's schema can be different from the input logicalPlan, since +// partition columns are all moved after data columns. 
We Project to adjust the ordering. +val input = if (parts.nonEmpty) { + val (inputPartCols, inputDataCols) = child.output.partition { attr => +parts.contains(attr.name) + } + Project(inputDataCols ++ inputPartCols, child) +} else { + child +} + val table = lookupTableFromCatalog(u) // adding the table's partitions or validate the query's partition info table match { @@ -467,8 +478,8 @@ class Analyzer( |Requested partitions: ${parts.keys.mkString(",")} |Table partitions: ${tablePartitionNames.mkString(",")}""".stripMargin) } - // Assume partition columns are correctly placed at the end of the child's output - i.copy(table = EliminateSubqueryAliases(table)) + // Partition columns are already correctly placed at the end of the child's output + i.copy(table = EliminateSubqueryAliases(table), child = input) } else { // Set up the table's partition scheme with all dynamic partitions by moving partition // columns to the end of the column list, in partition order. @@ -486,7 +497,7 @@ class Analyzer( child = Project(columns ++ partColumns, child)) } case _ => -i.copy(table = EliminateSubqueryAliases(table)) +i.copy(table = EliminateSubqueryAliases(table), child = input) } case u: UnresolvedRelation => val table = u.tableIdentifier http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 32e2fdc..6ce59e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap) val overwrite = mode == SaveMode.Overwrite -// A partitioned relation's schema can be different from the input logicalPlan, 
since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => -