spark git commit: [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite
Repository: spark Updated Branches: refs/heads/master 5f11e8c4c -> 4035c98a0 [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite ## What changes were proposed in this pull request? As initializing lazy vals shares the same lock, a thread is trying to initialize `executedPlan` when `isRDD` is running, this thread will hang forever. This PR just materializes `executedPlan` so that accessing it when `toRdd` is running doesn't need to wait for a lock ## How was this patch tested? Jenkins Closes #23023 from zsxwing/SPARK-26042. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4035c98a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4035c98a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4035c98a Branch: refs/heads/master Commit: 4035c98a0c03cf61d1fb9a9916df513ab1081a9b Parents: 5f11e8c Author: Shixiong Zhu Authored: Wed Nov 14 10:19:20 2018 -0800 Committer: Shixiong Zhu Committed: Wed Nov 14 10:19:20 2018 -0800 -- .../execution/streaming/continuous/ContinuousExecution.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4035c98a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index f009c52..4a7df73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -262,7 +262,12 @@ class ContinuousExecution( reportTimeTaken("runContinuous") { SQLExecution.withNewExecutionId( - sparkSessionForQuery, lastExecution)(lastExecution.toRdd) + sparkSessionForQuery, lastExecution) { + // Materialize `executedPlan` so that accessing it when `toRdd` is running doesn't need to + // wait for a lock + lastExecution.executedPlan + lastExecution.toRdd +} } } catch { case t: Throwable if StreamExecution.isInterruptionException(t, sparkSession.sparkContext) && - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite
Repository: spark Updated Branches: refs/heads/branch-2.4 e2e1f0ad8 -> ca426bfa5 [SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite ## What changes were proposed in this pull request? As initializing lazy vals shares the same lock, a thread is trying to initialize `executedPlan` when `isRDD` is running, this thread will hang forever. This PR just materializes `executedPlan` so that accessing it when `toRdd` is running doesn't need to wait for a lock ## How was this patch tested? Jenkins Closes #23023 from zsxwing/SPARK-26042. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu (cherry picked from commit 4035c98a0c03cf61d1fb9a9916df513ab1081a9b) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca426bfa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca426bfa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca426bfa Branch: refs/heads/branch-2.4 Commit: ca426bfa56045f01de0ea14480a375753073e025 Parents: e2e1f0a Author: Shixiong Zhu Authored: Wed Nov 14 10:19:20 2018 -0800 Committer: Shixiong Zhu Committed: Wed Nov 14 10:19:37 2018 -0800 -- .../execution/streaming/continuous/ContinuousExecution.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ca426bfa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index f104422..2e24fa6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -259,7 +259,12 @@ class ContinuousExecution( reportTimeTaken("runContinuous") { SQLExecution.withNewExecutionId( - sparkSessionForQuery, lastExecution)(lastExecution.toRdd) + sparkSessionForQuery, lastExecution) { + // Materialize `executedPlan` so that accessing it when `toRdd` is running doesn't need to + // wait for a lock + lastExecution.executedPlan + lastExecution.toRdd +} } } catch { case t: Throwable - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org