Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159985442
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
"is not supported in streaming DataFrames/Datasets and will be
disabled.")
}
- sink match {
- case v1Sink: Sink =>
- new StreamingQueryWrapper(new MicroBatchExecution(
+ (sink, trigger) match {
+ case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+ UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+ new StreamingQueryWrapper(new ContinuousExecution(
sparkSession,
userSpecifiedName.orNull,
checkpointLocation,
analyzedPlan,
- v1Sink,
+ v2Sink,
trigger,
triggerClock,
outputMode,
+ extraOptions,
deleteCheckpointOnStop))
- case v2Sink: ContinuousWriteSupport =>
- UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
- new StreamingQueryWrapper(new ContinuousExecution(
+ case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
--- End diff ---
As discussed offline, we do throw that error in the MicroBatchExecution
constructor. Once all the pieces are in, we'll need to refactor this a bit so
that all of the checking happens in one place.
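
For reference, here is a minimal, self-contained sketch of what a single
consolidated check could look like after that refactor. The types below are
stubs standing in for the real Sink / MicroBatchWriteSupport /
ContinuousWriteSupport / Trigger classes, not the actual Spark code:

object SinkTriggerCheckSketch {
  // Stub types for illustration only; the real classes live in Spark.
  trait BaseSink
  trait Sink extends BaseSink                    // stand-in for the V1 sink
  trait MicroBatchWriteSupport extends BaseSink  // stand-in for the V2 micro-batch sink
  trait ContinuousWriteSupport extends BaseSink  // stand-in for the V2 continuous sink
  sealed trait Trigger
  case class ContinuousTrigger(intervalMs: Long) extends Trigger
  case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger

  // One place that rejects incompatible sink/trigger combinations before any
  // execution engine is constructed, instead of spreading the checks across
  // the MicroBatchExecution/ContinuousExecution constructors.
  def validate(sink: BaseSink, trigger: Trigger): Unit = (sink, trigger) match {
    case (_: ContinuousWriteSupport, _: ContinuousTrigger) =>
      // continuous path: supported combination, nothing to do
    case (_, _: ContinuousTrigger) =>
      throw new IllegalArgumentException(
        s"Sink ${sink.getClass.getSimpleName} does not support continuous processing")
    case _ =>
      // micro-batch path: any remaining plan/sink checks would also live here
  }
}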