cloud-fan commented on a change in pull request #23702: [SPARK-26785][SQL] data 
source v2 API refactor: streaming write
URL: https://github.com/apache/spark/pull/23702#discussion_r258113277
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ##########
 @@ -513,13 +514,16 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case s: StreamingWriteSupportProvider =>
-        val writer = s.createStreamingWriteSupport(
-          s"$runId",
-          newAttributePlan.schema,
-          outputMode,
-          new DataSourceOptions(extraOptions.asJava))
-        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), 
newAttributePlan)
+      case s: SupportsStreamingWrite =>
+        // TODO: we should translate OutputMode to concrete write actions like 
truncate, but
+        // the truncate action is being developed in SPARK-26666.
 
 Review comment:
   I spent more time looking into the streaming output mode. The UPDATE mode is 
by design and is useful when the watermark is not specified, i.e. users don't 
want to drop any record no matter how late the record is.
   
   The output mode is applied to the entire streaming query, so all the 
stateful operators and the sinks need to support it.
   
   The `StateStoreSaveExec` support UPDATE mode with a well-defined semantic: 
it saves all the input records to state store and output them to downstream 
operators, so the downstream aggregate/join can produce results earlier. e.g. 
the first epoch outputs
   ```
   |     date     | count |
   |   2018-1-1   |   1   |
   |   2018-1-2   |   2   |
   ```
   and the next eppch outputs
   ```
   |     date     | count |
   |   2018-1-2   |   1   |
   |   2018-1-3   |   3   |
   ```
   
   However, if there are 2 aggregates, the second aggregate can't support 
UPDATE mode and deal with the early results from the first aggregate. 
Fortunately we disallow more than one streaming aggregate in a query, so there 
is no problem.
   
   The other problem is, all the existing sinks do not support UPDATE mode. 
They either ignore output mode completely, or treat UPDATE same as APPEND. I 
think they should fail if output mode is UPDATE, as they don't really support 
it.
   
   Overall, I think UPDATE mode is useful but is hard to implement. We need a 
mechanism to propagate the "update key" to down stream sateful operators and 
the sink. That's why I think no source can support UPDATE mode unless we 
improve the streaming framework to progate the "update key".
   
   Is it possible to remove UPDATE mode completely and re-add it when we have a 
detailed design? cc @marmbrus @tdas @jose-torres 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to