This is an automated email from the ASF dual-hosted git repository. tdas pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new df9a506 [SPARK-27453] Pass partitionBy as options in DataFrameWriter df9a506 is described below commit df9a50637e2622a15e9af7d837986a0e868878b1 Author: liwensun <liwen....@databricks.com> AuthorDate: Tue Apr 16 15:03:16 2019 -0700 [SPARK-27453] Pass partitionBy as options in DataFrameWriter Pass partitionBy columns as options and feature-flag this behavior. A new unit test. Closes #24365 from liwensun/partitionby. Authored-by: liwensun <liwen....@databricks.com> Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> (cherry picked from commit 26ed65f4150db1fa37f8bfab24ac0873d2e42936) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ .../scala/org/apache/spark/sql/DataFrameWriter.scala | 11 ++++++++++- .../sql/execution/datasources/DataSourceUtils.scala | 20 ++++++++++++++++++++ .../spark/sql/test/DataFrameReaderWriterSuite.scala | 19 +++++++++++++++++++ 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 29bd356..c9ee60e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1550,6 +1550,15 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val LEGACY_PASS_PARTITION_BY_AS_OPTIONS = + buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions") + .internal() + .doc("Whether to pass the partitionBy columns as options in DataFrameWriter. " + + "Data source V1 now silently drops partitionBy columns for non-file-format sources; " + + "turning the flag on provides a way for these sources to see these partitionBy columns.") + .booleanConf + .createWithDefault(false) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a2586cc..f90d353 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -28,8 +28,9 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.types.StructType @@ -272,6 +273,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } private def saveToV1Source(): Unit = { + if (SparkSession.active.sessionState.conf.getConf( + SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) { + partitioningColumns.foreach { columns => + extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY -> + DataSourceUtils.encodePartitioningColumns(columns)) + } + } + // Code path for data source v1. runCommand(df.sparkSession, "save") { DataSource( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 90cec5e..1cb69d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.fs.Path +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types._ @@ -40,6 +42,24 @@ object DataSourceUtils { } /** + * The key to use for storing partitionBy columns as options. + */ + val PARTITIONING_COLUMNS_KEY = "__partition_columns" + + /** + * Utility methods for converting partitionBy columns to options and back. + */ + private implicit val formats = Serialization.formats(NoTypeHints) + + def encodePartitioningColumns(columns: Seq[String]): String = { + Serialization.write(columns) + } + + def decodePartitioningColumns(str: String): Seq[String] = { + Serialization.read[Seq[String]](str) + } + + /** * Verify if the schema is supported in datasource. This verification should be done * in a driver side. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 2378725..67cd0b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -211,6 +212,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be assert(LastOptions.parameters("opt3") == "3") } + test("pass partitionBy as options") { + Seq(true, false).foreach { flag => + withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") { + Seq(1).toDF.write + .format("org.apache.spark.sql.test") + .partitionBy("col1", "col2") + .save() + + if (flag) { + val partColumns = LastOptions.parameters(DataSourceUtils.PARTITIONING_COLUMNS_KEY) + assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2")) + } else { + assert(!LastOptions.parameters.contains(DataSourceUtils.PARTITIONING_COLUMNS_KEY)) + } + } + } + } + test("save mode") { val df = spark.read .format("org.apache.spark.sql.test") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org