HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1116216190
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -463,6 +437,442 @@ class MultiStatefulOperatorsSuite
)
}
+ test("stream-stream time interval left outer join -> aggregation, append mode") {
+ val input1 = MemoryStream[(String, Timestamp)]
+ val input2 = MemoryStream[(String, Timestamp)]
+
+ val s1 = input1.toDF()
+ .selectExpr("_1 AS id1", "_2 AS timestamp1")
+ .withWatermark("timestamp1", "0 seconds")
+ .as("s1")
+
+ val s2 = input2.toDF()
+ .selectExpr("_1 AS id2", "_2 AS timestamp2")
+ .withWatermark("timestamp2", "0 seconds")
+ .as("s2")
+
+ val s3 = s1.join(s2, expr("s1.id1 = s2.id2 AND (s1.timestamp1 BETWEEN " +
+ "s2.timestamp2 - INTERVAL 1 hour AND s2.timestamp2 + INTERVAL 1 hour)"), "leftOuter")
+
+ val agg = s3.groupBy(window($"timestamp1", "10 minutes"))
Review Comment:
dedup() that drops one of the event time columns from the list of columns
should also work. This isn't about which operator it is; it's that a stateful
operator can receive two event time columns, and the current logic does not
account for that case (or it does, and I'm missing it).
Let me think about this once more. We probably don't want to disallow the
functionality now only to allow it later, so it sounds to me like we should
try to avoid disallowing the functionality in the first place.
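To make the dedup case concrete, here is a minimal sketch of the variant being
described, assuming the same `s1`/`s2` streams defined in the test above. The
column and stream names are taken from that test; whether the planner accepts
this shape is exactly the open question in this thread, so this is an
illustration of the scenario, not a verified working query:

```scala
// Hypothetical variant: time-interval join producing two event time
// columns (timestamp1, timestamp2), followed by dedup. Dropping one of
// the event time columns first leaves a single event time column for
// the downstream stateful dedup operator.
val joined = s1.join(s2, expr(
  "s1.id1 = s2.id2 AND (s1.timestamp1 BETWEEN " +
  "s2.timestamp2 - INTERVAL 1 hour AND s2.timestamp2 + INTERVAL 1 hour)"),
  "leftOuter")

val deduped = joined
  .drop("timestamp2")                  // keep only one event time column
  .dropDuplicates("id1", "timestamp1") // stateful dedup on the remaining one
```

If both event time columns were kept, the dedup operator would receive two of
them, which is the case the current logic reportedly does not handle.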
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]