Repository: spark Updated Branches: refs/heads/master dbf2a7cfa -> 37fcda3e6
[SPARK-13747][SQL] Fix concurrent query with fork-join pool ## What changes were proposed in this pull request? Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264: ``` (1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() } ``` This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA. ## How was this patch tested? New test in `SQLExecutionSuite`. Author: Andrew Or <and...@databricks.com> Closes #11586 from andrewor14/fix-concurrent-sql. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37fcda3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37fcda3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37fcda3e Branch: refs/heads/master Commit: 37fcda3e6cf1707fb7a348a4d47231849ef8abf6 Parents: dbf2a7c Author: Andrew Or <and...@databricks.com> Authored: Wed Mar 9 17:34:28 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Mar 9 17:34:28 2016 -0800 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/DAGScheduler.scala | 7 ++++++- .../spark/sql/execution/SQLExecutionSuite.scala | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e2eaef5..b576d4c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -613,7 +613,12 @@ class DAGScheduler( properties: 
Properties): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) - Await.ready(waiter.completionFuture, atMost = Duration.Inf) + // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, + // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that + // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's + // safe to pass in null here. For more detail, see SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + waiter.completionFuture.ready(Duration.Inf)(awaitPermission) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format http://git-wip-us.apache.org/repos/asf/spark/blob/37fcda3e/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index 824d89e..c9f517c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite { } } + test("concurrent query execution with fork-join pool (SPARK-13747)") { + val sc = new SparkContext("local[*]", "test") + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + try { + // Should not throw IllegalArgumentException + (1 to 100).par.foreach { _ => + sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() + } + } finally { + sc.stop() + } + } + /** * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently. 
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org