Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/21222#discussion_r205918179
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -513,6 +514,131 @@ class StreamSuite extends StreamTest {
}
}
+ test("explain-continuous") {
+ val inputData = ContinuousMemoryStream[Int]
+ val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+ // Test `df.explain`
+ val explain = ExplainCommand(df.queryExecution.logical, extended = false)
+ val explainString =
+ spark.sessionState
+ .executePlan(explain)
+ .executedPlan
+ .executeCollect()
+ .map(_.getString(0))
+ .mkString("\n")
+ assert(explainString.contains("Filter"))
+ assert(explainString.contains("MapElements"))
+ assert(!explainString.contains("LocalTableScan"))
+
+ // Test StreamingQuery.display
+ val q = df.writeStream.queryName("memory_continuous_explain")
+ .outputMode(OutputMode.Update()).format("memory")
+ .trigger(Trigger.Continuous("1 seconds"))
+ .start()
+ .asInstanceOf[StreamingQueryWrapper]
+ .streamingQuery
+ try {
+ // in continuous mode, the query will be run even there's no data
+ // sleep a bit to ensure initialization
+ waitForLastExecution(q)
+
+ val explainWithoutExtended = q.explainInternal(false)
+
+ print(explainWithoutExtended)
--- End diff --
nit: remove this line (the leftover debugging `print(explainWithoutExtended)` call just above)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]