Github user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/21222#discussion_r207857933
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -513,6 +515,125 @@ class StreamSuite extends StreamTest {
}
}
+ test("explain-continuous") {
+ val inputData = ContinuousMemoryStream[Int]
+ val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+ // Test `df.explain`
+ val explain = ExplainCommand(df.queryExecution.logical, extended =
false)
+ val explainString =
+ spark.sessionState
+ .executePlan(explain)
+ .executedPlan
+ .executeCollect()
+ .map(_.getString(0))
+ .mkString("\n")
+ assert(explainString.contains("Filter"))
+ assert(explainString.contains("MapElements"))
+ assert(!explainString.contains("LocalTableScan"))
+
+ // Test StreamingQuery.display
+ val q = df.writeStream.queryName("memory_continuous_explain")
+ .outputMode(OutputMode.Update()).format("memory")
+ .trigger(Trigger.Continuous("1 seconds"))
+ .start()
+ .asInstanceOf[StreamingQueryWrapper]
+ .streamingQuery
+ try {
+ // in continuous mode, the query will be run even if there's no data
+ // sleep a bit to ensure initialization
+ eventually(timeout(2.seconds), interval(100.milliseconds)) {
+ assert(q.lastExecution != null)
+ }
+
+ val explainWithoutExtended = q.explainInternal(false)
+
+ // `extended = false` only displays the physical plan.
+ assert("Streaming RelationV2 ContinuousMemoryStream".r
+ .findAllMatchIn(explainWithoutExtended).size === 0)
+ assert("ScanV2 ContinuousMemoryStream".r
+ .findAllMatchIn(explainWithoutExtended).size === 1)
+
+ val explainWithExtended = q.explainInternal(true)
+ // `extended = true` displays 3 logical plans
(Parsed/Analyzed/Optimized) and 1 physical
+ // plan.
+ assert("Streaming RelationV2 ContinuousMemoryStream".r
+ .findAllMatchIn(explainWithExtended).size === 3)
+ assert("ScanV2 ContinuousMemoryStream".r
+ .findAllMatchIn(explainWithExtended).size === 1)
+ } finally {
+ q.stop()
+ }
+ }
+
+ test("codegen-microbatch") {
+ import org.apache.spark.sql.execution.debug._
+
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+ // Test StreamingQuery.codegen
+ val q = df.writeStream.queryName("memory_microbatch_codegen")
+ .outputMode(OutputMode.Update)
+ .format("memory")
+ .trigger(Trigger.ProcessingTime("1 seconds"))
+ .start()
+
+ try {
+ assert("No physical plan. Waiting for data." === codegenString(q))
+ assert(codegenStringSeq(q).isEmpty)
+
+ inputData.addData(1, 2, 3, 4, 5)
+ q.processAllAvailable()
+
+ val codegenStr = codegenString(q)
--- End diff --
What about extracting the following 8 lines into a private method, as the
`codegenString` and `codegenStringSeq` result checking are the same for the
microbatch and continuous executions?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]