spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 47c2a265f -> 55a837246


[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

## What changes were proposed in this pull request?

This patch moves some code in `DataFrameWriter.insertInto` that belongs in 
`Analyzer`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh 

Closes #13496 from viirya/move-analyzer-stuff.

(cherry picked from commit 0ec279ffdf92853965e327a9f0f6956cacb7a23e)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55a83724
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55a83724
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55a83724

Branch: refs/heads/branch-2.0
Commit: 55a83724632aa54e49aedbab8ddd21d010eca26d
Parents: 47c2a26
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 11:05:04 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 11:05:14 2016 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++---
 .../org/apache/spark/sql/DataFrameWriter.scala | 12 +---
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 ++--
 3 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4446140..a081357 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
 
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
+// A partitioned relation's schema can be different from the input 
logicalPlan, since
+// partition columns are all moved after data columns. We Project to 
adjust the ordering.
+val input = if (parts.nonEmpty) {
+  val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+parts.contains(attr.name)
+  }
+  Project(inputDataCols ++ inputPartCols, child)
+} else {
+  child
+}
+
 val table = lookupTableFromCatalog(u)
 // adding the table's partitions or validate the query's partition info
 table match {
@@ -467,8 +478,8 @@ class Analyzer(
  |Requested partitions: ${parts.keys.mkString(",")}
  |Table partitions: 
${tablePartitionNames.mkString(",")}""".stripMargin)
   }
-  // Assume partition columns are correctly placed at the end of 
the child's output
-  i.copy(table = EliminateSubqueryAliases(table))
+  // Partition columns are already correctly placed at the end of 
the child's output
+  i.copy(table = EliminateSubqueryAliases(table), child = input)
 } else {
   // Set up the table's partition scheme with all dynamic 
partitions by moving partition
   // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
 child = Project(columns ++ partColumns, child))
 }
   case _ =>
-i.copy(table = EliminateSubqueryAliases(table))
+i.copy(table = EliminateSubqueryAliases(table), child = input)
 }
   case u: UnresolvedRelation =>
 val table = u.tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/55a83724/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc..6ce59e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
-// A partitioned relation's schema can be different from the input 
logicalPlan, since
-// partition columns are all moved after data columns. We Project to 
adjust the ordering.
-// TODO: this belongs to the analyzer.
-val input = normalizedParCols.map {

spark git commit: [SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

2016-06-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master abdb5d42c -> 0ec279ffd


[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

## What changes were proposed in this pull request?

This patch moves some code in `DataFrameWriter.insertInto` that belongs in 
`Analyzer`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh 

Closes #13496 from viirya/move-analyzer-stuff.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec279ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec279ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec279ff

Branch: refs/heads/master
Commit: 0ec279ffdf92853965e327a9f0f6956cacb7a23e
Parents: abdb5d4
Author: Liang-Chi Hsieh 
Authored: Fri Jun 10 11:05:04 2016 -0700
Committer: Cheng Lian 
Committed: Fri Jun 10 11:05:04 2016 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala | 17 ++---
 .../org/apache/spark/sql/DataFrameWriter.scala | 12 +---
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 ++--
 3 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1ca99f..58f3904 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
 
 def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
   case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
+// A partitioned relation's schema can be different from the input 
logicalPlan, since
+// partition columns are all moved after data columns. We Project to 
adjust the ordering.
+val input = if (parts.nonEmpty) {
+  val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+parts.contains(attr.name)
+  }
+  Project(inputDataCols ++ inputPartCols, child)
+} else {
+  child
+}
+
 val table = lookupTableFromCatalog(u)
 // adding the table's partitions or validate the query's partition info
 table match {
@@ -467,8 +478,8 @@ class Analyzer(
  |Requested partitions: ${parts.keys.mkString(",")}
  |Table partitions: 
${tablePartitionNames.mkString(",")}""".stripMargin)
   }
-  // Assume partition columns are correctly placed at the end of 
the child's output
-  i.copy(table = EliminateSubqueryAliases(table))
+  // Partition columns are already correctly placed at the end of 
the child's output
+  i.copy(table = EliminateSubqueryAliases(table), child = input)
 } else {
   // Set up the table's partition scheme with all dynamic 
partitions by moving partition
   // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
 child = Project(columns ++ partColumns, child))
 }
   case _ =>
-i.copy(table = EliminateSubqueryAliases(table))
+i.copy(table = EliminateSubqueryAliases(table), child = input)
 }
   case u: UnresolvedRelation =>
 val table = u.tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc..6ce59e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
 val overwrite = mode == SaveMode.Overwrite
 
-// A partitioned relation's schema can be different from the input 
logicalPlan, since
-// partition columns are all moved after data columns. We Project to 
adjust the ordering.
-// TODO: this belongs to the analyzer.
-val input = normalizedParCols.map { parCols =>
-  val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { 
attr =>
-