sahnib commented on code in PR #43517:
URL: https://github.com/apache/spark/pull/43517#discussion_r1371967970
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
)
}
+ test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+ import testImplicits._
+
+ val inputData = MemoryStream[Timestamp]
+
+ val query = inputData.toDF()
+ .filter("value < current_date()")
Review Comment:
Done.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
)
}
+ test("SPARK-45655: test current batch timestamp in streaming query metrics") {
Review Comment:
Done
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala:
##########
@@ -354,6 +357,47 @@ class StreamingQueryStatusAndProgressSuite extends
StreamTest with Eventually {
)
}
+ test("SPARK-45655: test current batch timestamp in streaming query metrics") {
+ import testImplicits._
+
+ val inputData = MemoryStream[Timestamp]
+
+ val query = inputData.toDF()
+ .filter("value < current_date()")
+ .observe("metrics", count(expr("value >= current_date()")).alias("dropped"))
+ .writeStream
+ .queryName("ts_metrics_test")
+ .format("memory")
+ .outputMode("append")
+ .start()
+
+ val timeNow = Instant.now()
+
+ // this value would be accepted by the filter and would not count towards
+ // dropped metrics.
+ val validValue = Timestamp.from(timeNow.minus(2, ChronoUnit.DAYS))
+ inputData.addData(validValue)
+
+ // would be dropped by the filter and count towards dropped metrics
+ inputData.addData(Timestamp.from(timeNow.plus(2, ChronoUnit.DAYS)))
+
+ query.processAllAvailable()
+
+ val dropped = query.recentProgress.map(p => {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]