spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread
Repository: spark Updated Branches: refs/heads/branch-2.1 8bf56cc46 -> b020ce408 [SPARK-18811] StreamSource resolution should happen in stream execution thread ## What changes were proposed in this pull request? When you start a stream, if we are trying to resolve the source of the stream, for example if we need to resolve partition columns, this could take a long time. This long execution time should not block the main thread where `query.start()` was called on. It should happen in the stream execution thread possibly before starting any triggers. ## How was this patch tested? Unit test added. Made sure test fails with no code changes. Author: Burak Yavuz. Closes #16238 from brkyvz/SPARK-18811. (cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40 Branch: refs/heads/branch-2.1 Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e Parents: 8bf56cc Author: Burak Yavuz Authored: Fri Dec 9 22:49:51 2016 -0800 Committer: Shixiong Zhu Committed: Fri Dec 9 22:50:10 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 24 ++- .../sql/streaming/StreamingQueryManager.scala | 14 + .../streaming/StreamingQueryManagerSuite.scala | 28 + .../sql/streaming/util/DefaultSource.scala | 66 4 files changed, 116 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39be222..b52810d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -47,7 +47,7 @@ class StreamExecution( override val sparkSession: SparkSession, override val name: String, checkpointRoot: String, -val logicalPlan: LogicalPlan, +analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, @@ -115,12 +115,26 @@ class StreamExecution( private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + override lazy val logicalPlan: LogicalPlan = { +var nextSourceId = 0L +analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} + } + /** All stream sources present in the query plan. */ - protected val sources = + protected lazy val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ - private val uniqueSources = sources.distinct + private lazy val uniqueSources = sources.distinct private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -214,6 +228,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. 
SparkSession.setActiveSession(sparkSession) + updateStatusMessage("Initializing sources") + // force initialization of the logical plan so that the sources can be created + logicalPlan + triggerExecutor.execute(() => { startTrigger() http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c6ab416..52d0791 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++
spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread
Repository: spark Updated Branches: refs/heads/master 3e11d5bfe -> 63c915987 [SPARK-18811] StreamSource resolution should happen in stream execution thread ## What changes were proposed in this pull request? When you start a stream, if we are trying to resolve the source of the stream, for example if we need to resolve partition columns, this could take a long time. This long execution time should not block the main thread where `query.start()` was called on. It should happen in the stream execution thread possibly before starting any triggers. ## How was this patch tested? Unit test added. Made sure test fails with no code changes. Author: Burak Yavuz. Closes #16238 from brkyvz/SPARK-18811. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598 Branch: refs/heads/master Commit: 63c9159870ee274c68e24360594ca01d476b9ace Parents: 3e11d5b Author: Burak Yavuz Authored: Fri Dec 9 22:49:51 2016 -0800 Committer: Shixiong Zhu Committed: Fri Dec 9 22:49:51 2016 -0800 -- .../execution/streaming/StreamExecution.scala | 24 ++- .../sql/streaming/StreamingQueryManager.scala | 14 + .../streaming/StreamingQueryManagerSuite.scala | 28 + .../sql/streaming/util/DefaultSource.scala | 66 4 files changed, 116 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 39be222..b52810d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ 
-47,7 +47,7 @@ class StreamExecution( override val sparkSession: SparkSession, override val name: String, checkpointRoot: String, -val logicalPlan: LogicalPlan, +analyzedPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, val triggerClock: Clock, @@ -115,12 +115,26 @@ class StreamExecution( private val prettyIdString = Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" + override lazy val logicalPlan: LogicalPlan = { +var nextSourceId = 0L +analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => +// Materialize source to avoid creating it in every batch +val metadataPath = s"$checkpointRoot/sources/$nextSourceId" +val source = dataSource.createSource(metadataPath) +nextSourceId += 1 +// We still need to use the previous `output` instead of `source.schema` as attributes in +// "df.logicalPlan" has already used attributes of the previous `output`. +StreamingExecutionRelation(source, output) +} + } + /** All stream sources present in the query plan. */ - protected val sources = + protected lazy val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } /** A list of unique sources in the query plan. */ - private val uniqueSources = sources.distinct + private lazy val uniqueSources = sources.distinct private val triggerExecutor = trigger match { case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) @@ -214,6 +228,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. 
SparkSession.setActiveSession(sparkSession) + updateStatusMessage("Initializing sources") + // force initialization of the logical plan so that the sources can be created + logicalPlan + triggerExecutor.execute(() => { startTrigger() http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c6ab416..52d0791 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {