Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20243#discussion_r161321448
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -280,14 +280,12 @@ final class DataStreamWriter[T] private[sql](ds:
Dataset[T]) {
           useTempCheckpointLocation = true,
           trigger = trigger)
       } else {
-        val sink = trigger match {
-          case _: ContinuousTrigger =>
-            val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-            ds.newInstance() match {
-              case w: ContinuousWriteSupport => w
-              case _ => throw new AnalysisException(
-                s"Data source $source does not support continuous writing")
-            }
+        val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
--- End diff ---
We are checking the same conditions here as in
StreamingQueryManager.createQuery. I think we need to refactor this, probably
sometime in the future once we get rid of v1 completely.
Either way, we should immediately add a general test suite (say,
StreamingDataSourceV2Suite) that exercises these cases with various fake data
sources.
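The capability check the diff performs can be exercised in isolation with fake sources, in the spirit of the suggested suite. Below is a minimal, self-contained sketch of the dispatch-and-fail pattern; every name in it (the traits, `SinkResolver`, the fake source objects) is illustrative, not Spark's real v2 API:

```scala
// Stand-in traits mimicking the capability interfaces the diff matches on.
// None of these are Spark's real types; this is a hypothetical sketch.
trait ContinuousWriteSupport
trait MicroBatchWriteSupport

sealed trait Trigger
case object ContinuousTrigger extends Trigger
case object ProcessingTimeTrigger extends Trigger

final class AnalysisException(msg: String) extends Exception(msg)

object SinkResolver {
  // Fail fast when the trigger requires a capability the source lacks,
  // mirroring the match on ContinuousWriteSupport in the diff above.
  def resolve(source: String, instance: AnyRef, trigger: Trigger): AnyRef =
    trigger match {
      case ContinuousTrigger =>
        instance match {
          case w: ContinuousWriteSupport => w
          case _ => throw new AnalysisException(
            s"Data source $source does not support continuous writing")
        }
      case _ => instance
    }
}

// Fake data sources, one per capability, as a general test suite might define.
object ContinuousOnlySource extends ContinuousWriteSupport
object MicroBatchOnlySource extends MicroBatchWriteSupport

object Demo extends App {
  // A continuous-capable source passes through unchanged.
  assert(SinkResolver.resolve("fake-continuous", ContinuousOnlySource,
    ContinuousTrigger) eq ContinuousOnlySource)

  // A micro-batch-only source is rejected under a continuous trigger.
  val rejected =
    try {
      SinkResolver.resolve("fake-microbatch", MicroBatchOnlySource,
        ContinuousTrigger)
      false
    } catch { case _: AnalysisException => true }
  assert(rejected)
  println("ok")
}
```

A suite built this way can cover each (trigger, capability) pair without touching a real sink, which is exactly what makes the duplicated checks here and in createQuery cheap to lock down before the refactor.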
---