This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bbf61eb  [SPARK-26586][SS] Fix race condition that causes streams to run with unexpected confs
bbf61eb is described below

commit bbf61eb4222d7b46e71dc91eeedf82d27226fc2c
Author: Mukul Murthy <mukul.mur...@gmail.com>
AuthorDate: Fri Jan 11 11:46:14 2019 -0800

    [SPARK-26586][SS] Fix race condition that causes streams to run with unexpected confs
    
    ## What changes were proposed in this pull request?
    
    Fix race condition where streams can have unexpected conf values.
    
    New streaming queries should run with isolated SparkSessions so that they
    aren't affected by conf updates made after they are started. In
    StreamExecution, the parent SparkSession is cloned and the clone is used to
    run each batch, but this cloning happens in a separate thread and may happen
    after DataStreamWriter.start() returns. If a stream is started and a conf key
    is set immediately afterwards, the stream is likely to pick up the new value
    instead of the value that was in effect when it was started. The fix clones
    the session when the StreamExecution object is constructed instead of in
    that separate thread.
    
    ## How was this patch tested?
    
    New unit test that fails prior to the production change and passes with it.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #23513 from mukulmurthy/26586.
    
    Authored-by: Mukul Murthy <mukul.mur...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
    (cherry picked from commit ae382c94dd10ff494dde4de44e66182bf6dbe8f8)
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../sql/execution/streaming/StreamExecution.scala  |  5 +++--
 .../test/DataStreamReaderWriterSuite.scala         | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index de33844..c1aa98a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -176,6 +176,9 @@ abstract class StreamExecution(
   lazy val streamMetrics = new MetricsReporter(
     this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
+  /** Isolated spark session to run the batches with. */
+  private val sparkSessionForStream = sparkSession.cloneSession()
+
   /**
    * The thread that runs the micro-batches of this stream. Note that this 
thread must be
    * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: 
interrupting a
@@ -265,8 +268,6 @@ abstract class StreamExecution(
       // force initialization of the logical plan so that the sources can be 
created
       logicalPlan
 
-      // Isolated spark session to run the batches with.
-      val sparkSessionForStream = sparkSession.cloneSession()
       // Adaptive execution can change num shuffle partitions, disallow
       sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
"false")
       // Disable cost-based join optimization as we do not want stateful 
operations to be rearranged
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 8212fb9..569114a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming.test
 
 import java.io.File
+import java.util.ConcurrentModificationException
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -651,4 +652,27 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
 
     LastOptions.clear()
   }
+
+  test("SPARK-26586: Streams should have isolated confs") {
+    import testImplicits._
+    val input = MemoryStream[Int]
+    input.addData(1 to 10)
+    spark.conf.set("testKey1", 0)
+    val queries = (1 to 10).map { i =>
+      spark.conf.set("testKey1", i)
+      input.toDF().writeStream
+        .foreachBatch { (df: Dataset[Row], id: Long) =>
+          val v = df.sparkSession.conf.get("testKey1").toInt
+          if (i != v) {
+            throw new ConcurrentModificationException(s"Stream $i has the 
wrong conf value $v")
+          }
+        }
+        .start()
+    }
+    try {
+      queries.foreach(_.processAllAvailable())
+    } finally {
+      queries.foreach(_.stop())
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to