HyukjinKwon commented on a change in pull request #23606: [SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite.
URL: https://github.com/apache/spark/pull/23606#discussion_r257676139
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
 ##########
 @@ -264,29 +265,38 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val dsOptions = new DataSourceOptions(options.asJava)
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
-          if (mode == SaveMode.Append) {
-            val relation = DataSourceV2Relation.create(table, options)
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
-          } else {
-            val writeBuilder = table.newWriteBuilder(dsOptions)
-              .withQueryId(UUID.randomUUID().toString)
-              .withInputDataSchema(df.logicalPlan.schema)
-            writeBuilder match {
-              case s: SupportsSaveMode =>
-                val write = s.mode(mode).buildForBatch()
-                // It can only return null with `SupportsSaveMode`. We can clean it up after
-                // removing `SupportsSaveMode`.
-                if (write != null) {
-                  runCommand(df.sparkSession, "save") {
-                    WriteToDataSourceV2(write, df.logicalPlan)
+          lazy val relation = DataSourceV2Relation.create(table, options)
+          mode match {
+            case SaveMode.Append =>
+              runCommand(df.sparkSession, "save") {
+                AppendData.byName(relation, df.logicalPlan)
+              }
+
+            case SaveMode.Overwrite =>
+              // truncate the table
+              runCommand(df.sparkSession, "save") {
+                OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
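
A minimal sketch of a caller that reaches the new `SaveMode.Overwrite` branch above; `spark`, the format, and the path are hypothetical. Per the hunk, this call is planned as `OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))`, i.e. a full truncate-and-write:

```scala
// Hypothetical example; assumes `spark` is an active SparkSession and that
// the chosen provider returns a table implementing SupportsBatchWrite.
val df = spark.range(10).toDF("id")
df.write
  .format("orc")
  .mode("overwrite") // planned as OverwriteByExpression with a Literal(true) delete filter
  .save("/tmp/dsv2-overwrite-demo")
```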
 
 Review comment:
   Two tests are failing because of this change (https://github.com/apache/spark/commit/4dce45a5992e6a89a26b5a0739b33cfeaf979208 and this PR were merged independently around the same time, so the tests didn't catch it, which is fine; it happens from time to time):
   
   ```
   org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc
   org.apache.spark.sql.FileBasedDataSourceSuite.SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc
   ```
   
   ```
   ScalaTestFailureLocation: org.apache.spark.sql.FileBasedDataSourceSuite at (FileBasedDataSourceSuite.scala:349)
   org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown
        at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
        at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
        at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
        at org.scalatest.Assertions.intercept(Assertions.scala:812)
        at org.scalatest.Assertions.intercept$(Assertions.scala:802)
        at org.scalatest.FunSuite.intercept(FunSuite.scala:1560)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$69(FileBasedDataSourceSuite.scala:349)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$68(FileBasedDataSourceSuite.scala:348)
        at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:47)
        at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:31)
        at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(FileBasedDataSourceSuite.scala:37)
        at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:230)
        at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:228)
        at org.apache.spark.sql.FileBasedDataSourceSuite.withSQLConf(FileBasedDataSourceSuite.scala:37)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67(FileBasedDataSourceSuite.scala:346)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$67$adapted(FileBasedDataSourceSuite.scala:332)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66(FileBasedDataSourceSuite.scala:332)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$66$adapted(FileBasedDataSourceSuite.scala:330)
        at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:75)
        at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:74)
        at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:116)
        at org.apache.spark.sql.FileBasedDataSourceSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(FileBasedDataSourceSuite.scala:37)
        at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:74)
        at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:73)
        at org.apache.spark.sql.FileBasedDataSourceSuite.withTempDir(FileBasedDataSourceSuite.scala:37)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$65(FileBasedDataSourceSuite.scala:330)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
        at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at org.apache.spark.sql.FileBasedDataSourceSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FileBasedDataSourceSuite.scala:37)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.sql.FileBasedDataSourceSuite.runTest(FileBasedDataSourceSuite.scala:37)
        at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
        at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
        at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
        at org.scalatest.tools.Runner$.run(Runner.scala:850)
        at org.scalatest.tools.Runner.run(Runner.scala)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
        at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
   Caused by: org.apache.spark.SparkException: Could not read footer for file: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv
        at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:75)
        at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.$anonfun$readSchema$2(OrcUtils.scala:85)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.TraversableOnce.collectFirst(TraversableOnce.scala:148)
        at scala.collection.TraversableOnce.collectFirst$(TraversableOnce.scala:135)
        at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1429)
        at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:85)
        at org.apache.spark.sql.execution.datasources.v2.orc.OrcTable.inferSchema(OrcTable.scala:38)
        at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$1(FileTable.scala:45)
        at scala.Option.orElse(Option.scala:306)
        at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:45)
        at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:44)
        at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:53)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:100)
        at org.apache.spark.sql.DataFrameWriter.relation$lzycompute$1(DataFrameWriter.scala:268)
        at org.apache.spark.sql.DataFrameWriter.relation$1(DataFrameWriter.scala:268)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:278)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
        at org.apache.spark.sql.FileBasedDataSourceSuite.$anonfun$new$70(FileBasedDataSourceSuite.scala:350)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.Assertions.intercept(Assertions.scala:805)
        ... 75 more
   Caused by: org.apache.orc.FileFormatException: Not a valid ORC file file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-77cc900f-2c11-4961-a704-8f013e7390b6/files/part-00001-7a740eab-e3b0-4354-83ba-2e3c0423aeb2-c000.csv (maxFileLength= 9223372036854775807)
        at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:546)
        at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:370)
        at org.apache.orc.OrcFile.createReader(OrcFile.java:342)
        at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.readSchema(OrcUtils.scala:62)
        ... 95 more
   
   ```
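   
   Reading the trace bottom-up: the new `lazy val relation` is forced while planning the overwrite, `FileTable.schema` then triggers `OrcTable.inferSchema` against whatever files already sit at the target path, and the ORC reader trips over a leftover `.csv` part file, so a `SparkException` surfaces before the analyzer can raise the `AnalysisException` the test intercepts. A minimal sketch of the failing pattern (the path is hypothetical, and `Try` stands in for the suite's `intercept[AnalysisException]`):
   
   ```scala
   import scala.util.Try
   
   // Hypothetical target directory that already contains CSV part files
   // written by an earlier iteration of the same test loop.
   val tempDir = "/tmp/spark-mixed-format-dir"
   
   val result = Try {
     // The suite expects the analyzer to reject the CalendarIntervalType
     // column with an AnalysisException ...
     spark.sql("select interval 1 days").write
       .format("orc")
       .mode("overwrite") // forces DataSourceV2Relation.create -> OrcTable.inferSchema
       .save(tempDir)
   }
   // ... but schema inference reads the leftover .csv files as ORC first, so
   // `result` is a Failure wrapping "SparkException: Could not read footer",
   // and intercept[AnalysisException] in FileBasedDataSourceSuite fails.
   ```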
   
