Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139077656
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
    @@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
         )
       }
     
    +  test("watermark with 2 streams") {
    +    val first = MemoryStream[Int]
    +
    +    val firstDf = first.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val second = MemoryStream[Int]
    +
    +    val secondDf = second.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "5 seconds")
    +      .select('value)
    +
    +    val union = firstDf.union(secondDf)
    +      .writeStream
    +      .format("memory")
    +      .queryName("test")
    +      .start()
    +
    +    def generateAndAssertNewWatermark(
    +        stream: MemoryStream[Int],
    +        data: Seq[Int],
    +        watermark: Int): Unit = {
    +      stream.addData(data)
    +      assertWatermark(watermark)
    +    }
    +
    +    def assertWatermark(watermark: Int) {
    +      union.processAllAvailable()
    +      // add a dummy batch so lastExecution has the new watermark
    +      first.addData(0)
    +      union.processAllAvailable()
    +
    +      val lastExecution = 
union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
    +      assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
    +    }
    +
    +    generateAndAssertNewWatermark(first, Seq(11), 1000)
    --- End diff --
    
    Instead of having two variations for handling a single input and two 
inputs, you can do something like this:
    ```
    /**
     * Feeds the given rows into the `first` and/or `second` memory streams, lets the
     * `union` query process everything available, and returns the `batchWatermarkMs`
     * recorded in the query's last execution.
     *
     * An empty sequence means "add nothing to that stream", so callers can exercise
     * either input alone or both together with a single helper.
     *
     * @param firstData  rows to add to `first`; skipped when empty
     * @param secondData rows to add to `second`; skipped when empty
     * @return the watermark from `lastExecution.offsetSeqMetadata.batchWatermarkMs`
     */
    def getWatermarkAfterData(
        firstData: Seq[Int] = Seq.empty,
        secondData: Seq[Int] = Seq.empty): Long = {
      // MemoryStream is fed via `addData` (as elsewhere in this suite), not `add`.
      if (firstData.nonEmpty) first.addData(firstData)
      if (secondData.nonEmpty) second.addData(secondData)
      union.processAllAvailable()
      // add a dummy batch so lastExecution has the new watermark
      first.addData(0)
      union.processAllAvailable()
      // get updated watermark
      val lastExecution =
        union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
      lastExecution.offsetSeqMetadata.batchWatermarkMs
    }

    assert(getWatermarkAfterData(firstData = Seq(...)) === 10000)
    assert(getWatermarkAfterData(secondData = Seq(...)) === 10000)
    assert(getWatermarkAfterData(firstData = Seq(...), secondData = Seq(...)) === 10000)
    
    ```



---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to