neilramaswamy commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513510120
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -257,6 +257,78 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
)
}
+ test("wei comment") {
+
+ }
+
+ test("stream stream inner join with one time-interval condition " +
+ "- with one watermark") {
+ val leftSource = MemoryStream[(Int, Int)]
+ val rightSource = MemoryStream[(Int, Int)]
+
+ val leftStream = leftSource
+ .toDF()
+ .withColumn("left_value", $"_1")
+ .withColumn("left_time", timestamp_seconds($"_2"))
+ .withWatermark("left_time", "15 seconds")
+
+ // Take note: no watermark is defined on the right
+ val rightStream = rightSource
+ .toDF()
+ .withColumn("right_value", $"_1")
+ .withColumn("right_time", timestamp_seconds($"_2"))
+
+ // Our condition is R > L + 10 and we have a watermark on the left, so state
+ // should be dropped on the right
+ val joined = leftStream
+ .join(
+ rightStream,
+ expr("left_value = right_value AND right_time > left_time + interval
10 seconds"),
+ "inner"
+ )
+ .select($"left_value", $"left_time".cast("int"),
$"right_time".cast("int"))
+
+ testStream(joined)(
+ StartStream(),
+ AddData(leftSource, (4, 100)),
+ AddData(rightSource, (3, 105)),
+ CheckNewAnswer(),
Review Comment:
`CheckNewAnswer()` asserts that the expected data passed in (here, nothing) is equal to the Spark answer:
https://github.com/apache/spark/blob/c417c757907e262f78af5a84b8358e253d0d0954/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala#L774
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]