This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b1b31ce  [SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan
b1b31ce is described below

commit b1b31ce4e27db55aaf74920c6ebb406aa5ee327e
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Mar 4 22:41:11 2021 +0800

    [SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan
    
    ### What changes were proposed in this pull request?
    
    Set the active SparkSession to `sparkSessionForStream` and disable AQE & CBO before initializing the `StreamExecution.logicalPlan`.
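    
    As context, a minimal sketch of the `withActive` pattern relied on here (illustrative only, not the exact Spark-internal implementation): it installs a session as the thread's active session for the duration of a block, then restores whatever was active before, so config lookups inside the block resolve against that session.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Hypothetical standalone helper mirroring the withActive pattern.
    def withActive[T](session: SparkSession)(block: => T): T = {
      val previous = SparkSession.getActiveSession  // remember current active session
      SparkSession.setActiveSession(session)        // make `session` the active one
      try block finally {
        previous match {
          case Some(s) => SparkSession.setActiveSession(s)  // restore prior session
          case None    => SparkSession.clearActiveSession() // or clear if none
        }
      }
    }
    ```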
    
    ### Why are the changes needed?
    
    The active session should be `sparkSessionForStream`. Otherwise, settings like
    
    https://github.com/apache/spark/blob/6b34745cb9b294c91cd126c2ea44c039ee83cb84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L332-L335
    
    wouldn't take effect if callers access them from the active SQLConf, e.g., the rule `InsertAdaptiveSparkPlan`. Besides, unlike `InsertAdaptiveSparkPlan` (which skips streaming plans), `CostBasedJoinReorder` could theoretically still take effect.
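    
    A small sketch of the mismatch, assuming `spark` is an existing SparkSession (`SQLConf.get` is Spark's internal accessor that resolves through the active session; the sessions and values here are hypothetical):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf
    
    val streamSession = spark.newSession()  // stand-in for sparkSessionForStream
    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    streamSession.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
    
    SparkSession.setActiveSession(spark)
    // A rule reading the active SQLConf still sees AQE enabled, even though it
    // was disabled on streamSession.
    assert(SQLConf.get.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED))
    
    SparkSession.setActiveSession(streamSession)
    // With the stream session active (as after this fix), the disable is seen.
    assert(!SQLConf.get.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED))
    ```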
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested manually. Before the fix, `InsertAdaptiveSparkPlan` would try to apply AQE on the plan (though it wouldn't take effect). After this fix, the rule returns directly.
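    
    For reference, the short-circuit exercised here has roughly this shape (a simplified, hypothetical guard; the real `InsertAdaptiveSparkPlan` performs additional checks and also skips streaming plans):
    
    ```scala
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.internal.SQLConf
    
    // Hypothetical sketch of an AQE guard, not the actual rule implementation.
    object AqeGuardSketch {
      def apply(plan: SparkPlan): SparkPlan = {
        if (!SQLConf.get.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
          plan  // AQE disabled in the active conf: return the plan directly
        } else {
          plan  // the real rule would wrap the plan for adaptive execution here
        }
      }
    }
    ```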
    
    Closes #31600 from Ngone51/active-session-for-stream.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit e7e016192f882cfb430d706c2099e58e1bcc014c)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/streaming/StreamExecution.scala  | 42 +++++++++++-----------
 .../apache/spark/sql/streaming/StreamSuite.scala   | 33 ++++++++++++++++-
 2 files changed, 54 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b0d33b..1b145f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -314,26 +314,28 @@ abstract class StreamExecution(
       startLatch.countDown()
 
       // While active, repeatedly attempt to run batches.
-      SparkSession.setActiveSession(sparkSession)
-
-      updateStatusMessage("Initializing sources")
-      // force initialization of the logical plan so that the sources can be created
-      logicalPlan
-
-      // Adaptive execution can change num shuffle partitions, disallow
-      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
-      // Disable cost-based join optimization as we do not want stateful operations to be rearranged
-      sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
-      offsetSeqMetadata = OffsetSeqMetadata(
-        batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
-
-      if (state.compareAndSet(INITIALIZING, ACTIVE)) {
-        // Unblock `awaitInitialization`
-        initializationLatch.countDown()
-        runActivatedStream(sparkSessionForStream)
-        updateStatusMessage("Stopped")
-      } else {
-        // `stop()` is already called. Let `finally` finish the cleanup.
+      sparkSessionForStream.withActive {
+        // Adaptive execution can change num shuffle partitions, disallow
+        sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+        // Disable cost-based join optimization as we do not want stateful operations
+        // to be rearranged
+        sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
+
+        updateStatusMessage("Initializing sources")
+        // force initialization of the logical plan so that the sources can be created
+        logicalPlan
+
+        offsetSeqMetadata = OffsetSeqMetadata(
+          batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
+
+        if (state.compareAndSet(INITIALIZING, ACTIVE)) {
+          // Unblock `awaitInitialization`
+          initializationLatch.countDown()
+          runActivatedStream(sparkSessionForStream)
+          updateStatusMessage("Stopped")
+        } else {
+          // `stop()` is already called. Let `finally` finish the cleanup.
+        }
       }
     } catch {
       case e if isInterruptedByStop(e, sparkSession.sparkContext) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index ed284df..0d2d00f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.plans.logical.{Range, RepartitionByExpression}
 import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRelationV2}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
@@ -1264,6 +1264,37 @@ class StreamSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-34482: correct active SparkSession for logicalPlan") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
+      val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+      var query: StreamExecution = null
+      try {
+        query =
+          df.repartition($"a")
+            .writeStream
+            .format("memory")
+            .queryName("memory")
+            .start()
+            .asInstanceOf[StreamingQueryWrapper]
+            .streamingQuery
+        query.awaitInitialization(streamingTimeout.toMillis)
+        val plan = query.logicalPlan
+        val numPartition = plan
+          .find { _.isInstanceOf[RepartitionByExpression] }
+          .map(_.asInstanceOf[RepartitionByExpression].numPartitions)
+        // Before the fix of SPARK-34482, the numPartition is the value of
+        // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
+        assert(numPartition.get === spark.sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
+      } finally {
+        if (query != null) {
+          query.stop()
+        }
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {

