HeartSaVioR commented on code in PR #37368:
URL: https://github.com/apache/spark/pull/37368#discussion_r935061578
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -680,7 +680,14 @@ class MicroBatchExecution(
val batchSinkProgress: Option[StreamWriterCommitProgress] =
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(lastExecution) {
sink match {
- case s: Sink => s.addBatch(currentBatchId, nextBatch)
+ case s: Sink =>
+ s.addBatch(currentBatchId, nextBatch)
+            // DSv2 write node has a mechanism to invalidate DSv2 relation, but there is no
+            // corresponding one for DSv1. Given we have an information of catalog table for sink,
+            // we can refresh the catalog table once the write has succeeded.
+            plan.catalogTable.foreach { tbl =>
Review Comment:
Here I only handle the DSv1 path, since the DSv2 streaming write node
(`WriteToDataSourceV2Exec`) already contains a mechanism for refreshing the cache.
I think that covers the DSv2 table case, and I can't think of any coordination
between a DSv2 sink and a catalog table, but please correct me if I'm missing
something.
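To make the intent of the diff concrete, here is a minimal, self-contained sketch of the pattern: after a DSv1 `Sink.addBatch` call returns without throwing, refresh the catalog entry for the sink's table so cached metadata is invalidated. All types here (`Sink`, `TableIdentifier`, `StubCatalog`, `writeBatch`) are simplified stand-ins for illustration, not Spark's actual classes.

```scala
// Hypothetical stand-ins for Spark's Sink and session catalog, for illustration only.
case class TableIdentifier(name: String)

trait Sink {
  def addBatch(batchId: Long, data: Seq[String]): Unit
}

// Minimal stand-in for a catalog that caches table metadata.
class StubCatalog {
  var refreshed: List[TableIdentifier] = Nil
  def refreshTable(id: TableIdentifier): Unit = refreshed ::= id
}

object Dsv1RefreshSketch {
  def writeBatch(
      sink: Sink,
      batchId: Long,
      data: Seq[String],
      catalogTable: Option[TableIdentifier],
      catalog: StubCatalog): Unit = {
    sink.addBatch(batchId, data)
    // Only reached if addBatch did not throw, so the refresh happens
    // strictly after a successful write -- mirroring the diff, where
    // plan.catalogTable.foreach runs after s.addBatch.
    catalogTable.foreach(catalog.refreshTable)
  }
}
```

If `addBatch` throws, the refresh is skipped, which matches the "once the write has succeeded" wording in the diff's comment.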
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]