uros-b commented on code in PR #56692:
URL: https://github.com/apache/spark/pull/56692#discussion_r3461442462


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -252,6 +253,15 @@ class AsyncProgressTrackingMicroBatchExecution(
   override protected def getTrigger(): TriggerExecutor = validateAndGetTrigger()
 
   private def validateAndGetTrigger(): TriggerExecutor = {
+    // Sink evolution persists per-sink metadata via the V3 commit log written in the base
+    // MicroBatchExecution.markMicroBatchEnd, which this class overrides with an async write that
+    // only emits V1 commit metadata. The sink metadata would therefore never be persisted, so
+    // reject the combination explicitly instead of silently dropping it.
+    if (sparkSession.sessionState.conf.enableStreamingSinkEvolution) {

Review Comment:
   Possible exception-type divergence: this guard lives in validateAndGetTrigger
(reached via the logicalPlan lazy val), but when .name() is omitted, the base
sinkName val throws UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT before the guard
runs. The same invalid combination (sink evolution enabled with async progress
tracking) therefore surfaces a different exception type and message depending
on whether .name() is set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

