This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new b1b31ce [SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan b1b31ce is described below commit b1b31ce4e27db55aaf74920c6ebb406aa5ee327e Author: yi.wu <yi...@databricks.com> AuthorDate: Thu Mar 4 22:41:11 2021 +0800 [SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan ### What changes were proposed in this pull request? Set the active SparkSession to `sparkSessionForStream` and disable AQE & CBO before initializing the `StreamExecution.logicalPlan`. ### Why are the changes needed? The active session should be `sparkSessionForStream`. Otherwise, settings like https://github.com/apache/spark/blob/6b34745cb9b294c91cd126c2ea44c039ee83cb84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L332-L335 wouldn't take effect if callers access them from the active SQLConf, e.g., the rule of `InsertAdaptiveSparkPlan`. Besides, unlike `InsertAdaptiveSparkPlan` (which skips streaming plan), `CostBasedJoinReorder` seems to have the chance to take effect theoretically. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested manually. Before the fix, `InsertAdaptiveSparkPlan` would try to apply AQE on the plan (wouldn't take effect though). After this fix, the rule returns directly. Closes #31600 from Ngone51/active-session-for-stream. 
Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit e7e016192f882cfb430d706c2099e58e1bcc014c) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/streaming/StreamExecution.scala | 42 +++++++++++----------- .../apache/spark/sql/streaming/StreamSuite.scala | 33 ++++++++++++++++- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6b0d33b..1b145f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -314,26 +314,28 @@ abstract class StreamExecution( startLatch.countDown() // While active, repeatedly attempt to run batches. - SparkSession.setActiveSession(sparkSession) - - updateStatusMessage("Initializing sources") - // force initialization of the logical plan so that the sources can be created - logicalPlan - - // Adaptive execution can change num shuffle partitions, disallow - sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") - // Disable cost-based join optimization as we do not want stateful operations to be rearranged - sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false") - offsetSeqMetadata = OffsetSeqMetadata( - batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) - - if (state.compareAndSet(INITIALIZING, ACTIVE)) { - // Unblock `awaitInitialization` - initializationLatch.countDown() - runActivatedStream(sparkSessionForStream) - updateStatusMessage("Stopped") - } else { - // `stop()` is already called. Let `finally` finish the cleanup. 
+ sparkSessionForStream.withActive { + // Adaptive execution can change num shuffle partitions, disallow + sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + // Disable cost-based join optimization as we do not want stateful operations + // to be rearranged + sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false") + + updateStatusMessage("Initializing sources") + // force initialization of the logical plan so that the sources can be created + logicalPlan + + offsetSeqMetadata = OffsetSeqMetadata( + batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) + + if (state.compareAndSet(INITIALIZING, ACTIVE)) { + // Unblock `awaitInitialization` + initializationLatch.countDown() + runActivatedStream(sparkSessionForStream) + updateStatusMessage("Stopped") + } else { + // `stop()` is already called. Let `finally` finish the cleanup. + } } } catch { case e if isInterruptedByStop(e, sparkSession.sparkContext) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index ed284df..0d2d00f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -34,7 +34,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.plans.logical.{Range, RepartitionByExpression} import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRelationV2} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan} @@ -1264,6 +1264,37 @@ class StreamSuite extends StreamTest { } } } + + 
test("SPARK-34482: correct active SparkSession for logicalPlan") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") { + val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load() + var query: StreamExecution = null + try { + query = + df.repartition($"a") + .writeStream + .format("memory") + .queryName("memory") + .start() + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery + query.awaitInitialization(streamingTimeout.toMillis) + val plan = query.logicalPlan + val numPartition = plan + .find { _.isInstanceOf[RepartitionByExpression] } + .map(_.asInstanceOf[RepartitionByExpression].numPartitions) + // Before the fix of SPARK-34482, the numPartition is the value of + // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`. + assert(numPartition.get === spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)) + } finally { + if (query != null) { + query.stop() + } + } + } + } } abstract class FakeSource extends StreamSourceProvider { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org