HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1115405490


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -463,6 +437,442 @@ class MultiStatefulOperatorsSuite
     )
   }
 
+  test("stream-stream time interval left outer join -> aggregation, append mode") {
+    val input1 = MemoryStream[(String, Timestamp)]
+    val input2 = MemoryStream[(String, Timestamp)]
+
+    val s1 = input1.toDF()
+      .selectExpr("_1 AS id1", "_2 AS timestamp1")
+      .withWatermark("timestamp1", "0 seconds")
+      .as("s1")
+
+    val s2 = input2.toDF()
+      .selectExpr("_1 AS id2", "_2 AS timestamp2")
+      .withWatermark("timestamp2", "0 seconds")
+      .as("s2")
+
+    val s3 = s1.join(s2, expr("s1.id1 = s2.id2 AND (s1.timestamp1 BETWEEN " +
+      "s2.timestamp2 - INTERVAL 1 hour AND s2.timestamp2 + INTERVAL 1 hour)"),
+      "leftOuter")
+
+    val agg = s3.groupBy(window($"timestamp1", "10 minutes"))

Review Comment:
   I've been thinking through this again... I might have overlooked something.
   
   My thought was that if there are two event time columns, the state row 
should wait until the watermark passes both values (W > et1 && W > et2) so 
that further input won't match with the row. But it seems the condition is 
actually OR, not AND: if either W > et1 or W > et2 holds, further input can no 
longer match the row, since W is the lower bound of both columns for any 
future input.
   
   I probably need to revisit this again tomorrow (ideally), or postpone it to 
the next JIRA ticket. If you can help me find a test/use case which shows the 
expected behavior (and ideally an edge case), that would be awesome.
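   
   To make the OR-vs-AND question concrete, here is a minimal standalone sketch (not Spark internals; `evictAnd`/`evictOr` are hypothetical helpers for illustration) modeling when a state row with two event time columns could be evicted. The point is that OR evicts earlier than AND, which is why the observed condition surprised me:
   
   ```scala
   // Hypothetical model of eviction predicates for a state row carrying
   // two event time columns (et1, et2), given the current watermark w.
   object WatermarkEvictionSketch {
     // AND: wait until the watermark has passed BOTH event time columns.
     def evictAnd(w: Long, et1: Long, et2: Long): Boolean = w > et1 && w > et2
   
     // OR: one column behind the watermark is enough. The intuition in the
     // comment above: w lower-bounds both event time columns of any future
     // input, so once w > et1 (or w > et2) the time-interval join condition
     // can no longer be satisfied by new rows.
     def evictOr(w: Long, et1: Long, et2: Long): Boolean = w > et1 || w > et2
   
     def main(args: Array[String]): Unit = {
       val (et1, et2) = (1000L, 5000L)
       // With w = 2000, OR already evicts while AND still retains the row.
       println(evictOr(2000L, et1, et2))   // true
       println(evictAnd(2000L, et1, et2))  // false
     }
   }
   ```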



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

