This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6b34745  [SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution
6b34745 is described below

commit 6b34745cb9b294c91cd126c2ea44c039ee83cb84
Author: Anton Okolnychyi <aokolnyc...@apple.com>
AuthorDate: Fri Jan 8 20:37:35 2021 -0800

    [SPARK-34049][SS] DataSource V2: Use Write abstraction in StreamExecution

    ### What changes were proposed in this pull request?

    This PR makes `StreamExecution` use the `Write` abstraction introduced in SPARK-33779.

    Note: we will need separate plans for streaming writes in order to support the required distribution and ordering in SS. This change only migrates to the `Write` abstraction.

    ### Why are the changes needed?

    These changes prevent exceptions from data sources that implement only the `build` method in `WriteBuilder`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #31093 from aokolnychyi/spark-34049.

    Authored-by: Anton Okolnychyi <aokolnyc...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/sql/connector/InMemoryTable.scala   | 10 ++++++----
 .../apache/spark/sql/execution/streaming/StreamExecution.scala |  9 +++++----
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index a1253df..2756185 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -274,11 +274,13 @@ class InMemoryTable(
         this
       }

-      override def buildForBatch(): BatchWrite = writer
+      override def build(): Write = new Write {
+        override def toBatch: BatchWrite = writer

-      override def buildForStreaming(): StreamingWrite = streamingWriter match {
-        case exc: StreamingNotSupportedOperation => exc.throwsException()
-        case s => s
+        override def toStreaming: StreamingWrite = streamingWriter match {
+          case exc: StreamingNotSupportedOperation => exc.throwsException()
+          case s => s
+        }
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index c9f40fa..67803ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -627,21 +627,22 @@ abstract class StreamExecution(
       inputPlan.schema,
       new CaseInsensitiveStringMap(options.asJava))
     val writeBuilder = table.newWriteBuilder(info)
-    outputMode match {
+    val write = outputMode match {
       case Append =>
-        writeBuilder.buildForStreaming()
+        writeBuilder.build()

       case Complete =>
         // TODO: we should do this check earlier when we have capability API.
         require(writeBuilder.isInstanceOf[SupportsTruncate],
           table.name + " does not support Complete mode.")
-        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().build()

       case Update =>
         require(writeBuilder.isInstanceOf[SupportsStreamingUpdateAsAppend],
           table.name + " does not support Update mode.")
-        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].buildForStreaming()
+        writeBuilder.asInstanceOf[SupportsStreamingUpdateAsAppend].build()
     }
+    write.toStreaming
   }

   protected def purge(threshold: Long): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org