rangadi commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1115965485


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -463,6 +437,442 @@ class MultiStatefulOperatorsSuite
     )
   }
 
+  test("stream-stream time interval left outer join -> aggregation, append 
mode") {
+    val input1 = MemoryStream[(String, Timestamp)]
+    val input2 = MemoryStream[(String, Timestamp)]
+
+    val s1 = input1.toDF()
+      .selectExpr("_1 AS id1", "_2 AS timestamp1")
+      .withWatermark("timestamp1", "0 seconds")
+      .as("s1")
+
+    val s2 = input2.toDF()
+      .selectExpr("_1 AS id2", "_2 AS timestamp2")
+      .withWatermark("timestamp2", "0 seconds")
+      .as("s2")
+
+    val s3 = s1.join(s2, expr("s1.id1 = s2.id2 AND (s1.timestamp1 BETWEEN " +
+      "s2.timestamp2 - INTERVAL 1 hour AND s2.timestamp2 + INTERVAL 1 hour)"), "leftOuter")
+
+    val agg = s3.groupBy(window($"timestamp1", "10 minutes"))

Review Comment:
   Maybe we can leave a comment and open a ticket to address it as a follow-up. But I think it is an important issue, since it relates to the core of the feature. I will think about it more deeply too. My question at a high level is: what is the difference that makes this OK for a windowed count, but not OK for deDup()?
   
   Could you point to the code where this difference comes from? I will look deeper as well.
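   
   For concreteness, here is a minimal sketch (hypothetical, not code from this PR) of the two downstream shapes the question contrasts, both reading from the joined stream `s3` in the test above. I am taking deDup() to mean dropDuplicates:
   
       // Minimal sketch, assuming `s3` from the test snippet above and the
       // standard org.apache.spark.sql.functions.window; hypothetical, not PR code.
   
       // (a) Windowed count after the time-interval left outer join --
       // the shape this test exercises in append mode.
       val windowedCounts = s3
         .groupBy(window($"timestamp1", "10 minutes"))
         .count()
   
       // (b) Streaming deduplication after the same join -- the case in
       // question: why is (a) OK in append mode but (b) is not?
       val deduped = s3.dropDuplicates("id1", "timestamp1")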
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

