mukulmurthy closed pull request #23513: [SPARK-26586][SS] Fix race condition
that causes streams to run with unexpected confs
URL: https://github.com/apache/spark/pull/23513
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request once
it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 83824f40ab90b..90f7b477103ae 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -181,6 +181,9 @@ abstract class StreamExecution(
lazy val streamMetrics = new MetricsReporter(
this, s"spark.streaming.${Option(name).getOrElse(id)}")
+ /** Isolated spark session to run the batches with. */
+ private val sparkSessionForStream = sparkSession.cloneSession()
+
/**
* The thread that runs the micro-batches of this stream. Note that this
thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894:
interrupting a
@@ -270,8 +273,6 @@ abstract class StreamExecution(
// force initialization of the logical plan so that the sources can be
created
logicalPlan
- // Isolated spark session to run the batches with.
- val sparkSessionForStream = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,
"false")
// Disable cost-based join optimization as we do not want stateful
operations to be rearranged
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 4d3a54a048e8e..74ea0bfacba54 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming.test
import java.io.File
+import java.util.ConcurrentModificationException
import java.util.Locale
import java.util.concurrent.TimeUnit
@@ -651,4 +652,27 @@ class DataStreamReaderWriterSuite extends StreamTest with
BeforeAndAfter {
LastOptions.clear()
}
+
+ test("SPARK-26586: Streams should have isolated confs") {
+ import testImplicits._
+ val input = MemoryStream[Int]
+ input.addData(1 to 10)
+ spark.conf.set("testKey1", 0)
+ val queries = (1 to 10).map { i =>
+ spark.conf.set("testKey1", i)
+ input.toDF().writeStream
+ .foreachBatch { (df: Dataset[Row], id: Long) =>
+ val v = df.sparkSession.conf.get("testKey1").toInt
+ if (i != v) {
+ throw new ConcurrentModificationException(s"Stream $i has the
wrong conf value $v")
+ }
+ }
+ .start()
+ }
+ try {
+ queries.foreach(_.processAllAvailable())
+ } finally {
+ queries.foreach(_.stop())
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]