GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22547#discussion_r226363020
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -154,21 +159,25 @@ class StreamSuite extends StreamTest {
}
test("SPARK-20432: union one stream with itself") {
- val df =
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
- val unioned = df.union(df)
- withTempDir { outputDir =>
- withTempDir { checkpointDir =>
- val query =
- unioned
- .writeStream.format("parquet")
- .option("checkpointLocation", checkpointDir.getAbsolutePath)
- .start(outputDir.getAbsolutePath)
- try {
- query.processAllAvailable()
- val outputDf =
spark.read.parquet(outputDir.getAbsolutePath).as[Long]
- checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to
10L)).toArray: _*)
- } finally {
- query.stop()
+ val v1Source =
spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+ val v2Source =
spark.readStream.format(classOf[FakeFormat].getName).load().select("a")
+
+ Seq(v1Source, v2Source).foreach { df =>
--- End diff --
Improve this test so that it also verifies that the DataSource V2 source (`FakeFormat`) works, not just the V1 source.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]