HeartSaVioR commented on a change in pull request #23609: [SPARK-26379][SS] Fix 
issue on adding current_timestamp/current_date to streaming query
URL: https://github.com/apache/spark/pull/23609#discussion_r249731522
 
 

 ##########
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
 ##########
 @@ -1079,6 +1080,51 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding 
current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding 
current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = 
{
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, 
lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(3 * 1000) },
 
 Review comment:
   Yeah good suggestion. 1s would be enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to