Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159555174
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
-    sink match {
-      case v1Sink: Sink =>
-        new StreamingQueryWrapper(new MicroBatchExecution(
+    (sink, trigger) match {
+      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
+        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
+        new StreamingQueryWrapper(new ContinuousExecution(
           sparkSession,
           userSpecifiedName.orNull,
           checkpointLocation,
           analyzedPlan,
-          v1Sink,
+          v2Sink,
           trigger,
           triggerClock,
           outputMode,
+          extraOptions,
           deleteCheckpointOnStop))
-      case v2Sink: ContinuousWriteSupport =>
-        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
-        new StreamingQueryWrapper(new ContinuousExecution(
+      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
+        new StreamingQueryWrapper(new MicroBatchExecution(
           sparkSession,
           userSpecifiedName.orNull,
           checkpointLocation,
           analyzedPlan,
-          v2Sink,
+          sink,
           trigger,
           triggerClock,
           outputMode,
           extraOptions,
           deleteCheckpointOnStop))
+      case _ =>
+        throw new AnalysisException(
--- End diff --
I think this is the only other option: MicroBatchWriteSupport and Sink will have already matched with any trigger, ContinuousWriteSupport will have already matched with a continuous trigger, and there aren't any other implementations of BaseStreamingSink.

I agree it's cleaner to check explicitly.
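The exhaustiveness argument above can be sketched with simplified stand-in types. Everything below (the trait hierarchy, `chooseExecution`, the trigger case classes) is hypothetical scaffolding for illustration, not the real `org.apache.spark.sql` API; the point is only that after the two explicit cases, the sole remaining combination is a continuous-only sink paired with a non-continuous trigger (or an unknown `BaseStreamingSink` subtype), which the default case rejects explicitly.

```scala
// Hypothetical stand-ins for the Spark traits discussed above.
trait BaseStreamingSink
trait Sink extends BaseStreamingSink                   // v1 sink
trait MicroBatchWriteSupport extends BaseStreamingSink // v2 micro-batch sink
trait ContinuousWriteSupport extends BaseStreamingSink // v2 continuous sink

trait Trigger
case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger
case class ContinuousTrigger(checkpointIntervalMs: Long) extends Trigger

object MatchSketch {
  // Mirrors the (sink, trigger) match in the diff:
  //  - continuous execution only when a continuous-capable sink meets a
  //    continuous trigger;
  //  - micro-batch for v1 sinks and v2 micro-batch sinks under any trigger;
  //  - anything left over is rejected explicitly instead of relying on a
  //    MatchError.
  def chooseExecution(sink: BaseStreamingSink, trigger: Trigger): String =
    (sink, trigger) match {
      case (_: ContinuousWriteSupport, _: ContinuousTrigger) => "continuous"
      case (_: MicroBatchWriteSupport, _) | (_: Sink, _)     => "micro-batch"
      case _ =>
        throw new IllegalArgumentException(
          s"Sink $sink does not support trigger $trigger")
    }

  def main(args: Array[String]): Unit = {
    val contOnlySink = new ContinuousWriteSupport {}
    val v1Sink = new Sink {}
    println(chooseExecution(contOnlySink, ContinuousTrigger(100)))
    println(chooseExecution(v1Sink, ProcessingTimeTrigger(1000)))
    // The only combination neither explicit case covers: a continuous-only
    // sink with a non-continuous trigger falls through to the default case.
    try chooseExecution(contOnlySink, ProcessingTimeTrigger(1000))
    catch { case _: IllegalArgumentException => println("rejected") }
  }
}
```

Note that a sink implementing both `ContinuousWriteSupport` and `MicroBatchWriteSupport` still gets micro-batch execution under a non-continuous trigger, since only the `(continuous sink, continuous trigger)` pair selects the first case.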
---