spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread

2016-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8bf56cc46 -> b020ce408


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When a stream is started and its source has to be resolved, for example when partition columns need to be resolved, resolution can take a long time. That work should not block the thread on which `query.start()` was called; it should happen on the stream execution thread, before any triggers start.
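
The fix is the standard lazy-initialization idiom. A minimal, self-contained sketch of the idea (using hypothetical names such as `Query` and `resolveSources`, not the actual Spark classes):

```scala
// Illustrative sketch only, not Spark internals: expensive source
// resolution is deferred behind a lazy val and forced on the execution
// thread, so the caller of start() never waits for it.
class Query(resolveSources: () => Seq[String]) {
  // Runs on whichever thread first reads `sources`, here the
  // execution thread below, not the thread that calls start().
  private lazy val sources: Seq[String] = resolveSources()

  def start(): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        sources // force resolution before any "triggers" run
        // ... trigger loop would go here ...
      }
    })
    t.setDaemon(true)
    t.start()
    t // returns without waiting for resolution
  }
}
```

The `lazy val` is the key design choice: construction stays cheap, and whichever thread first forces the value pays for resolution, which the patch arranges to be the stream execution thread.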

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.
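
The actual test lives in `StreamingQueryManagerSuite.scala` and uses the `DefaultSource` test utility added by this patch; neither appears in the excerpt below. A rough sketch of the test idea, reusing the hypothetical `Query` from the sketch above:

```scala
import java.util.concurrent.CountDownLatch

// Rough sketch of the test idea (hypothetical, not the actual suite):
// a source whose creation blocks must not block the caller of start().
object StartDoesNotBlockSketch {
  def main(args: Array[String]): Unit = {
    val gate = new CountDownLatch(1)
    val query = new Query(() => { gate.await(); Seq("slow-source") })

    val begin = System.nanoTime()
    query.start()
    val elapsedMs = (System.nanoTime() - begin) / 1e6

    // Before the fix, resolution ran on this thread and would hang here.
    assert(elapsedMs < 1000, s"start() blocked for $elapsedMs ms")
    gate.countDown() // let the execution thread finish initializing
  }
}
```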

Author: Burak Yavuz 

Closes #16238 from brkyvz/SPARK-18811.

(cherry picked from commit 63c9159870ee274c68e24360594ca01d476b9ace)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b020ce40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b020ce40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b020ce40

Branch: refs/heads/branch-2.1
Commit: b020ce408507d7fd57f6d357054a2b3530a5b95e
Parents: 8bf56cc
Author: Burak Yavuz 
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 9 22:50:10 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 ++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +
 .../sql/streaming/util/DefaultSource.scala  | 66 
 4 files changed, 116 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     checkpointRoot: String,
-    val logicalPlan: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
     val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+    var nextSourceId = 0L
+    analyzedPlan.transform {
+      case StreamingRelation(dataSource, _, output) =>
+        // Materialize source to avoid creating it in every batch
+        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
+        // We still need to use the previous `output` instead of `source.schema` as attributes in
+        // "df.logicalPlan" has already used attributes of the previous `output`.
+        StreamingExecutionRelation(source, output)
+    }
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      updateStatusMessage("Initializing sources")
+      // force initialization of the logical plan so that the sources can be created
+      logicalPlan
+
       triggerExecutor.execute(() => {
         startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b020ce40/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread

2016-12-09 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 3e11d5bfe -> 63c915987


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When a stream is started and its source has to be resolved, for example when partition columns need to be resolved, resolution can take a long time. That work should not block the thread on which `query.start()` was called; it should happen on the stream execution thread, before any triggers start.

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz 

Closes #16238 from brkyvz/SPARK-18811.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598

Branch: refs/heads/master
Commit: 63c9159870ee274c68e24360594ca01d476b9ace
Parents: 3e11d5b
Author: Burak Yavuz 
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 9 22:49:51 2016 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 ++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +
 .../sql/streaming/util/DefaultSource.scala  | 66 
 4 files changed, 116 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     checkpointRoot: String,
-    val logicalPlan: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
     val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+    var nextSourceId = 0L
+    analyzedPlan.transform {
+      case StreamingRelation(dataSource, _, output) =>
+        // Materialize source to avoid creating it in every batch
+        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
+        // We still need to use the previous `output` instead of `source.schema` as attributes in
+        // "df.logicalPlan" has already used attributes of the previous `output`.
+        StreamingExecutionRelation(source, output)
+    }
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      updateStatusMessage("Initializing sources")
+      // force initialization of the logical plan so that the sources can be created
+      logicalPlan
+
       triggerExecutor.execute(() => {
         startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {