cloud-fan commented on code in PR #36963:
URL: https://github.com/apache/spark/pull/36963#discussion_r904804693
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -576,14 +589,22 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived since the last batch.
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
- case StreamingExecutionRelation(source, output) =>
+ case StreamingExecutionRelation(source, output, catalogTable) =>
newData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
}
val finalDataPlan = dataPlan transformUp {
- case l: LogicalRelation if hasFileMetadata =>
- l.withMetadataColumns()
+ case l: LogicalRelation =>
+ var newRelation = l
+ if (hasFileMetadata) {
+ newRelation = newRelation.withMetadataColumns()
+ }
+ catalogTable.foreach { table =>
+ newRelation = newRelation.copy(catalogTable = Some(table))
Review Comment:
Yeah, we should have an assertion. Blindly overwriting the catalogTable in `LogicalRelation` looks a bit risky.
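
The guard the reviewer asks for could look something like the following self-contained sketch. Note that `CatalogTable`, `LogicalRelation`, and `attachCatalogTable` here are simplified stand-ins for illustration, not Spark's real classes or the PR's actual code:

```scala
// Simplified stand-ins for Spark's classes, for illustration only.
case class CatalogTable(identifier: String)
case class LogicalRelation(catalogTable: Option[CatalogTable] = None)

// Guarded copy: attach the streaming source's catalogTable, but assert that we
// never silently overwrite a *different* one already present on the relation.
def attachCatalogTable(rel: LogicalRelation,
                       fromSource: Option[CatalogTable]): LogicalRelation =
  fromSource.fold(rel) { table =>
    assert(rel.catalogTable.forall(_ == table),
      s"Relation already carries catalogTable ${rel.catalogTable}; " +
        s"refusing to overwrite with $table")
    rel.copy(catalogTable = Some(table))
  }
```

With a guard like this, attaching a catalog table to a relation that has none (or the same one) succeeds, while a relation already bound to a different table fails fast instead of being silently rewritten.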
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]