Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20243#discussion_r162133993
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -280,14 +280,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         useTempCheckpointLocation = true,
         trigger = trigger)
     } else {
-      val sink = trigger match {
-        case _: ContinuousTrigger =>
-          val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-          ds.newInstance() match {
-            case w: ContinuousWriteSupport => w
-            case _ => throw new AnalysisException(
-              s"Data source $source does not support continuous writing")
-          }
+      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
+      val sink = (ds.newInstance(), trigger) match {
+        case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
+        case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
+          s"Data source $source does not support continuous writing")
+        case (w: MicroBatchWriteSupport, _) => w
--- End diff ---
In that case, we have to fall back to the V1 path, because V1 sinks don't implement MicroBatchWriteSupport.
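
For illustration only, a rough sketch of what that fallback could look like inside DataStreamWriter.start(). This is not the actual patch: the final `case _` branch just reuses the existing V1 construction (DataSource(...).createSink(outputMode)), and names like extraOptions and normalizedParCols are assumed to be the helpers already defined in this class.

    val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val sink = (ds.newInstance(), trigger) match {
      case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
      case (_, _: ContinuousTrigger) =>
        throw new UnsupportedOperationException(
          s"Data source $source does not support continuous writing")
      case (w: MicroBatchWriteSupport, _) => w
      case _ =>
        // No V2 micro-batch writer available: treat the source as a V1 sink and
        // build it through the existing V1 path. The exact DataSource arguments
        // here are illustrative, not the final change.
        DataSource(
          df.sparkSession,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil)
        ).createSink(outputMode)
    }

The point is only that the last case keeps V1 sources working under the default micro-batch trigger instead of failing the analysis.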
---