HeartSaVioR commented on code in PR #36963:
URL: https://github.com/apache/spark/pull/36963#discussion_r904747378
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -576,14 +589,22 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived since
the last batch.
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
- case StreamingExecutionRelation(source, output) =>
+ case StreamingExecutionRelation(source, output, catalogTable) =>
newData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
}
val finalDataPlan = dataPlan transformUp {
- case l: LogicalRelation if hasFileMetadata =>
l.withMetadataColumns()
+ case l: LogicalRelation =>
+ var newRelation = l
+ if (hasFileMetadata) {
+ newRelation = newRelation.withMetadataColumns()
+ }
+ catalogTable.foreach { table =>
+ newRelation = newRelation.copy(catalogTable = Some(table))
Review Comment:
Just to clarify, did you mean having an assertion, or don't overwrite
catalogTable if data source produces one in the relation? I can't imagine the
codepath latter becomes true, but just wanted to clarify so that I can reflect
the review properly.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -576,14 +589,22 @@ class MicroBatchExecution(
// Replace sources in the logical plan with data that has arrived since
the last batch.
val newBatchesPlan = logicalPlan transform {
// For v1 sources.
- case StreamingExecutionRelation(source, output) =>
+ case StreamingExecutionRelation(source, output, catalogTable) =>
newData.get(source).map { dataPlan =>
val hasFileMetadata = output.exists {
case FileSourceMetadataAttribute(_) => true
case _ => false
}
val finalDataPlan = dataPlan transformUp {
- case l: LogicalRelation if hasFileMetadata =>
l.withMetadataColumns()
+ case l: LogicalRelation =>
+ var newRelation = l
+ if (hasFileMetadata) {
+ newRelation = newRelation.withMetadataColumns()
+ }
+ catalogTable.foreach { table =>
+ newRelation = newRelation.copy(catalogTable = Some(table))
Review Comment:
Just to clarify, did you mean having an assertion, or don't overwrite
catalogTable if data source produces one in the relation? I can't imagine the
codepath latter can be true, but just wanted to clarify so that I can reflect
the review properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]