Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r142339309
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
---
@@ -470,3 +475,283 @@ class StreamingJoinSuite extends StreamTest with
StateStoreMetricsTest with Befo
}
}
}
+
+class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+
+ import testImplicits._
+ import org.apache.spark.sql.functions._
+
+ before {
+ SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec'
+ spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+ }
+
+ after {
+ StateStore.stop()
+ }
+
+ private def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+ val input = MemoryStream[Int]
+
+ val df = input.toDF
+ .select(
+ 'value as "key",
+ 'value.cast("timestamp") as s"${prefix}Time",
+ ('value * multiplier) as s"${prefix}Value")
+ .withWatermark(s"${prefix}Time", "10 seconds")
+
+ return (input, df)
+ }
+
+ private def setupWindowedJoin(joinType: String) = {
+ val (input1, df1) = setupStream("left", 2)
+ val (input2, df2) = setupStream("right", 3)
+
+ val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+
+ val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+
+ val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+ .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+
+ (input1, input2, joined)
+ }
+
+ test("left stream batch outer join") {
+ val stream = MemoryStream[Int]
+ .toDF()
+ .withColumn("timestamp", 'value.cast("timestamp"))
+ .withWatermark("timestamp", "1 second")
+ val joined =
+ stream.join(Seq(1).toDF(), Seq("value"), "left_outer")
+
+ // This test is in the suite just to confirm the validations below don't block this valid join.
+ // We don't need to check results, just that the join can happen.
+ testStream(joined)()
+ }
+
+ test("left batch stream outer join") {
+ val stream = MemoryStream[Int]
+ .toDF()
+ .withColumn("timestamp", 'value.cast("timestamp"))
+ .withWatermark("timestamp", "1 second")
+ val joined =
+ Seq(1).toDF().join(stream, Seq("value"), "left_outer")
+
+ val thrown = intercept[AnalysisException] {
+ testStream(joined)()
+ }
+
+ assert(thrown.getMessage.contains(
+ "Left outer join with a streaming DataFrame/Dataset on the right and a static"))
+ }
+
+ test("right stream batch outer join") {
+ val stream = MemoryStream[Int]
+ .toDF()
+ .withColumn("timestamp", 'value.cast("timestamp"))
+ .withWatermark("timestamp", "1 second")
+ val joined =
+ stream.join(Seq(1).toDF(), Seq("value"), "right_outer")
+
+ val thrown = intercept[AnalysisException] {
+ testStream(joined)()
+ }
+
+ assert(thrown.getMessage.contains(
+ "Right outer join with a streaming DataFrame/Dataset on the left and a static"))
+ }
+
+ test("left outer join with no watermark") {
+ val joined =
+ MemoryStream[Int].toDF().join(MemoryStream[Int].toDF(), Seq("value"), "left_outer")
+
+ val thrown = intercept[AnalysisException] {
+ testStream(joined)()
+ }
+
+ assert(thrown.getMessage.contains(
+ "Stream-stream outer join between two streaming DataFrame/Datasets is not supported " +
+ "without a watermark"))
+ }
+
+ test("right outer join with no watermark") {
--- End diff --
same comment as above
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]