ericm-db opened a new pull request, #55672: URL: https://github.com/apache/spark/pull/55672
### What changes were proposed in this pull request?

This PR adds the ability to name streaming sinks via the `name()` method on `DataStreamWriter`, laying the groundwork for sink evolution. This is analogous to the existing source evolution support (`DataStreamReader.name()`).

**Changes:**

- Add a `name(sinkName)` method to `DataStreamWriter` (abstract method in the API, classic implementation, Connect stub)
- Add a `sinkName: Option[String]` field to the `WriteToStream` plan node and a `userSpecifiedSinkName: Option[String]` field to the `WriteToStreamStatement` plan node
- Add the internal config `spark.sql.streaming.queryEvolution.enableSinkEvolution` to `SQLConf`
- Add sink name validation: names may contain only alphanumeric characters and underscores
- Add enforcement in `MicroBatchExecution`: when sink evolution is enabled, sinks must be explicitly named
- Add `MicroBatchExecution.DEFAULT_SINK_NAME` (`"sink-0"`) for backward compatibility
- Thread `sinkName` through `StreamingQueryManager` and `ResolveWriteToStream`
- Add the error conditions `INVALID_SINK_NAME` and `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Add `QueryCompilationErrors.invalidStreamingSinkNameError`
- Add `StreamingSinkEvolutionSuite` with tests for validation and enforcement

All new APIs are `private[sql]` or `internal()`, so the `name()` method is not yet publicly callable. It will be opened up once a follow-up PR adds commit log support for persisting sink metadata.

### Why are the changes needed?

Currently, streaming queries have no mechanism for sink evolution. If a user wants to change the sink of a streaming query while preserving the checkpoint, there is no way to track which sink was used historically. This PR introduces the naming API as the first step toward full sink evolution support, in which sinks can be added, removed, or replaced while checkpoint integrity is maintained.

This mirrors the existing source evolution support added via `DataStreamReader.name()` and `spark.sql.streaming.queryEvolution.enableSourceEvolution`.
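The validation and enforcement rules listed under the changes can be sketched as standalone Scala, independent of Spark internals. This is a minimal, hypothetical sketch: the object `SinkNameRules`, its method names, the restriction of "alphanumeric" to ASCII, and the rejection of empty names are assumptions, not the PR's code. The PR raises the `INVALID_SINK_NAME` and `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT` error conditions; here plain exceptions carrying those names stand in for them.

```scala
// Hypothetical sketch of the sink-naming rules; not Spark's implementation.
object SinkNameRules {
  // Mirrors the PR's MicroBatchExecution.DEFAULT_SINK_NAME, used only
  // when the user gave no name and sink evolution is disabled.
  val DefaultSinkName: String = "sink-0"

  // Assumption: "alphanumeric" means ASCII letters and digits, and an
  // empty name is invalid (the pattern requires at least one character).
  def isValidSinkName(name: String): Boolean =
    name.matches("[A-Za-z0-9_]+")

  // Resolves the effective sink name for a query.
  // - A user-specified name must pass validation.
  // - With sink evolution enabled, an unnamed sink is rejected.
  // - Otherwise an unnamed sink falls back to the default name.
  def resolveSinkName(
      userSpecified: Option[String],
      sinkEvolutionEnabled: Boolean): String =
    userSpecified match {
      case Some(name) if isValidSinkName(name) => name
      case Some(name) =>
        throw new IllegalArgumentException(s"INVALID_SINK_NAME: '$name'")
      case None if sinkEvolutionEnabled =>
        throw new IllegalStateException("UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT")
      case None => DefaultSinkName
    }
}
```

For example, `SinkNameRules.isValidSinkName("orders_sink_1")` is `true`, while `"orders-sink"` and `"my sink"` are rejected. Note that the default `"sink-0"` itself contains a hyphen; in this sketch it never goes through validation because it is applied only when no name was supplied.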
### Does this PR introduce _any_ user-facing change?

No. All new APIs are `private[sql]` and the config is `internal()`. There will be no user-facing changes until the feature is fully implemented with commit log support in a follow-up PR.

### How was this patch tested?

Added `StreamingSinkEvolutionSuite` with 7 test cases covering:

- Invalid sink name validation (hyphen, space, special characters)
- Valid sink name patterns (alphanumeric, underscore, digits)
- Enforcement: an unnamed sink with evolution enabled throws `UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT`
- Enforcement: an unnamed sink without evolution enabled succeeds (backward compatibility)
- A named sink with evolution enabled succeeds
- Continuing with the same sink name across restarts works

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-6)
