cloud-fan commented on code in PR #56044:
URL: https://github.com/apache/spark/pull/56044#discussion_r3287267023
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -196,18 +196,33 @@ class ChangelogResolutionSuite extends SharedSparkSession
{
parameters = Map("relationId" -> "`x`"))
}
- test("CHANGES clause passes changelogInfo to catalog") {
+ test("CHANGES clause passes changelogContext to catalog") {
sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO
VERSION 5")
val cat = spark.sessionState.catalogManager
.catalog(cdcCatalogName)
.asInstanceOf[InMemoryChangelogCatalog]
- val info = cat.lastChangelogInfo
+ val info = cat.lastChangelogContext
assert(info.isDefined)
val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "1")
assert(range.endingVersion().get() == "5")
}
+ test("user-defined options are forwarded to loadChangelog") {
+ val cat = spark.sessionState.catalogManager
+ .catalog(cdcCatalogName)
+ .asInstanceOf[InMemoryChangelogCatalog]
+
+ spark.read
+ .option("startingVersion", "1")
+ .option("customOption", "customValue")
+ .changes(s"$cdcCatalogName.test_table")
+
+ val opts = cat.lastOptions
+ assert(opts.isDefined)
+ assert(opts.get.get("customOption") == "customValue")
Review Comment:
The Javadoc on `loadChangelog` says options "include the CDC-recognized keys
(range, deduplication mode, etc.) that are also parsed into `context`" — but
this test only asserts the custom key is forwarded. Worth also asserting a
CDC-recognized key is present so the Javadoc claim is pinned by a test:
```suggestion
assert(opts.get.get("customOption") == "customValue")
assert(opts.get.get("startingVersion") == "1")
```
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -196,18 +196,33 @@ class ChangelogResolutionSuite extends SharedSparkSession
{
parameters = Map("relationId" -> "`x`"))
}
- test("CHANGES clause passes changelogInfo to catalog") {
+ test("CHANGES clause passes changelogContext to catalog") {
sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO
VERSION 5")
val cat = spark.sessionState.catalogManager
.catalog(cdcCatalogName)
.asInstanceOf[InMemoryChangelogCatalog]
- val info = cat.lastChangelogInfo
+ val info = cat.lastChangelogContext
assert(info.isDefined)
val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
assert(range.startingVersion() == "1")
assert(range.endingVersion().get() == "5")
}
+ test("user-defined options are forwarded to loadChangelog") {
Review Comment:
This test covers the DataFrame batch path, but the SQL `WITH`-clause path
(`AstBuilder`) and the streaming paths (`DataStreamReader`, streaming SQL) each
independently construct the `UnresolvedRelation.options` in their own frontend
before `RelationResolution` dispatches. Would you mind adding smoke tests for
at least the SQL and streaming DataFrame paths so a future regression in any
frontend is caught here? (The end-to-end suite touches `lastChangelogContext`
for those paths but not `lastOptions`.)
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -194,22 +194,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
*/
private def evaluateRequirements(
changelog: Changelog,
- options: ChangelogInfo): PostProcessingRequirements = {
+ options: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
- options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
+ options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
- options.deduplicationMode() ==
ChangelogInfo.DeduplicationMode.NET_CHANGES &&
+ options.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()
// If carry-overs are surfaced and update detection is enabled without
carry-over
// removal, carry-overs would be falsely classified as updates, leading to
wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
- options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
+ options.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NONE) {
Review Comment:
Now that `options: CaseInsensitiveStringMap` is a real, distinct concept in
this PR, naming a `ChangelogContext` value `options` here (and the doc just
above — "user-provided `ChangelogContext` options") reads awkwardly. Since the
PR is already updating every `options.` access in this method, would you mind
renaming it to `context` to match the rest of the rename?
```suggestion
private def evaluateRequirements(
changelog: Changelog,
context: ChangelogContext): PostProcessingRequirements = {
val requiresCarryOverRemoval =
context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
changelog.containsCarryoverRows()
val requiresUpdateDetection =
context.computeUpdates() &&
changelog.representsUpdateAsDeleteAndInsert()
val requiresNetChanges =
context.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NET_CHANGES &&
changelog.containsIntermediateChanges()
// If carry-overs are surfaced and update detection is enabled without carry-over
// removal, carry-overs would be falsely classified as updates, leading to wrong
// results. Hence we throw.
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
context.deduplicationMode() ==
ChangelogContext.DeduplicationMode.NONE) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]