This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b34745  [SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution
6b34745 is described below

commit 6b34745cb9b294c91cd126c2ea44c039ee83cb84
Author: Anton Okolnychyi <aokolnyc...@apple.com>
AuthorDate: Fri Jan 8 20:37:35 2021 -0800

    [SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution
    
    ### What changes were proposed in this pull request?
    
    This PR makes `StreamExecution` use the `Write` abstraction introduced in SPARK-33779.
    
    Note: we will need separate plans for streaming writes in order to support the required distribution and ordering in SS. This change only migrates to the `Write` abstraction.
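    
    As a rough illustration of the migrated pattern on the connector side (a sketch only; `ExampleWriteBuilder` is a hypothetical name, not code from this commit): instead of overriding the legacy `buildForBatch`/`buildForStreaming` hooks, a builder overrides `build()` and exposes both execution modes through the returned `Write`:
    
    ```scala
    import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriteBuilder}
    import org.apache.spark.sql.connector.write.streaming.StreamingWrite
    
    // Hypothetical builder sketching the Write abstraction: build() returns a
    // single Write, and toBatch/toStreaming surface the two execution modes.
    class ExampleWriteBuilder(batch: BatchWrite, streaming: StreamingWrite)
        extends WriteBuilder {
      override def build(): Write = new Write {
        override def toBatch: BatchWrite = batch
        override def toStreaming: StreamingWrite = streaming
      }
    }
    ```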
    
    ### Why are the changes needed?
    
    These changes prevent exceptions from data sources that implement only the `build` method in `WriteBuilder`.
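    
    A minimal sketch of the difference (hypothetical helper name; interfaces as used in the diff below): the default `buildForStreaming()` throws for a source that only overrides `build()`, while routing through `build().toStreaming` works for both implementation styles:
    
    ```scala
    import org.apache.spark.sql.connector.write.WriteBuilder
    import org.apache.spark.sql.connector.write.streaming.StreamingWrite
    
    // Hypothetical helper mirroring the new StreamExecution behavior: call the
    // generic build() first, then derive the streaming write from the Write.
    def createStreamingWrite(builder: WriteBuilder): StreamingWrite =
      builder.build().toStreaming
    ```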
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #31093 from aokolnychyi/spark-34049.
    
    Authored-by: Anton Okolnychyi <aokolnyc...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/sql/connector/InMemoryTable.scala   | 10 ++++++----
 .../apache/spark/sql/execution/streaming/StreamExecution.scala |  9 +++++----
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index a1253df..2756185 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -274,11 +274,13 @@ class InMemoryTable(
         this
       }
 
-      override def buildForBatch(): BatchWrite = writer
+      override def build(): Write = new Write {
+        override def toBatch: BatchWrite = writer
 
-      override def buildForStreaming(): StreamingWrite = streamingWriter match {
-        case exc: StreamingNotSupportedOperation => exc.throwsException()
-        case s => s
+        override def toStreaming: StreamingWrite = streamingWriter match {
+          case exc: StreamingNotSupportedOperation => exc.throwsException()
+          case s => s
+        }
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index c9f40fa..67803ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -627,21 +627,22 @@ abstract class StreamExecution(
       inputPlan.schema,
       new CaseInsensitiveStringMap(options.asJava))
     val writeBuilder = table.newWriteBuilder(info)
-    outputMode match {
+    val write = outputMode match {
       case Append =>
-        writeBuilder.buildForStreaming()
+        writeBuilder.build()
 
       case Complete =>
         // TODO: we should do this check earlier when we have capability API.
         require(writeBuilder.isInstanceOf[SupportsTruncate],
           table.name + " does not support Complete mode.")
-        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()
 
       case Update =>
         require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
           table.name + " does not support Update mode.")
-        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
     }
+    write.toStreaming
   }
 
   protected def purge(threshold: Long): Unit = {

