gengliangwang commented on code in PR #54394:
URL: https://github.com/apache/spark/pull/54394#discussion_r3395006865


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: 
Boolean = false) extends L
         case _ =>
           None
       }
+    case v: VariantGet
+        if v.path.foldable
+        && v.child.isInstanceOf[Attribute] =>
+      val colName = v.child.asInstanceOf[Attribute].name
+      val path = v.path.eval().toString
+      val typeName = v.dataType.catalogString
+      val colRef = FieldReference.column(colName)
+      val pathLit = LiteralValue(UTF8String.fromString(path), StringType)
+      val typeLit = LiteralValue(UTF8String.fromString(typeName), StringType)
+      val canonName = v.prettyName
+      Some(new UserDefinedScalarFunc(

Review Comment:
   This fabricates a `UserDefinedScalarFunc` for a built-in expression, which 
diverges from both established patterns in this file: built-ins with 
non-expression payload get a dedicated connector expression class 
(`GetArrayItem` in the case right above this one — SPARK-54240 — carries 
`failOnError` as a typed field; `V2Cast` carries a `DataType`), while the three 
existing `UserDefinedScalarFunc` producers all pass a catalog 
`ScalarFunction`'s `name()`/`canonicalName()`. `BoundFunction.canonicalName`'s 
javadoc requires collision-resistant qualified names (e.g. 
`com.mycompany.bucket(string)`) — a bare `variant_get` is indistinguishable 
from a genuine catalog UDF of that name, and the 3-arg/`catalogString` encoding 
is a wire contract that exists only implicitly between this code and 
apache/iceberg#15385. I'd recommend a dedicated `VariantGet` connector 
expression carrying `targetType: DataType` and `failOnError: Boolean` — 
self-documenting and collision-free; the cost is a (small, `@Evolving`) 
connector-API addition, which `GetArrayItem` already paid in 
4.1.0. If you keep the UDSF route, could you say why in the PR description, 
document the encoding for connector implementers, and qualify the canonical 
name?
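
   For illustration only, a minimal sketch of what a dedicated expression could look like. Every type here is a stand-in defined inside the snippet so it compiles on its own; `VariantGetExpr`, `FieldRef`, and `describe` are hypothetical names, not existing connector API, and the real class would presumably live next to the other connector expressions.

```scala
object VariantGetExprSketch {
  // Stand-ins for Spark's connector API (assumptions, not the real classes).
  sealed trait V2Expr
  final case class FieldRef(name: String) extends V2Expr

  sealed trait DataType { def catalogString: String }
  case object StringType extends DataType { val catalogString = "string" }
  case object BooleanType extends DataType { val catalogString = "boolean" }

  // Hypothetical dedicated connector expression: the payload is typed
  // instead of being encoded as positional string literals.
  final case class VariantGetExpr(
      child: V2Expr,
      path: String,
      targetType: DataType,
      failOnError: Boolean) extends V2Expr

  // Connector-side consumer: it matches on the class, so a catalog UDF that
  // happens to be named "variant_get" cannot be mistaken for the built-in.
  def describe(e: V2Expr): Option[String] = e match {
    case VariantGetExpr(FieldRef(col), path, tpe, failOnError) =>
      val fn = if (failOnError) "variant_get" else "try_variant_get"
      Some(s"$fn($col, '$path', '${tpe.catalogString}')")
    case _ => None
  }
}
```

   With this shape a connector never parses a type string, and `failOnError` is visible in the signature rather than implied by the function name.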



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: 
Boolean = false) extends L
         case _ =>
           None
       }
+    case v: VariantGet
+        if v.path.foldable
+        && v.child.isInstanceOf[Attribute] =>
+      val colName = v.child.asInstanceOf[Attribute].name
+      val path = v.path.eval().toString
+      val typeName = v.dataType.catalogString

Review Comment:
   `VariantGet` is a `TimeZoneAwareExpression` — a timestamp-target cast uses 
the session zone (`VariantCastArgs.zoneId`) — but the translation drops 
`timeZoneId` entirely. A connector that fully consumes the pushed predicate and 
evaluates the cast in a different zone would produce wrong skipping/results. 
The type literal does let a connector decline timestamp targets, but nothing 
tells it that it must. Should translation be gated when `targetType` 
(recursively) contains a timestamp type, or is the intent to document this 
caveat as part of the contract?
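
   If gating is the chosen route, the check could look roughly like the sketch below. The `DataType` hierarchy is a simplified stand-in defined in the snippet, `containsTimestamp` and `pushableTypeName` are hypothetical helper names, and treating only `TimestampType` as zone-sensitive is an assumption that would need confirming against the variant cast rules.

```scala
object TimestampGateSketch {
  // Simplified stand-in for Spark's DataType hierarchy (assumption).
  sealed trait DataType
  case object StringType extends DataType
  case object LongType extends DataType
  case object TimestampType extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class MapType(keyType: DataType, valueType: DataType) extends DataType
  final case class StructType(fields: Seq[(String, DataType)]) extends DataType

  // True when a timestamp appears anywhere inside the (possibly nested) type.
  def containsTimestamp(dt: DataType): Boolean = dt match {
    case TimestampType => true
    case ArrayType(elem) => containsTimestamp(elem)
    case MapType(k, v) => containsTimestamp(k) || containsTimestamp(v)
    case StructType(fields) => fields.exists { case (_, t) => containsTimestamp(t) }
    case _ => false
  }

  // Gate: hand the type name to the connector only when the session time
  // zone cannot influence the cast; otherwise decline translation.
  def pushableTypeName(targetType: DataType, typeName: String): Option[String] =
    if (containsTimestamp(targetType)) None else Some(typeName)
}
```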



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: 
Boolean = false) extends L
         case _ =>
           None
       }
+    case v: VariantGet
+        if v.path.foldable
+        && v.child.isInstanceOf[Attribute] =>
+      val colName = v.child.asInstanceOf[Attribute].name
+      val path = v.path.eval().toString

Review Comment:
   `v.path.eval()` returns null for a foldable null path — `variant_get(v, 
null)` passes analysis (`StringTypeWithCollation` admits a null string literal) 
— so `.toString` throws a bare NPE. The main filter pipeline is protected 
because NullPropagation folds the null-intolerant `VariantGet` away before 
pushdown, but this builder also runs on analyzed-but-un-optimized trees 
(CHECK-constraint translation, `constraints.scala:132`), where the NPE is 
reachable from the planner. Checking the eval result for null and declining 
translation keeps the failure mode graceful.
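
   A minimal sketch of the null check, using a stand-in `FoldableLiteral` in place of the Catalyst expression (hypothetical, defined here only so the snippet runs on its own). `Option(...)` turns a null eval result into `None`, so translation is declined instead of throwing.

```scala
object NullPathSketch {
  // Stand-in for a foldable Catalyst expression: eval() may return null.
  final case class FoldableLiteral(value: Any) {
    def eval(): Any = value
  }

  // Option(null) is None, so a null path declines translation rather than
  // reaching .toString and raising a NullPointerException.
  def translatePath(path: FoldableLiteral): Option[String] =
    Option(path.eval()).map(_.toString)
}
```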



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala:
##########
@@ -818,6 +819,57 @@ class DataSourceV2StrategySuite extends SharedSparkSession 
{
         FieldReference("cdouble"))))
   }
 
+  test("VariantGet serializes to UserDefinedScalarFunc") {

Review Comment:
   These tests assert only `name()` and `children().length`, but the children 
encoding is exactly the wire contract the Iceberg consumer matches on — a 
regression that swapped the path/type argument order, or changed 
`catalogString` to `sql`, would pass all three tests. Worth asserting the 
children themselves (`FieldReference("v")`, `LiteralValue("$.city", 
StringType)`, `LiteralValue("string", StringType)`) and `canonicalName()`. Also 
missing: negative tests pinning the guards (non-foldable path and non-Attribute 
child should yield `None`), and a boolean `targetType` case, which would have 
surfaced the predicate-wrapping issue flagged in the `isPredicate` comment.
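
   Roughly the shape of assertion I have in mind, sketched with stand-in types so it runs outside the suite. `ScalarFunc`, `StringLit`, `translate`, and `encodingMatches` are hypothetical; in the real test the comparison would be against the actual `UserDefinedScalarFunc` children.

```scala
object EncodingAssertionSketch {
  // Stand-ins for the connector expression classes (assumptions).
  sealed trait V2Expr
  final case class FieldRef(name: String) extends V2Expr
  final case class StringLit(value: String) extends V2Expr
  final case class ScalarFunc(
      name: String,
      canonicalName: String,
      children: Seq[V2Expr]) extends V2Expr

  // Stand-in for the translation under test, using the PR's 3-argument encoding.
  def translate(col: String, path: String, typeName: String): ScalarFunc =
    ScalarFunc("variant_get", "variant_get",
      Seq(FieldRef(col), StringLit(path), StringLit(typeName)))

  // Pins the whole contract: both names plus each child, in order.
  def encodingMatches(f: ScalarFunc): Boolean =
    f.name == "variant_get" &&
      f.canonicalName == "variant_get" &&
      f.children == Seq(FieldRef("v"), StringLit("$.city"), StringLit("string"))
}
```

   A check on `children().length` alone would accept a result with the path and type literals swapped; comparing the children sequence rejects it.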



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: 
Boolean = false) extends L
         case _ =>
           None
       }
+    case v: VariantGet

Review Comment:
   This case ignores `isPredicate`, unlike its peers 
(`generateExpressionWithNameByChildren` wraps boolean results in a 
`V2Predicate`; boolean columns wrap as `= TRUE`). For a boolean `targetType` — 
`variant_get(v, '$.flag', 'boolean')` is the most natural variant filter — the 
case returns a non-`Predicate`, and when that happens under `And`/`Or`/`Not` 
*inside* the builder it trips the live `assert(... isInstanceOf[V2Predicate])` 
calls and crashes planning with `AssertionError` instead of declining 
translation (the pre-PR behavior). This is concretely reachable via 
CHECK-constraint translation (`constraints.scala:132`), which calls 
`buildPredicate` on whole un-optimized conditions, e.g. `CHECK (variant_get(v, 
'$.ok', 'boolean') OR x > 0)`. Mirroring `buildPredicate`'s own escape hatch 
fixes it:
   
   ```scala
   val udf = new UserDefinedScalarFunc(
     canonName, canonName, Array[V2Expression](colRef, pathLit, typeLit))
   if (isPredicate && v.dataType.isInstanceOf[BooleanType]) {
     Some(new V2Predicate("BOOLEAN_EXPRESSION", Array[V2Expression](udf)))
   } else {
     Some(udf)
   }
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: 
Boolean = false) extends L
         case _ =>
           None
       }
+    case v: VariantGet
+        if v.path.foldable
+        && v.child.isInstanceOf[Attribute] =>
+      val colName = v.child.asInstanceOf[Attribute].name

Review Comment:
   Consider translating the child through the existing path instead of the 
`isInstanceOf` guard + `asInstanceOf` + manual `FieldReference.column`: 
`generateExpression(v.child)` already produces the `FieldReference` for an 
`Attribute` (via `ColumnOrField`), so matching on a `Some(ref: FieldReference)` 
result reuses the shared infrastructure, drops both casts, and leaves the door 
open for struct-nested variant columns later without changing the encoding.
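
   A sketch of the pattern, with simplified stand-ins for both expression trees (the `generateExpression` here is a toy version written for this snippet, not the builder's real method, and `variantColumn` is a hypothetical helper name):

```scala
object ChildTranslationSketch {
  // Stand-ins for Catalyst expressions (assumptions).
  sealed trait Expr
  final case class Attribute(name: String) extends Expr
  final case class Upper(child: Expr) extends Expr

  // Stand-ins for connector expressions (assumptions).
  sealed trait V2Expr
  final case class FieldReference(parts: Seq[String]) extends V2Expr
  final case class GeneralFunc(name: String, children: Seq[V2Expr]) extends V2Expr

  // Toy version of the shared translation path.
  def generateExpression(e: Expr): Option[V2Expr] = e match {
    case Attribute(n) => Some(FieldReference(Seq(n)))
    case Upper(c) => generateExpression(c).map(v => GeneralFunc("UPPER", Seq(v)))
  }

  // Accept the child only when the shared path yields a plain field
  // reference; no isInstanceOf/asInstanceOf on the Catalyst side.
  def variantColumn(child: Expr): Option[FieldReference] =
    generateExpression(child) match {
      case Some(ref: FieldReference) => Some(ref)
      case _ => None
    }
}
```

   Because the guard is on the translated result, a nested field that translates to a multi-part `FieldReference` would be accepted later without touching this case.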




