HeartSaVioR commented on code in PR #50015:
URL: https://github.com/apache/spark/pull/50015#discussion_r1963056691
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala:
##########
@@ -422,6 +422,24 @@ class QueryExecutionSuite extends SharedSparkSession {
mockCallback.assertAnalyzed()
}
+ test("SPARK-51265 Running eagerlyExecuteCommand with streaming source should give an user " +
+ "facing error") {
+ withTempView("s") {
+ val streamDf = spark.readStream.format("rate").load()
+ streamDf.createOrReplaceTempView("s")
+ withTable("output") {
+ val ex = intercept[AnalysisException] {
+ // Creates a table from streaming source with batch query. This should fail.
+ spark.sql("CREATE TABLE output AS SELECT * FROM s")
Review Comment:
So when this query is executed, the following happens:
CreateDataSourceTableAsSelectCommand is executed. This reaches
assertSupported, but the command is a leaf node and hides the query, hence the
assertion is a no-op.
It then triggers InsertIntoHadoopFsRelationCommand. This exposes the query as a
child, so we would expect assertSupported to be triggered, but the problem
happens while creating the "explainString" (planDesc).
When the query is determined to be streaming (any leaf node is streaming), Spark
creates an IncrementalExecution (since streaming-specific rules are defined
there), which "disables" assertSupported(). This is not a bug, because
we shouldn't check the query against the batch query's criteria, and it should
already have been checked against the streaming query's criteria.
I'd say the two simply conflict: QueryExecution only works properly with a
batch query, and IncrementalExecution only works properly with a streaming
query. Here we have a case where QueryExecution somehow receives a
"streaming query" to execute (at least from the isStreaming flag's perspective).
So what happens? withCachedData is called infinitely (I haven't followed up on
why it loops) and we end up with a StackOverflowError.
This is only the CTAS case, and there are lots of commands, so we can't
check every one of them. I'd therefore like to simply block the case where
QueryExecution has to handle a "streaming query" (which seems to happen only
with commands, but I could be wrong).
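To make the sequence above concrete, here is a minimal toy sketch of the idea. It does not use Spark's classes: Plan, CtasCommand, InsertCommand, UnsupportedStreamingPlan and assertBatchOnly are all hypothetical stand-ins, and the leaf/child behavior is assumed from the description above rather than copied from the actual implementation.

```scala
object StreamingGuardSketch {
  // Toy plan tree. Hypothetical stand-in, not Spark's LogicalPlan.
  sealed trait Plan {
    def children: Seq[Plan]
    // Streaming if any node reachable through `children` is a streaming source.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }

  case object BatchSource extends Plan {
    val children: Seq[Plan] = Nil
  }

  case object StreamingSource extends Plan {
    val children: Seq[Plan] = Nil
    override def isStreaming: Boolean = true
  }

  // Leaf command: the query is only a field, so tree-based checks never see it.
  final case class CtasCommand(query: Plan) extends Plan {
    val children: Seq[Plan] = Nil
  }

  // Command that exposes the query as a child, so tree-based checks do see it.
  final case class InsertCommand(query: Plan) extends Plan {
    val children: Seq[Plan] = Seq(query)
  }

  final class UnsupportedStreamingPlan(msg: String) extends RuntimeException(msg)

  // Hypothetical guard for the batch execution path: refuse any plan that is
  // flagged as streaming instead of trying to execute it.
  def assertBatchOnly(plan: Plan): Unit =
    if (plan.isStreaming) {
      throw new UnsupportedStreamingPlan(
        "Batch execution received a plan that reads from a streaming source")
    }
}
```

In this toy model, assertBatchOnly(CtasCommand(StreamingSource)) passes because the leaf command hides its query, while assertBatchOnly(InsertCommand(StreamingSource)) throws. That mirrors why the outer command slips through and why the guard is only hit once the inner command's plan reaches the batch path.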
Review Comment:
@cloud-fan FYI