Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20097#discussion_r159364715
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
---
@@ -367,21 +416,20 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived
since the last batch.
val withNewSources = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
- newData.get(source).map { data =>
- val newPlan = data.logicalPlan
- assert(output.size == newPlan.output.size,
+ newData.get(source).map { dataPlan =>
+ assert(output.size == dataPlan.output.size,
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
- s"${Utils.truncatedString(newPlan.output, ",")}")
- replacements ++= output.zip(newPlan.output)
- newPlan
+ s"${Utils.truncatedString(dataPlan.output, ",")}")
+ replacements ++= output.zip(dataPlan.output)
+ dataPlan
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
}
// Rewire the plan to use the new attributes that were returned by the
source.
val replacementMap = AttributeMap(replacements)
- val triggerLogicalPlan = withNewSources transformAllExpressions {
+ val withNewExprs = withNewSources transformAllExpressions {
--- End diff --
similarly change `withNewSources`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]