cloud-fan commented on a change in pull request #23859: [SPARK-26956][SQL]
remove streaming output mode from data source v2 APIs
URL: https://github.com/apache/spark/pull/23859#discussion_r259003876
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
##########
@@ -574,6 +579,35 @@ abstract class StreamExecution(
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}
+
+ protected def createStreamingWrite(
+ table: SupportsStreamingWrite,
+ options: Map[String, String],
+ inputPlan: LogicalPlan): StreamingWrite = {
+ val writeBuilder = table.newWriteBuilder(new
DataSourceOptions(options.asJava))
+ .withQueryId(runId.toString)
+ .withInputDataSchema(inputPlan.schema)
+ outputMode match {
+ case Append =>
+ writeBuilder.buildForStreaming()
+
+ case Complete =>
+ // TODO: we should do this check earlier when we have capability API.
+ require(writeBuilder.isInstanceOf[SupportsTruncate],
+ table.name + " does not support Complete mode.")
+
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+ case Update =>
+ // Although no v2 sinks really support Update mode now, but during
tests we do want them
+ // to pretend to support Update mode, and treat Update mode same as
Append mode.
Review comment:
Treating Update mode the same as Append mode is very useful when testing with
aggregates/joins. We don't want to complicate the test cases with watermarks,
and we can't use Complete mode because some sinks don't support it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]