This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8b6d17ad8e4 [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write
b8b6d17ad8e4 is described below

commit b8b6d17ad8e472307fb4c03ca388efcc4ac7059e
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Apr 26 18:32:18 2024 +0800

    [SPARK-48004][SQL] Add WriteFilesExecBase trait for v1 write
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new trait `WriteFilesExecBase` for v1 write, so that downstream projects can extend `WriteFilesExecBase` rather than `WriteFilesExec`. The reason is that inheriting a `case class` is a bad practice in the Scala world.
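    
    As a rough sketch of the intended usage (the node name `MyWriteFilesExec` and its
    placeholder body are made up for illustration; `WriteFilesExecBase`, `doExecuteWrite`
    and `WriteFilesSpec` are the pieces introduced or reused by this change), a downstream
    project could now declare its own write node without extending the `WriteFilesExec`
    case class:
    
    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.connector.write.WriterCommitMessage
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.datasources.{WriteFilesExecBase, WriteFilesSpec}
    
    // Hypothetical downstream plan node: it inherits output and doExecute from the new
    // trait and only implements the write path and the child rewrite.
    case class MyWriteFilesExec(child: SparkPlan) extends WriteFilesExecBase {
    
      override protected def doExecuteWrite(
          writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
        // Placeholder: real downstream code would produce WriterCommitMessages here.
        throw new UnsupportedOperationException("placeholder for custom write logic")
      }
    
      override protected def withNewChildInternal(newChild: SparkPlan): MyWriteFilesExec =
        copy(child = newChild)
    }
    ```
    
    Since `V1WritesUtils.getWriteFilesOpt` now matches on `WriteFilesExecBase`, such a
    node is picked up the same way as the built-in `WriteFilesExec`.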
    
    ### Why are the changes needed?
    
    Makes downstream projects easier to develop.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Pass CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #46240 from ulysses-you/WriteFilesExecBase.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/execution/datasources/V1Writes.scala       |  4 ++--
 .../spark/sql/execution/datasources/WriteFiles.scala     | 16 +++++++++-------
 .../apache/spark/sql/SparkSessionExtensionSuite.scala    | 13 ++++++-------
 .../sql/execution/datasources/V1WriteCommandSuite.scala  |  8 ++++----
 4 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index d7a8d7aec0b7..1d6c2a6f8112 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -213,9 +213,9 @@ object V1WritesUtils {
     }
   }
 
-  def getWriteFilesOpt(child: SparkPlan): Option[WriteFilesExec] = {
+  def getWriteFilesOpt(child: SparkPlan): Option[WriteFilesExecBase] = {
     child.collectFirst {
-      case w: WriteFilesExec => w
+      case w: WriteFilesExecBase => w
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
index a4fd57e7dffa..c6c34b7fcea3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -58,6 +58,14 @@ case class WriteFiles(
     copy(child = newChild)
 }
 
+trait WriteFilesExecBase extends UnaryExecNode {
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw SparkException.internalError(s"$nodeName does not support doExecute")
+  }
+}
+
 /**
  * Responsible for writing files.
  */
@@ -67,9 +75,7 @@ case class WriteFilesExec(
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
-    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
-  override def output: Seq[Attribute] = Seq.empty
-
+    staticPartitions: TablePartitionSpec) extends WriteFilesExecBase {
   override protected def doExecuteWrite(
       writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     val rdd = child.execute()
@@ -105,10 +111,6 @@ case class WriteFilesExec(
     }
   }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw SparkException.internalError(s"$nodeName does not support doExecute")
-  }
-
   override protected def stringArgs: Iterator[Any] = Iterator(child)
 
   override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 1c44d0c3b4ea..4d38e360f438 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesSpec}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteFilesExec, WriteFilesExecBase, WriteFilesSpec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
@@ -842,14 +842,13 @@ class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     new ColumnarProjectExec(projectList, newChild)
 }
 
-class ColumnarWriteExec(
+case class ColumnarWriteExec(
     child: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
-    staticPartitions: TablePartitionSpec) extends WriteFilesExec(
-  child, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) {
+    staticPartitions: TablePartitionSpec) extends WriteFilesExecBase {
 
   override def supportsColumnar: Boolean = true
 
@@ -858,8 +857,8 @@ class ColumnarWriteExec(
     throw new Exception("columnar write")
   }
 
-  override protected def withNewChildInternal(newChild: SparkPlan): WriteFilesExec =
-    new ColumnarWriteExec(
+  override protected def withNewChildInternal(newChild: SparkPlan): ColumnarWriteExec =
+    ColumnarWriteExec(
       newChild, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
@@ -971,7 +970,7 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
             replaceWithColumnarExpression(exp).asInstanceOf[NamedExpression]),
             replaceWithColumnarPlan(plan.child))
         case write: WriteFilesExec =>
-          new ColumnarWriteExec(
+          ColumnarWriteExec(
             replaceWithColumnarPlan(write.child),
             write.fileFormat,
             write.partitionColumns,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index ce43edb79c12..04a7b4834f4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -214,8 +214,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           val executedPlan = FileFormatWriter.executedPlan.get
 
           val plan = if (enabled) {
-            assert(executedPlan.isInstanceOf[WriteFilesExec])
-            executedPlan.asInstanceOf[WriteFilesExec].child
+            assert(executedPlan.isInstanceOf[WriteFilesExecBase])
+            executedPlan.asInstanceOf[WriteFilesExecBase].child
           } else {
             executedPlan.transformDown {
               case a: AdaptiveSparkPlanExec => a.executedPlan
@@ -261,8 +261,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
         val executedPlan = FileFormatWriter.executedPlan.get
 
         val plan = if (enabled) {
-          assert(executedPlan.isInstanceOf[WriteFilesExec])
-          executedPlan.asInstanceOf[WriteFilesExec].child
+          assert(executedPlan.isInstanceOf[WriteFilesExecBase])
+          executedPlan.asInstanceOf[WriteFilesExecBase].child
         } else {
           executedPlan.transformDown {
             case a: AdaptiveSparkPlanExec => a.executedPlan


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
