gaborgsomogyi commented on a change in pull request #23609: [SPARK-26379][SS]
Fix issue on adding current_timestamp/current_date to streaming query
URL: https://github.com/apache/spark/pull/23609#discussion_r249699571
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
##########
@@ -1079,6 +1079,42 @@ class StreamSuite extends StreamTest {
assert(query.exception.isEmpty)
}
}
+
+ Seq(true, false).foreach { useV2Sink =>
+ import org.apache.spark.sql.functions._
+
+ val newTestName = "SPARK-26379 Structured Streaming - Exception on adding
column to Dataset" +
+ s" - use v2 sink - $useV2Sink"
+
+ test(newTestName) {
+ val input = MemoryStream[Int]
+ val df = input.toDS().withColumn("cur_timestamp",
lit(current_timestamp()))
+
+ def assertBatchOutputAndUpdateLastTimestamp(
+ rows: Seq[Row],
+ curTimestamp: Long,
+ expectedValue: Int): Long = {
+ assert(rows.size === 1)
+ val row = rows.head
+ assert(row.getInt(0) === expectedValue)
+ assert(row.getTimestamp(1).getTime > curTimestamp)
+ row.getTimestamp(1).getTime
+ }
+
+ var lastTimestamp = -1L
Review comment:
Not much functional difference but maybe this could be now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]