spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0)
Repository: spark Updated Branches: refs/heads/branch-2.0 b482b3d58 -> 76b71eef4 [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL (branch 2.0) ## What changes were proposed in this pull request? Backport #15520 to 2.0. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15646 from zsxwing/SPARK-13747-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76b71eef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76b71eef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76b71eef Branch: refs/heads/branch-2.0 Commit: 76b71eef46a3b932d6de7f831f0245ea27e3dfe7 Parents: b482b3d Author: Shixiong Zhu Authored: Wed Oct 26 13:21:46 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 13:21:46 2016 -0700 -- .../org/apache/spark/util/ThreadUtils.scala | 21 scalastyle-config.xml | 1 + .../apache/spark/sql/execution/SparkPlan.scala | 2 +- .../exchange/BroadcastExchangeExec.scala| 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc8..d093e7b 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s `BlockingContext`, wraps + * and re-throws any exceptions with nice stack trace. + * + * Code running in the user's thread may be in a thread of Scala ForkJoinPool. 
As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { +try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) +} catch { + case NonFatal(t) => +throw new SparkException("Exception thrown in awaitResult: ", t) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/scalastyle-config.xml -- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 7fe0697..81d57d7 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,6 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult + If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead. 
]]> http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 79cb409..fa40414 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -166,7 +166,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def waitForSubqueries(): Unit = synchronized { // fill in the result of subqueries subqueryResults.foreach { case (e, futureResult) => - val rows = ThreadUtils.awaitResult(futureResult, Duration.Inf) + val rows = ThreadUtils.awaitResultInForkJoinSafely(futureResult, Duration.Inf) if (rows.length > 1) { sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") } http://git-wip-us.apache.org/repos/asf/spark/blob/76b71eef/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange
spark git commit: [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL
Repository: spark Updated Branches: refs/heads/master 312ea3f7f -> 7ac70e7ba [SPARK-13747][SQL] Fix concurrent executions in ForkJoinPool for SQL ## What changes were proposed in this pull request? Calling `Await.result` will allow other tasks to be run on the same thread when using ForkJoinPool. However, SQL uses a `ThreadLocal` execution id to trace Spark jobs launched by a query, which doesn't work perfectly in ForkJoinPool. This PR just uses `Awaitable.result` instead to prevent ForkJoinPool from running other tasks in the current waiting thread. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #15520 from zsxwing/SPARK-13747. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ac70e7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ac70e7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ac70e7b Branch: refs/heads/master Commit: 7ac70e7ba8d610a45c21a70dc28e4c989c19451b Parents: 312ea3f Author: Shixiong Zhu Authored: Wed Oct 26 10:36:36 2016 -0700 Committer: Shixiong Zhu Committed: Wed Oct 26 10:36:36 2016 -0700 -- .../org/apache/spark/util/ThreadUtils.scala | 21 scalastyle-config.xml | 1 + .../sql/execution/basicPhysicalOperators.scala | 2 +- .../exchange/BroadcastExchangeExec.scala| 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index 5a6dbc8..d093e7b 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -194,4 +194,25 @@ private[spark] object ThreadUtils { throw new SparkException("Exception thrown in awaitResult: ", t) } } + + /** + * Calls [[Awaitable.result]] directly to avoid using `ForkJoinPool`'s 
`BlockingContext`, wraps + * and re-throws any exceptions with nice stack trace. + * + * Code running in the user's thread may be in a thread of Scala ForkJoinPool. As concurrent + * executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this method + * basically prevents ForkJoinPool from running other tasks in the current waiting thread. + */ + @throws(classOf[SparkException]) + def awaitResultInForkJoinSafely[T](awaitable: Awaitable[T], atMost: Duration): T = { +try { + // `awaitPermission` is not actually used anywhere so it's safe to pass in null here. + // See SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + awaitable.result(Duration.Inf)(awaitPermission) +} catch { + case NonFatal(t) => +throw new SparkException("Exception thrown in awaitResult: ", t) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/scalastyle-config.xml -- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 7fe0697..81d57d7 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -200,6 +200,7 @@ This file is divided into 3 sections: // scalastyle:off awaitresult Await.result(...) // scalastyle:on awaitresult + If your codes use ThreadLocal and may run in threads created by the user, use ThreadUtils.awaitResultInForkJoinSafely instead. 
]]> http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 37d750e..a5291e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -570,7 +570,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { } override def executeCollect(): Array[InternalRow] = { -ThreadUtils.awaitResult(relationFuture, Duration.Inf) +ThreadUtils.awaitResultInForkJoinSafely(relationFuture, Duration.Inf) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7ac70e7b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala -- diff --git a/sql/core/src/main/scala