Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16322#discussion_r93113059
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -206,6 +201,36 @@ class StreamExecution(
         startLatch.await()  // Wait until thread started and QueryStart event has been posted
       }
     
    +  private def generateLogicalPlan: LogicalPlan = {
    +    var nextSourceId = 0L
    +    val internalLogicalPlan = analyzedPlan.transform {
    +      case StreamingRelation(dataSource, _, output) =>
    +        // Materialize source to avoid creating it in every batch
    +        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
    +        val source = dataSource.createSource(metadataPath)
    +        nextSourceId += 1
    +        // We still need to use the previous `output` instead of `source.schema`, because
    +        // the attributes in "df.logicalPlan" already reference the previous `output`.
    +        StreamingExecutionRelation(source, output)
    +    }
    +    sources = internalLogicalPlan.collect { case s: StreamingExecutionRelation => s.source }
    +    uniqueSources = sources.distinct
    +    internalLogicalPlan
    +  }
    +
    +  override def logicalPlan: LogicalPlan = {
    +    if (_logicalPlan == null) {
    +      localPlanLock.synchronized {
    +        if (_logicalPlan == null) {
    +          _logicalPlan = generateLogicalPlan
    +        }
    +      }
    +    }
    +    _logicalPlan
    +  }
    --- End diff ---
    
    This is getting pretty complicated... Do we really need to include all of this information in `StreamingQueryException`?
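
    For reference (not part of the PR), a minimal sketch of the thread-safe, initialize-once behaviour that the `logicalPlan` override above hand-rolls with double-checked locking, written instead with Scala's built-in `lazy val`. The `Plan`/`Relation` types and the `rewritePlan` helper below are hypothetical stand-ins, not Spark APIs:

        // Hypothetical, simplified stand-ins for the streaming plan types; not Spark classes.
        sealed trait Plan
        case class Relation(name: String) extends Plan
        case class ExecRelation(name: String, sourceId: Long) extends Plan
        case class Project(child: Plan) extends Plan

        class QueryExecutionSketch(analyzed: Plan) {
          private var nextSourceId = 0L

          // Assumed helper: rewrite every Relation into an ExecRelation exactly once,
          // mirroring the StreamingRelation -> StreamingExecutionRelation rewrite in the diff.
          private def rewritePlan(p: Plan): Plan = p match {
            case Relation(name) =>
              val id = nextSourceId
              nextSourceId += 1
              ExecRelation(name, id)
            case Project(child) => Project(rewritePlan(child))
            case other => other
          }

          // `lazy val` gives thread-safe, run-once initialization, replacing the explicit
          // null check plus synchronized block shown in the diff.
          lazy val logicalPlan: Plan = rewritePlan(analyzed)
        }

        object QueryExecutionSketch {
          def main(args: Array[String]): Unit = {
            val q = new QueryExecutionSketch(Project(Relation("events")))
            println(q.logicalPlan)                   // Project(ExecRelation(events,0))
            println(q.logicalPlan eq q.logicalPlan)  // true: the rewrite ran only once
          }
        }

    The trade-off is that `lazy val` hides its initialization state, whereas the explicit `_logicalPlan == null` check in the diff lets other code ask whether the plan has been built yet.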


