This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new df9a506  [SPARK-27453] Pass partitionBy as options in DataFrameWriter
df9a506 is described below

commit df9a50637e2622a15e9af7d837986a0e868878b1
Author: liwensun <liwen....@databricks.com>
AuthorDate: Tue Apr 16 15:03:16 2019 -0700

    [SPARK-27453] Pass partitionBy as options in DataFrameWriter
    
    Pass partitionBy columns as options and feature-flag this behavior.
    
    Tested with a new unit test.
    
    Closes #24365 from liwensun/partitionby.
    
    Authored-by: liwensun <liwen....@databricks.com>
    Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>
    (cherry picked from commit 26ed65f4150db1fa37f8bfab24ac0873d2e42936)
    Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala      |  9 +++++++++
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 11 ++++++++++-
 .../sql/execution/datasources/DataSourceUtils.scala  | 20 ++++++++++++++++++++
 .../spark/sql/test/DataFrameReaderWriterSuite.scala  | 19 +++++++++++++++++++
 4 files changed, 58 insertions(+), 1 deletion(-)
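
With the new flag on, DataFrameWriter JSON-encodes the partitionBy columns and forwards them to the V1 source as a single option. A minimal, illustrative sketch of the intended usage (the sink name is hypothetical and `spark`/`df` are assumed to come from an active session; the config key is the one added below):

    // Enable the legacy flag, then write with partitionBy. With the flag on,
    // the partition columns reach the V1 source as a JSON-encoded option
    // under the key "__partition_columns".
    spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")

    df.write
      .format("com.example.sink")  // hypothetical non-file-based V1 source
      .partitionBy("year", "month")
      .save()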

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 29bd356..c9ee60e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1550,6 +1550,15 @@ object SQLConf {
         "WHERE, which does not follow SQL standard.")
       .booleanConf
       .createWithDefault(false)
+
+  val LEGACY_PASS_PARTITION_BY_AS_OPTIONS =
+    buildConf("spark.sql.legacy.sources.write.passPartitionByAsOptions")
+      .internal()
+      .doc("Whether to pass the partitionBy columns as options in DataFrameWriter. " +
+        "Data source V1 now silently drops partitionBy columns for non-file-format sources; " +
+        "turning the flag on provides a way for these sources to see these partitionBy columns.")
+      .booleanConf
+      .createWithDefault(false)
 }
 
 /**
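
For reference, the flag is read back exactly once, in DataFrameWriter; a sketch of that lookup in isolation (this mirrors the check in the next file and relies on internal APIs such as sessionState, so treat it as illustrative rather than a supported pattern):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    // getConf returns the typed value of the entry (Boolean here, defaulting
    // to false); SparkSession.active is the session bound to the current thread.
    val passPartitionBy: Boolean =
      SparkSession.active.sessionState.conf.getConf(
        SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)
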
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a2586cc..f90d353 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -28,8 +28,9 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.StructType
@@ -272,6 +273,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveToV1Source(): Unit = {
+    if (SparkSession.active.sessionState.conf.getConf(
+      SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS)) {
+      partitioningColumns.foreach { columns =>
+        extraOptions += (DataSourceUtils.PARTITIONING_COLUMNS_KEY ->
+          DataSourceUtils.encodePartitioningColumns(columns))
+      }
+    }
+
     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
       DataSource(
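
On the receiving end, a non-file V1 source can recover the columns from its options map. A hypothetical sketch (the provider class is invented for illustration; the key and decoder are the ones added to DataSourceUtils below):

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.execution.datasources.DataSourceUtils
    import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

    // Hypothetical non-file-based V1 sink that honors the forwarded columns.
    class ExampleSinkProvider extends CreatableRelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          mode: SaveMode,
          parameters: Map[String, String],
          data: DataFrame): BaseRelation = {
        val partitionCols: Seq[String] = parameters
          .get(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
          .map(DataSourceUtils.decodePartitioningColumns)
          .getOrElse(Nil)
        // ... write `data` partitioned by `partitionCols`, then return a relation ...
        ???
      }
    }
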
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 90cec5e..1cb69d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs.Path
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.types._
@@ -40,6 +42,24 @@ object DataSourceUtils {
   }
 
   /**
+   * The key to use for storing partitionBy columns as options.
+   */
+  val PARTITIONING_COLUMNS_KEY = "__partition_columns"
+
+  /**
+   * Utility methods for converting partitionBy columns to options and back.
+   */
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  def encodePartitioningColumns(columns: Seq[String]): String = {
+    Serialization.write(columns)
+  }
+
+  def decodePartitioningColumns(str: String): Seq[String] = {
+    Serialization.read[Seq[String]](str)
+  }
+
+  /**
   * Verify if the schema is supported in datasource. This verification should be done
    * in a driver side.
    */
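
The encoding above is plain JSON via json4s, so the round-trip is easy to sanity-check; a small sketch, assuming Spark's sql module on the classpath (the values in the comments are what the json4s serializer produces for a Seq[String]):

    import org.apache.spark.sql.execution.datasources.DataSourceUtils

    val encoded = DataSourceUtils.encodePartitioningColumns(Seq("col1", "col2"))
    // encoded == """["col1","col2"]"""
    val decoded = DataSourceUtils.decodePartitioningColumns(encoded)
    // decoded == Seq("col1", "col2")
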
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2378725..67cd0b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -211,6 +212,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("opt3") == "3")
   }
 
+  test("pass partitionBy as options") {
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {
+        Seq(1).toDF.write
+          .format("org.apache.spark.sql.test")
+          .partitionBy("col1", "col2")
+          .save()
+
+        if (flag) {
+          val partColumns = LastOptions.parameters(DataSourceUtils.PARTITIONING_COLUMNS_KEY)
+          assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
+        } else {
+          assert(!LastOptions.parameters.contains(DataSourceUtils.PARTITIONING_COLUMNS_KEY))
+        }
+      }
+    }
+  }
+
   test("save mode") {
     val df = spark.read
       .format("org.apache.spark.sql.test")

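A third-party source that would rather not depend on Spark's internal DataSourceUtils can decode the "__partition_columns" option with any JSON parser; a hedged sketch using json4s, which Spark already ships:

    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods

    implicit val formats: DefaultFormats.type = DefaultFormats

    // The option value is a JSON array of column names.
    val raw = """["col1","col2"]"""
    val cols = JsonMethods.parse(raw).extract[Seq[String]]
    // cols == Seq("col1", "col2")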
