spark git commit: [SPARK-16036][SPARK-16037][SPARK-16034][SQL] Follow up code clean up and improvement
Repository: spark Updated Branches: refs/heads/branch-2.0 d11f533de -> 19397caab [SPARK-16036][SPARK-16037][SPARK-16034][SQL] Follow up code clean up and improvement ## What changes were proposed in this pull request? This PR is the follow-up PR for https://github.com/apache/spark/pull/13754/files and https://github.com/apache/spark/pull/13749. I will comment inline to explain my changes. ## How was this patch tested? Existing tests. Author: Yin Huai Closes #13766 from yhuai/caseSensitivity. (cherry picked from commit 6d0f921aedfdd3b7e8472b6776d0c7d8299190bd) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19397caa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19397caa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19397caa Branch: refs/heads/branch-2.0 Commit: 19397caab62b550269961a123bd11a34afc3a09b Parents: d11f533 Author: Yin Huai Authored: Sun Jun 19 21:45:53 2016 -0700 Committer: Yin Huai Committed: Sun Jun 19 21:46:14 2016 -0700 -- .../plans/logical/basicLogicalOperators.scala | 2 ++ .../org/apache/spark/sql/DataFrameWriter.scala | 24 -- .../sql/execution/datasources/DataSource.scala | 9 ++--- .../spark/sql/execution/datasources/rules.scala | 13 ++-- .../spark/sql/internal/SessionState.scala | 2 +- .../spark/sql/execution/command/DDLSuite.scala | 20 +-- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 20 +++ .../sql/hive/execution/HiveQuerySuite.scala | 35 .../sql/hive/execution/SQLQuerySuite.scala | 32 -- 10 files changed, 98 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/19397caa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 69b8b05..ff3dcbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -369,6 +369,8 @@ case class InsertIntoTable( if (table.output.isEmpty) { None } else { + // Note: The parser (visitPartitionSpec in AstBuilder) already turns + // keys in partition to their lowercase forms. val staticPartCols = partition.filter(_._2.isDefined).keySet Some(table.output.filterNot(a => staticPartCols.contains(a.name))) } http://git-wip-us.apache.org/repos/asf/spark/blob/19397caa/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6fc974..ca3972d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -245,29 +245,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { if (partitioningColumns.isDefined) { throw new AnalysisException( "insertInto() can't be used together with partitionBy(). " + - "Partition columns are defined by the table into which is being inserted." + "Partition columns have already be defined for the table. " + + "It is not necessary to use partitionBy()." ) } -val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap) -val overwrite = mode == SaveMode.Overwrite - -// A partitioned relation's schema can be different from the input logicalPlan, since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => -parCols.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, df.logicalPlan) -}.getOrElse(df.logicalPlan) - df.sparkSession.sessionState.executePlan( InsertIntoTable( -UnresolvedRelation(tableIdent), -partitions.getOrElse(Map.empty[String, Option[String]]), -input, -overwrite, +table = UnresolvedRelation(tableIdent), +partition = Map.empty[String, Option[String]], +child = df.logicalP
spark git commit: [SPARK-16036][SPARK-16037][SPARK-16034][SQL] Follow up code clean up and improvement
Repository: spark Updated Branches: refs/heads/master 4f17fddcd -> 6d0f921ae [SPARK-16036][SPARK-16037][SPARK-16034][SQL] Follow up code clean up and improvement ## What changes were proposed in this pull request? This PR is the follow-up PR for https://github.com/apache/spark/pull/13754/files and https://github.com/apache/spark/pull/13749. I will comment inline to explain my changes. ## How was this patch tested? Existing tests. Author: Yin Huai Closes #13766 from yhuai/caseSensitivity. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d0f921a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d0f921a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d0f921a Branch: refs/heads/master Commit: 6d0f921aedfdd3b7e8472b6776d0c7d8299190bd Parents: 4f17fdd Author: Yin Huai Authored: Sun Jun 19 21:45:53 2016 -0700 Committer: Yin Huai Committed: Sun Jun 19 21:45:53 2016 -0700 -- .../plans/logical/basicLogicalOperators.scala | 2 ++ .../org/apache/spark/sql/DataFrameWriter.scala | 24 -- .../sql/execution/datasources/DataSource.scala | 9 ++--- .../spark/sql/execution/datasources/rules.scala | 13 ++-- .../spark/sql/internal/SessionState.scala | 2 +- .../spark/sql/execution/command/DDLSuite.scala | 20 +-- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 20 +++ .../sql/hive/execution/HiveQuerySuite.scala | 35 .../sql/hive/execution/SQLQuerySuite.scala | 32 -- 10 files changed, 98 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6d0f921a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 69b8b05..ff3dcbc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -369,6 +369,8 @@ case class InsertIntoTable( if (table.output.isEmpty) { None } else { + // Note: The parser (visitPartitionSpec in AstBuilder) already turns + // keys in partition to their lowercase forms. val staticPartCols = partition.filter(_._2.isDefined).keySet Some(table.output.filterNot(a => staticPartCols.contains(a.name))) } http://git-wip-us.apache.org/repos/asf/spark/blob/6d0f921a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6fc974..ca3972d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -245,29 +245,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { if (partitioningColumns.isDefined) { throw new AnalysisException( "insertInto() can't be used together with partitionBy(). " + - "Partition columns are defined by the table into which is being inserted." + "Partition columns have already be defined for the table. " + + "It is not necessary to use partitionBy()." ) } -val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap) -val overwrite = mode == SaveMode.Overwrite - -// A partitioned relation's schema can be different from the input logicalPlan, since -// partition columns are all moved after data columns. We Project to adjust the ordering. -// TODO: this belongs to the analyzer. -val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => -parCols.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, df.logicalPlan) -}.getOrElse(df.logicalPlan) - df.sparkSession.sessionState.executePlan( InsertIntoTable( -UnresolvedRelation(tableIdent), -partitions.getOrElse(Map.empty[String, Option[String]]), -input, -overwrite, +table = UnresolvedRelation(tableIdent), +partition = Map.empty[String, Option[String]], +child = df.logicalPlan, +overwrite = mode == SaveMode.Overwrite, ifNotExists = false)).toRdd } http