HeartSaVioR commented on a change in pull request #28326:
URL: https://github.com/apache/spark/pull/28326#discussion_r416137750



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
##########
@@ -593,6 +593,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-27340 Alias on TimeWindow expression cause watermark metadata lost") {
+    val inputData = MemoryStream[Int]
+    val aliasWindow = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select(window($"eventTime", "5 seconds") as 'aliasWindow)
+    // Check the eventTime metadata is kept in the top level alias.
+    assert(aliasWindow.logicalPlan.output.exists(
+      _.metadata.contains(EventTimeWatermark.delayKey)))

Review comment:
   ```
   
       val windowedAggregation = aliasWindow
         .groupBy('aliasWindow)
         .agg(count("*") as 'count)
         .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])
   
       testStream(windowedAggregation)(
         AddData(inputData, 10, 11, 12, 13, 14, 15),
         CheckNewAnswer(),
         AddData(inputData, 25),   // Advance watermark to 15 seconds
         CheckNewAnswer((10, 5)),
         assertNumStateRows(2),
         AddData(inputData, 10),   // Should not emit anything as data less than watermark
         CheckNewAnswer(),
         assertNumStateRows(2)
       )
   ```
   
   Let's append this so that the UT verifies the behavior end to end (yes, this is the same as other UTs in this suite, and the revised UT fails on the master branch even without the assertion that checks the metadata directly). Then we no longer need the complicated stream-stream join UT.
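
To make the expected answers in the snippet above easier to check by hand, here is a rough sketch in plain Scala (no Spark involved) of the bookkeeping the test relies on. It assumes a simplified model: 5-second tumbling windows, a 10-second watermark delay, append-style output, and a watermark that starts at 0. `WatermarkSketch`, `State`, and `processBatch` are hypothetical names used only for illustration, not Spark APIs, and the model folds Spark's follow-up no-data batch into the same step.

```scala
object WatermarkSketch {
  // Assumed parameters, mirroring the test: window($"eventTime", "5 seconds")
  // and withWatermark("eventTime", "10 seconds").
  val windowSeconds = 5L
  val delaySeconds = 10L

  // Hypothetical state carried between micro-batches:
  // counts of still-open windows (keyed by window start) and the current watermark.
  final case class State(counts: Map[Long, Long] = Map.empty, watermark: Long = 0L)

  // Start of the tumbling window that contains event time t (in seconds).
  def windowStart(t: Long): Long = t - (t % windowSeconds)

  // Processes one batch of event times; returns the new state and the
  // (windowStart, count) rows emitted by this batch.
  def processBatch(state: State, events: Seq[Long]): (State, Seq[(Long, Long)]) = {
    // Rows whose window already ended at or before the watermark are late: drop them.
    val onTime = events.filter(t => windowStart(t) + windowSeconds > state.watermark)

    // Update per-window counts with the rows that survived.
    val counts = onTime.foldLeft(state.counts) { (acc, t) =>
      val w = windowStart(t)
      acc.updated(w, acc.getOrElse(w, 0L) + 1L)
    }

    // The watermark trails the largest event time seen so far and never moves backwards.
    val watermark = events.foldLeft(state.watermark) { (wm, t) =>
      math.max(wm, t - delaySeconds)
    }

    // Windows that end at or before the new watermark are emitted and evicted from state.
    val (closed, open) = counts.partition { case (start, _) =>
      start + windowSeconds <= watermark
    }
    (State(open, watermark), closed.toSeq.sortBy(_._1))
  }
}
```

Under this model, feeding the batches `10..15`, then `25`, then `10` mirrors the three `AddData` steps: the first batch emits nothing, the second moves the watermark to 15 and emits the window starting at 10 with count 5 while two windows remain in state, and the third is dropped as late, leaving the two state rows untouched.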

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -991,4 +991,30 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
+
+  test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {

Review comment:
   I guess this is to retain the effort of the original PR, but given the root cause, it should be much easier to reproduce (and you actually did so in EventTimeWatermarkSuite).
   
   Let's remove this test in a new commit (so that we can still retain the credit) and append more code to the new UT to make it an E2E test. I'll comment there with the code we need to add.



