Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22462#discussion_r219020125
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
---
@@ -204,6 +249,37 @@ class StreamingDataSourceV2Suite extends StreamTest {
}
}
+ Seq(
+ Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()),
+ Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000))
+ ).foreach { case (source, trigger) =>
+ test(s"SPARK-25460: session options are respected in structured streaming sources - $source") {
+ // `keyPrefix` and `shortName` are the same in this test case
+ val readSource = source.newInstance().shortName()
+ val writeSource = "fake-write-microbatch-continuous"
+
+ val readOptionName = "optionA"
+ withSQLConf(s"spark.datasource.$readSource.$readOptionName" -> "true") {
+ testPositiveCase(readSource, writeSource, trigger, shouldStop = false)
--- End diff --
This is to make the query throw an exception at
https://github.com/apache/spark/blob/0d6e40db85f1936ea8e7d7e0f46c9471c4c93762/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala#L81
and to have it call the actual `createStreamingWriteSupport` interface, which
is not called if the query is stopped directly.
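
As background for the `spark.datasource.$readSource.$readOptionName` key set in the diff above, here is a minimal plain-Scala sketch of how a session conf of that shape could be narrowed down to per-source options. It is a simplified model and not Spark's implementation: `SessionOptionsSketch`, `sessionOptionsFor`, and the `my-source` prefix are hypothetical names used only for illustration.

```scala
object SessionOptionsSketch {
  // Hypothetical helper (not a Spark API): keep only the session confs that are
  // addressed to `keyPrefix` and strip the "spark.datasource.<keyPrefix>." part.
  def sessionOptionsFor(keyPrefix: String, conf: Map[String, String]): Map[String, String] = {
    val prefix = s"spark.datasource.$keyPrefix."
    conf.collect {
      case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed example confs; "my-source" stands in for a source's key prefix.
    val conf = Map(
      "spark.datasource.my-source.optionA" -> "true",
      "spark.sql.shuffle.partitions" -> "200"
    )
    // Only the entry under the "my-source" prefix survives, keyed as "optionA".
    println(sessionOptionsFor("my-source", conf))
  }
}
```

In this model, a source whose key prefix matches would see `optionA` as one of its own options, while unrelated session confs are ignored.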
---