Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21126#discussion_r183541978
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---
    @@ -492,6 +492,77 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
         assert(progress.sources(0).numInputRows === 10)
       }
     
    +
    +  test("input row calculation with trigger having data for one of two V2 
sources") {
    +    val streamInput1 = MemoryStream[Int]
    +    val streamInput2 = MemoryStream[Int]
    +
    +    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = 
true)(
    +      AddData(streamInput1, 1, 2, 3),
    +      CheckAnswer(1, 2, 3),
    +      AssertOnQuery { q =>
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 2)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        assert(lastProgress.get.sources(1).numInputRows == 0)
    +        true
    +      }
    +    )
    +  }
    +
    +  test("input row calculation with mixed batch and streaming V2 sources") {
    +
    +    val streamInput = MemoryStream[Int]
    +    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> 
"2")).toDF("value", "anotherValue")
    +
    +    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink 
= true)(
    +      AddData(streamInput, 1, 2, 3),
    +      AssertOnQuery { q =>
    +        q.processAllAvailable()
    +
    +        // The number of leaves in the trigger's logical plan should be 
same as the executed plan.
    +        require(
    +          q.lastExecution.logical.collectLeaves().length ==
    +            q.lastExecution.executedPlan.collectLeaves().length)
    +
    +        val lastProgress = getLastProgressWithData(q)
    +        assert(lastProgress.nonEmpty)
    +        assert(lastProgress.get.numInputRows == 3)
    +        assert(lastProgress.get.sources.length == 1)
    +        assert(lastProgress.get.sources(0).numInputRows == 3)
    +        true
    +      }
    +    )
    +
    +    val streamInput2 = MemoryStream[Int]
    +    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
    --- End diff --
    
    nit: `staticInputDF2` is cached here — should it be unpersisted later, once the test no longer needs it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to