cloud-fan commented on code in PR #56044:
URL: https://github.com/apache/spark/pull/56044#discussion_r3287267023


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -196,18 +196,33 @@ class ChangelogResolutionSuite extends SharedSparkSession 
{
       parameters = Map("relationId" -> "`x`"))
   }
 
-  test("CHANGES clause passes changelogInfo to catalog") {
+  test("CHANGES clause passes changelogContext to catalog") {
     sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO 
VERSION 5")
     val cat = spark.sessionState.catalogManager
       .catalog(cdcCatalogName)
       .asInstanceOf[InMemoryChangelogCatalog]
-    val info = cat.lastChangelogInfo
+    val info = cat.lastChangelogContext
     assert(info.isDefined)
     val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "1")
     assert(range.endingVersion().get() == "5")
   }
 
+  test("user-defined options are forwarded to loadChangelog") {
+    val cat = spark.sessionState.catalogManager
+      .catalog(cdcCatalogName)
+      .asInstanceOf[InMemoryChangelogCatalog]
+
+    spark.read
+      .option("startingVersion", "1")
+      .option("customOption", "customValue")
+      .changes(s"$cdcCatalogName.test_table")
+
+    val opts = cat.lastOptions
+    assert(opts.isDefined)
+    assert(opts.get.get("customOption") == "customValue")

Review Comment:
   The Javadoc on `loadChangelog` says options "include the CDC-recognized keys 
(range, deduplication mode, etc.) that are also parsed into `context`", but 
this test only asserts that the custom key is forwarded. It would be worth also 
asserting that a CDC-recognized key is present, so the Javadoc claim is pinned by a test:
   
   ```suggestion
       assert(opts.get.get("customOption") == "customValue")
       assert(opts.get.get("startingVersion") == "1")
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogResolutionSuite.scala:
##########
@@ -196,18 +196,33 @@ class ChangelogResolutionSuite extends SharedSparkSession 
{
       parameters = Map("relationId" -> "`x`"))
   }
 
-  test("CHANGES clause passes changelogInfo to catalog") {
+  test("CHANGES clause passes changelogContext to catalog") {
     sql(s"SELECT * FROM $cdcCatalogName.test_table CHANGES FROM VERSION 1 TO 
VERSION 5")
     val cat = spark.sessionState.catalogManager
       .catalog(cdcCatalogName)
       .asInstanceOf[InMemoryChangelogCatalog]
-    val info = cat.lastChangelogInfo
+    val info = cat.lastChangelogContext
     assert(info.isDefined)
     val range = info.get.range().asInstanceOf[ChangelogRange.VersionRange]
     assert(range.startingVersion() == "1")
     assert(range.endingVersion().get() == "5")
   }
 
+  test("user-defined options are forwarded to loadChangelog") {

Review Comment:
   This test covers the DataFrame batch path, but the SQL `WITH`-clause path 
(`AstBuilder`) and the streaming paths (`DataStreamReader`, streaming SQL) each 
independently construct the `UnresolvedRelation.options` in their own frontend 
before `RelationResolution` dispatches. Would you mind adding smoke tests for 
at least the SQL and streaming DataFrame paths so a future regression in any 
frontend is caught here? (The end-to-end suite touches `lastChangelogContext` 
for those paths but not `lastOptions`.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -194,22 +194,22 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
    */
   private def evaluateRequirements(
       changelog: Changelog,
-      options: ChangelogInfo): PostProcessingRequirements = {
+      options: ChangelogContext): PostProcessingRequirements = {
     val requiresCarryOverRemoval =
-      options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
+      options.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
         changelog.containsCarryoverRows()
     val requiresUpdateDetection =
       options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
     val requiresNetChanges =
-      options.deduplicationMode() == 
ChangelogInfo.DeduplicationMode.NET_CHANGES &&
+      options.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NET_CHANGES &&
         changelog.containsIntermediateChanges()
 
     // If carry-overs are surfaced and update detection is enabled without 
carry-over
     // removal, carry-overs would be falsely classified as updates, leading to 
wrong
     // results. Hence we throw.
     if (requiresUpdateDetection &&
         changelog.containsCarryoverRows() &&
-        options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
+        options.deduplicationMode() == 
ChangelogContext.DeduplicationMode.NONE) {

Review Comment:
   Now that `options: CaseInsensitiveStringMap` is a real, distinct concept in 
this PR, naming a `ChangelogContext` value `options` here (and in the doc just 
above: "user-provided `ChangelogContext` options") reads awkwardly. Since the 
PR is already updating every `options.` access in this method, would you mind 
renaming it to `context` to match the rest of the rename?
   
   ```suggestion
     private def evaluateRequirements(
         changelog: Changelog,
         context: ChangelogContext): PostProcessingRequirements = {
       val requiresCarryOverRemoval =
         context.deduplicationMode() != ChangelogContext.DeduplicationMode.NONE &&
           changelog.containsCarryoverRows()
       val requiresUpdateDetection =
         context.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()
       val requiresNetChanges =
         context.deduplicationMode() == ChangelogContext.DeduplicationMode.NET_CHANGES &&
           changelog.containsIntermediateChanges()
   
       // If carry-overs are surfaced and update detection is enabled without carry-over
       // removal, carry-overs would be falsely classified as updates, leading to wrong
       // results. Hence we throw.
       if (requiresUpdateDetection &&
           changelog.containsCarryoverRows() &&
           context.deduplicationMode() == ChangelogContext.DeduplicationMode.NONE) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to