HeartSaVioR opened a new pull request, #56546: URL: https://github.com/apache/spark/pull/56546
### What changes were proposed in this pull request?

`dropDuplicates` / `dropDuplicatesWithinWatermark` historically resolve their key columns eagerly: in the DataFrame implementation for Spark Classic (`Dataset.groupColsFromDropDuplicates`, via `colNames.toSet.toSeq`) and in the Spark Connect planner (`transformDeduplicate`). The Spark Classic path's key order therefore depends on Scala's `Set` iteration order, which is undocumented and changed between Scala 2.12 and 2.13, and which also differs from Spark Connect. Because streaming deduplication binds state-store keys by position, this non-determinism can break query restarts.

This keeps binding by position, but makes the resolution deterministic and shared between engines:

- Add an unresolved logical node `UnresolvedDeduplicate` and an analyzer rule `ResolveDeduplicate` that both Spark Classic (`Dataset.dropDuplicates*`) and Spark Connect (`transformDeduplicate`) defer to. The rule resolves the requested columns with an explicit, order-preserving dedup (intentionally NOT `Seq.distinct`/`Set`, whose ordering is not contractually guaranteed), so the key order is deterministic and identical across Scala versions and engines.
- Add `spark.sql.dropDuplicates.deterministicKeyOrder.enabled` (default `true`), persisted via the offset log (`OffsetSeqMetadata.relevantSQLConfs`, with legacy default `false`), so the deterministic order applies only to newly started streaming queries; existing checkpoints keep their original key order. When the conf is off, the rule reproduces each engine's legacy resolution (Classic `toSet`, Connect identity), selected by the node's `legacyDedupColumnNames`.
- Resolve the node under the offset-log-pinned conf during initial analysis via a bootstrap in `StreamingQueryManager.createQuery` (only when an `UnresolvedDeduplicate` is present), mirroring how other initial-analysis confs are pinned per query.

### Why are the changes needed?

Streaming deduplication binds state-store keys by position.
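
As a rough illustration of why the order matters, the plain-Scala sketch below contrasts a `toSet.toSeq` resolution with an explicit first-occurrence dedup. It is not the code in this PR: `DedupOrderSketch`, `firstOccurrenceDedup`, and the column names are hypothetical and exist only for this example.

```scala
import scala.collection.mutable

// Illustrative sketch only; not the implementation of ResolveDeduplicate.
object DedupOrderSketch {

  // Keep each name the first time it appears, preserving the caller's order.
  // The result depends only on the input sequence, not on hash iteration order.
  def firstOccurrenceDedup(names: Seq[String]): Seq[String] = {
    val seen = mutable.HashSet.empty[String]
    val out = Seq.newBuilder[String]
    names.foreach { n =>
      // `add` returns true only if the element was not already present.
      if (seen.add(n)) out += n
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical column names with one duplicate ("userId").
    val requested = Seq("userId", "eventTime", "userId", "sessionId", "region", "device")

    // Legacy-style resolution: the order comes from Set iteration, which is
    // not specified and may differ between Scala versions.
    val legacyOrder = requested.toSet.toSeq

    // Order-preserving resolution: first occurrence wins.
    val deterministicOrder = firstOccurrenceDedup(requested)

    println(s"toSet.toSeq order:      $legacyOrder")
    println(s"first-occurrence order: $deterministicOrder")
  }
}
```

Both sequences contain the same five names, so the deduplicated rows are the same either way. Only the position of each name can differ, and position is what a state-store key is bound by.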
A key order that depends on Scala's `Set` iteration order is non-deterministic across Scala versions and differs between Spark Classic and Spark Connect, which can break streaming query restarts and cross-engine checkpoint compatibility.

### Does this PR introduce _any_ user-facing change?

Yes. For newly started streaming `dropDuplicates`/`dropDuplicatesWithinWatermark` queries, the key columns are resolved in a deterministic, first-occurrence order instead of a Scala-`Set`-dependent order. Deduplication results (rows) are unchanged; only the internal key ordering used for the state store changes. Existing checkpointed queries are unaffected - the effective order is pinned per query via the offset log. Spark Classic and Spark Connect now resolve `dropDuplicates` keys identically, and an unresolvable `dropDuplicates` column now fails at query analysis rather than at the API call in Spark Classic.

### How was this patch tested?

New unit suite `ResolveDeduplicateSuite`, a cross-engine interop suite `StreamingDeduplicationConnectInteropSuite`, and an end-to-end offset-log fallback suite `StreamingDeduplicationFallbackSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
