gengliangwang commented on code in PR #54394:
URL: https://github.com/apache/spark/pull/54394#discussion_r3395006865
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
case _ =>
None
}
+ case v: VariantGet
+ if v.path.foldable
+ && v.child.isInstanceOf[Attribute] =>
+ val colName = v.child.asInstanceOf[Attribute].name
+ val path = v.path.eval().toString
+ val typeName = v.dataType.catalogString
+ val colRef = FieldReference.column(colName)
+ val pathLit = LiteralValue(UTF8String.fromString(path), StringType)
+ val typeLit = LiteralValue(UTF8String.fromString(typeName), StringType)
+ val canonName = v.prettyName
+ Some(new UserDefinedScalarFunc(
Review Comment:
This fabricates a `UserDefinedScalarFunc` for a built-in expression, which
diverges from both established patterns in this file: built-ins with
non-expression payload get a dedicated connector expression class
(`GetArrayItem` in the case right above this one — SPARK-54240 — carries
`failOnError` as a typed field; `V2Cast` carries a `DataType`), while the three
existing `UserDefinedScalarFunc` producers all pass a catalog
`ScalarFunction`'s `name()`/`canonicalName()`. `BoundFunction.canonicalName`'s
javadoc requires collision-resistant qualified names (e.g.
`com.mycompany.bucket(string)`) — a bare `variant_get` is indistinguishable
from a genuine catalog UDF of that name, and the 3-arg/`catalogString` encoding
is a wire contract that exists only implicitly between this code and
apache/iceberg#15385. I'd recommend a dedicated `VariantGet` connector
expression carrying `targetType: DataType` and `failOnError: Boolean` —
self-documenting and collision-free; the cost is a (small,
`@Evolving`) connector-API addition, which `GetArrayItem` already paid in
4.1.0. If you keep the UDSF route, could you say why in the PR description,
document the encoding for connector implementers, and qualify the canonical
name?
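
To make the collision concern concrete, here is a minimal, self-contained sketch. Every type in it (`V2Expr`, `FieldRef`, `StringLit`, `UdfCall`, `VariantGetExpr`) is a hypothetical stand-in, not one of Spark's connector classes, and the target type is kept as a plain string to avoid modelling `DataType`. It only shows that a connector matching on a bare canonical name cannot tell a built-in from a catalog UDF, while a typed node is matched by class.

```scala
object VariantGetEncodingSketch {
  // Hypothetical stand-ins for connector expressions; not Spark's API.
  sealed trait V2Expr
  final case class FieldRef(name: String) extends V2Expr
  final case class StringLit(value: String) extends V2Expr
  final case class UdfCall(
      name: String,
      canonicalName: String,
      children: Seq[V2Expr]) extends V2Expr
  // Assumed shape of a dedicated node, following the fields suggested above.
  final case class VariantGetExpr(
      child: V2Expr,
      path: String,
      targetType: String,
      failOnError: Boolean) extends V2Expr

  // What a connector would have to do under each encoding.
  def looksLikeBuiltinVariantGet(e: V2Expr): Boolean = e match {
    case _: VariantGetExpr => true // typed node: unambiguous
    case UdfCall(_, "variant_get", children) => children.length == 3 // name + arity only
    case _ => false
  }

  // The PR's encoding of the built-in: column, path literal, type literal.
  val builtinAsUdf: V2Expr = UdfCall("variant_get", "variant_get",
    Seq(FieldRef("v"), StringLit("$.city"), StringLit("string")))

  // A genuine catalog UDF that happens to share the unqualified name and arity.
  val catalogUdf: V2Expr = UdfCall("variant_get", "variant_get",
    Seq(FieldRef("a"), StringLit("x"), StringLit("y")))

  val typedNode: V2Expr =
    VariantGetExpr(FieldRef("v"), "$.city", "string", failOnError = true)
}
```

In this sketch the catalog UDF is also accepted by the name-based match, which is the ambiguity a qualified canonical name or a dedicated class would remove.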
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
case _ =>
None
}
+ case v: VariantGet
+ if v.path.foldable
+ && v.child.isInstanceOf[Attribute] =>
+ val colName = v.child.asInstanceOf[Attribute].name
+ val path = v.path.eval().toString
+ val typeName = v.dataType.catalogString
Review Comment:
`VariantGet` is a `TimeZoneAwareExpression` — a timestamp-target cast uses
the session zone (`VariantCastArgs.zoneId`) — but the translation drops
`timeZoneId` entirely. A connector that fully consumes the pushed predicate and
evaluates the cast in a different zone would produce wrong skipping/results.
The type literal does let a connector decline timestamp targets, but nothing
tells it that it must. Should translation be gated when `targetType`
(recursively) contains a timestamp type, or is the intent to document this
caveat as part of the contract?
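
If gating is the chosen route, the recursive check could look roughly like the sketch below. The `Ty` hierarchy is a hypothetical, simplified stand-in for Spark's `DataType` tree, and `mayPushDown` is an illustrative name rather than an existing helper; the real builder would apply the equivalent test to `v.dataType`.

```scala
object TimestampGateSketch {
  // Hypothetical, simplified type tree; not Spark's DataType classes.
  sealed trait Ty
  case object StringTy extends Ty
  case object LongTy extends Ty
  case object TimestampTy extends Ty
  final case class ArrayTy(element: Ty) extends Ty
  final case class MapTy(key: Ty, value: Ty) extends Ty
  final case class StructTy(fields: Seq[(String, Ty)]) extends Ty

  // True if a timestamp appears anywhere inside the target type.
  def containsTimestamp(t: Ty): Boolean = t match {
    case TimestampTy => true
    case ArrayTy(element) => containsTimestamp(element)
    case MapTy(key, value) => containsTimestamp(key) || containsTimestamp(value)
    case StructTy(fields) => fields.exists { case (_, fieldType) => containsTimestamp(fieldType) }
    case _ => false
  }

  // Decline translation when the cast result could depend on the session zone.
  def mayPushDown(targetType: Ty): Boolean = !containsTimestamp(targetType)
}
```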
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
case _ =>
None
}
+ case v: VariantGet
+ if v.path.foldable
+ && v.child.isInstanceOf[Attribute] =>
+ val colName = v.child.asInstanceOf[Attribute].name
+ val path = v.path.eval().toString
Review Comment:
`v.path.eval()` returns null for a foldable null path — `variant_get(v,
null)` passes analysis (`StringTypeWithCollation` admits a null string literal)
— so `.toString` throws a bare NPE. The main filter pipeline is protected
because NullPropagation folds the null-intolerant `VariantGet` away before
pushdown, but this builder also runs on analyzed-but-un-optimized trees
(CHECK-constraint translation, `constraints.scala:132`), where the NPE is
reachable from the planner. Checking the eval result for null and declining
translation keeps the failure mode graceful.
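
A minimal sketch of the null check, assuming the evaluated path arrives as an `Any` that may be null. `foldedPath` is a hypothetical helper name; in the builder the input would be `v.path.eval()` and a `None` result would mean declining translation.

```scala
object NullSafePathSketch {
  // Option(...) maps a null eval result to None instead of throwing on .toString.
  def foldedPath(evalResult: Any): Option[String] =
    Option(evalResult).map(_.toString)
}
```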
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala:
##########
@@ -818,6 +819,57 @@ class DataSourceV2StrategySuite extends SharedSparkSession {
FieldReference("cdouble"))))
}
+ test("VariantGet serializes to UserDefinedScalarFunc") {
Review Comment:
These tests assert only `name()` and `children().length`, but the children
encoding is exactly the wire contract the Iceberg consumer matches on — a
regression that swapped the path/type argument order, or changed
`catalogString` to `sql`, would pass all three tests. Worth asserting the
children themselves (`FieldReference("v")`, `LiteralValue("$.city",
StringType)`, `LiteralValue("string", StringType)`) and `canonicalName()`. Also
missing: negative tests pinning the guards (non-foldable path and non-Attribute
child should yield `None`), and a boolean `targetType` case, which would have
surfaced the predicate-wrapping issue flagged in the `isPredicate` comment on
this PR.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
case _ =>
None
}
+ case v: VariantGet
Review Comment:
This case ignores `isPredicate`, unlike its peers
(`generateExpressionWithNameByChildren` wraps boolean results in a
`V2Predicate`; boolean columns wrap as `= TRUE`). For a boolean `targetType` —
`variant_get(v, '$.flag', 'boolean')` is the most natural variant filter — the
case returns a non-`Predicate`, and when that happens under `And`/`Or`/`Not`
*inside* the builder it trips the live `assert(... isInstanceOf[V2Predicate])`
calls and crashes planning with `AssertionError` instead of declining
translation (the pre-PR behavior). This is concretely reachable via
CHECK-constraint translation (`constraints.scala:132`), which calls
`buildPredicate` on whole un-optimized conditions, e.g. `CHECK (variant_get(v,
'$.ok', 'boolean') OR x > 0)`. Mirroring `buildPredicate`'s own escape hatch
fixes it:
```scala
val udf = new UserDefinedScalarFunc(canonName, canonName,
Array[V2Expression](colRef, pathLit, typeLit))
if (isPredicate && v.dataType.isInstanceOf[BooleanType]) {
Some(new V2Predicate("BOOLEAN_EXPRESSION", Array[V2Expression](udf)))
} else {
Some(udf)
}
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##########
@@ -333,6 +335,20 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
case _ =>
None
}
+ case v: VariantGet
+ if v.path.foldable
+ && v.child.isInstanceOf[Attribute] =>
+ val colName = v.child.asInstanceOf[Attribute].name
Review Comment:
Consider translating the child through the existing path instead of the
`isInstanceOf` guard + `asInstanceOf` + manual `FieldReference.column`:
`generateExpression(v.child)` already produces the `FieldReference` for an
`Attribute` (via `ColumnOrField`), so matching on a `Some(ref: FieldReference)`
result reuses the shared infrastructure, drops both casts, and leaves the door
open for struct-nested variant columns later without changing the encoding.
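
The shape of that match can be sketched as follows. All types here are hypothetical stand-ins, and `generate` only imitates the role of `generateExpression`; the point is that matching on the translated result replaces the `isInstanceOf`/`asInstanceOf` pair.

```scala
object ChildTranslationSketch {
  // Hypothetical stand-ins for Catalyst and connector expressions; not Spark's API.
  sealed trait CatalystExpr
  final case class Attr(name: String) extends CatalystExpr
  final case class Upper(child: CatalystExpr) extends CatalystExpr

  sealed trait V2Expr
  final case class FieldRef(parts: Seq[String]) extends V2Expr
  final case class GeneralExpr(name: String, children: Seq[V2Expr]) extends V2Expr

  // Imitates the shared translation path: attributes become field references.
  def generate(e: CatalystExpr): Option[V2Expr] = e match {
    case Attr(name) => Some(FieldRef(Seq(name)))
    case Upper(child) => generate(child).map(c => GeneralExpr("UPPER", Seq(c)))
  }

  // Accept the child only if translation yields a field reference; no casts needed.
  def variantColumn(child: CatalystExpr): Option[FieldRef] = generate(child) match {
    case Some(ref: FieldRef) => Some(ref)
    case _ => None
  }
}
```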
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]