cloud-fan commented on code in PR #55327:
URL: https://github.com/apache/spark/pull/55327#discussion_r3245959707
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -291,7 +320,104 @@ object ClusterBySpec {
* Converts the clustering column property to a ClusterBySpec.
*/
def fromProperty(columns: String): ClusterBySpec = {
-
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+
ClusterBySpec.fromColumnEntries(mapper.readValue[Seq[Seq[String]]](columns))
+ }
+
+ /**
+ * Constructs a [[ClusterBySpec]] from the stored column entries (each a
Seq[String]).
+ * An entry is either a multi-part column name or a single-element Seq
containing an
+ * expression string like "variant_get(col,'$.foo','STRING')".
+ */
+ def fromColumnEntries(entries: Seq[Seq[String]]): ClusterBySpec = {
+ val parsedCols: Seq[(NamedReference, Option[Transform])] = entries.map {
+ case names if names.length == 1 =>
+ // Could be an expression of form "funcName(col, arg1, arg2, ...)"
+ try {
+ CatalystSqlParser.parseExpression(names.head) match {
+ case u: UnresolvedFunction =>
+ val transform: Transform = new ClusteringColumnTransform(
+ QuotingUtils.quoted(u.nameParts.toArray),
+ u.children.map {
+ case a: UnresolvedAttribute =>
+ FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+ case l: Literal => LiteralValue(l.value, l.dataType)
+ case other => throw new IllegalStateException(
+ s"Unexpected argument type in CLUSTER BY expression:
${other.getClass}")
+ }.toArray)
Review Comment:
These lines (and the parallel block at :379-387) push two SQL-string
round-trips into the v2 API:
1. `name = QuotingUtils.quoted(u.nameParts.toArray)` puts a backtick-quoted
SQL identifier string into `Transform.name()` for multi-part function names.
Every existing v2 transform's `name` is an unqualified plain string
(`"bucket"`, `"years"`, `ApplyTransform.name = applyCtx.identifier.getText` at
`AstBuilder.scala:5091`). Connectors comparing `transform.name()` against a
plain function name will silently miss namespaced cases.
2. `FieldReference(QuotingUtils.quoted(a.nameParts.toArray))` has structured
`Seq[String]` parts in hand, stringifies them with backtick quoting, then
`FieldReference(String)` re-parses via
`CatalystSqlParser.parseMultipartIdentifier`. This should be
`FieldReference(a.nameParts)`, using the existing `Seq[String]` overload of
`apply`. See the body summary, pillar 2.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -271,11 +273,38 @@ case class CatalogTablePartition(
* A container for clustering information.
*
* @param columnNames the names of the columns used for clustering.
+ * @param clusteringColumnTransforms per-column transforms for
expression-based clustering.
+ * When non-empty, each element corresponds
to a column in
+ * columnNames: None means a plain column
reference,
+ * Some(transform) means an expression like
UPPER(col).
+ * An empty Seq means no transforms on any
columns.
*/
-case class ClusterBySpec(columnNames: Seq[NamedReference]) {
+case class ClusterBySpec(
+ columnNames: Seq[NamedReference],
+ clusteringColumnTransforms: Seq[Option[Transform]] = Seq.empty) {
override def toString: String = toJson
- def toJson: String =
ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
+ def toJson: String = toColumnNames
+
+ def toColumnNames: String = {
+ val entries: Seq[Seq[String]] = if (clusteringColumnTransforms.isEmpty) {
+ columnNames.map(_.fieldNames().toSeq)
+ } else {
+ columnNames.zip(clusteringColumnTransforms).map {
+ case (colName, None) => colName.fieldNames().toSeq
+ case (colName, Some(transform)) =>
+ val args = transform.arguments().map {
+ case n: NamedReference =>
n.fieldNames().map(QuotingUtils.quoteIfNeeded).mkString(".")
+ case LiteralValue(value, dataType) =>
+ Literal(value, dataType).sql
+ case other => throw new IllegalStateException(
+ s"Unexpected argument type in CLUSTER BY expression:
${other.getClass}")
+ }
+
Seq(s"${QuotingUtils.quoteIfNeeded(transform.name())}(${args.mkString(",")})")
+ }
+ }
+ ClusterBySpec.mapper.writeValueAsString(entries)
+ }
Review Comment:
This stringifies a v2 Transform into a SQL function-call expression and
embeds it in the existing `Seq[Seq[String]]` JSON shape; `fromColumnEntries` at
:331 then calls `CatalystSqlParser.parseExpression` on each single-element
entry to recover the transform. Two concrete consequences:
- A column whose name parses as a function call — e.g. backtick-quoted
``CLUSTER BY (`lower(col)`)`` — is stored as `[["lower(col)"]]`
(indistinguishable from a transform entry) and read back as a
`ClusteringColumnTransform`. Silent regression on existing tables with such
names.
- Round-trip stability depends on `Literal(...).sql` and the SQL parser
agreeing with each other. For a `TimestampType` literal, `.sql` emits
`TIMESTAMP '<value>'`, which the parser turns into a `Cast` of a string rather
than a `Literal` — the `ClusterBySpecSuite` timestamp tests pass only because
they pin the session timezone.
Please use a structured format instead (e.g. `{"col": [...], "transform": {"name":
..., "args": [...]}}`), serialized with the same Jackson mapper. See the body
summary, pillar 3.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -291,7 +320,104 @@ object ClusterBySpec {
* Converts the clustering column property to a ClusterBySpec.
*/
def fromProperty(columns: String): ClusterBySpec = {
-
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+
ClusterBySpec.fromColumnEntries(mapper.readValue[Seq[Seq[String]]](columns))
+ }
+
+ /**
+ * Constructs a [[ClusterBySpec]] from the stored column entries (each a
Seq[String]).
+ * An entry is either a multi-part column name or a single-element Seq
containing an
+ * expression string like "variant_get(col,'$.foo','STRING')".
+ */
+ def fromColumnEntries(entries: Seq[Seq[String]]): ClusterBySpec = {
+ val parsedCols: Seq[(NamedReference, Option[Transform])] = entries.map {
+ case names if names.length == 1 =>
+ // Could be an expression of form "funcName(col, arg1, arg2, ...)"
+ try {
+ CatalystSqlParser.parseExpression(names.head) match {
+ case u: UnresolvedFunction =>
+ val transform: Transform = new ClusteringColumnTransform(
+ QuotingUtils.quoted(u.nameParts.toArray),
+ u.children.map {
+ case a: UnresolvedAttribute =>
+ FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+ case l: Literal => LiteralValue(l.value, l.dataType)
+ case other => throw new IllegalStateException(
+ s"Unexpected argument type in CLUSTER BY expression:
${other.getClass}")
+ }.toArray)
+ val colRef = transform.arguments().collectFirst {
+ case f: FieldReference => f
+ }.getOrElse(throw new IllegalStateException(
+ "CLUSTER BY expression must contain exactly one column
reference"))
+ (colRef, Some(transform))
+ case _ => (FieldReference(names), None)
+ }
+ } catch {
+ // Sometimes, we can get a parse exception if the column name
contains invalid
+ // characters by itself. Quote the column name and see if parsing it
as a multipart
+ // identifier works, and if so, use that as a direct FieldReference
to a column.
+ case _: ParseException =>
+ val identifier = CatalystSqlParser.parseMultipartIdentifier(
+ QuotingUtils.quoteIfNeeded(names.head))
+
(FieldReference(identifier.map(_.stripPrefix("`").stripSuffix("`"))), None)
+ }
+ case names => (FieldReference(names), None)
+ }
+ val (colNames, transforms) = parsedCols.unzip
+ val transformsSeq = if (transforms.forall(_.isEmpty)) Seq.empty else
transforms
+ ClusterBySpec(colNames, transformsSeq)
+ }
+
+ def fromExpressions(
+ parsedCols: Seq[Either[Expression, Seq[String]]]): ClusterBySpec = {
+ val (clusteringColumnNames, clusteringColumnExpressions) = parsedCols.map {
+ case Left(e) =>
+ e match {
+ // A bare column reference parsed as an expression - treat as plain
column.
+ case a: UnresolvedAttribute =>
+ (FieldReference(a.nameParts), None)
+ case u: UnresolvedFunction =>
+ val transform = new ClusteringColumnTransform(
+ QuotingUtils.quoted(u.nameParts.toArray),
+ u.children.map {
+ case a: UnresolvedAttribute =>
+ FieldReference(QuotingUtils.quoted(a.nameParts.toArray))
+ case l: Literal => LiteralValue(l.value, l.dataType)
+ case _ => throw new IllegalStateException(
+ "Unsupported expression argument in CLUSTER BY transform")
+ }.toArray)
+ val transformName = QuotingUtils.quoted(u.nameParts.toArray)
+ val refs = transform.arguments().collect {
+ case f: FieldReference => f
+ }
+ if (refs.isEmpty) {
+ throw new AnalysisException(
+ errorClass =
"CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",
+ messageParameters = Map("expressionType" -> transformName))
+ }
+ if (refs.length != 1) {
+ throw new AnalysisException(
+ errorClass =
"CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",
+ messageParameters = Map("expressionType" -> transformName))
+ }
+ if (!transform.arguments().head.isInstanceOf[FieldReference]) {
Review Comment:
Why must the column reference be at `arguments().head`? `PARTITIONED BY
(bucket(4, c1, c2))` already allows columns at any position; the equivalent
restriction here is undocumented, and the error message at :403-406 does not
say that the column must be the first argument. If the constraint is
intentional, please document it and state it in the error; if not, drop this
check.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala:
##########
@@ -157,12 +157,30 @@ private[sql] object BucketTransform {
}
}
+/**
+ * Minimal description of a per-column transform applied within a CLUSTER BY
expression.
+ *
+ * @param columnIndex index into [[ClusterByTransform.columnNames]]
identifying the column
+ * being transformed.
+ * @param argumentIndex the index in the argument list where the bound
clustering column
+ * should be substituted. Zero-indexed, and any arguments
at or after
+ * this index in `arguments` should be shifted to the
right by one.
+ * @param function canonical SQL function name (e.g. "variant_get").
+ * @param arguments the non-column literal arguments to the function.
+ */
+case class ClusterByColumnTransform(
+ columnIndex: Int,
+ argumentIndex: Int,
+ function: String,
+ arguments: Seq[LiteralValue[_]])
Review Comment:
Both `ClusterByColumnTransform` (this abstract descriptor) and
`ClusteringColumnTransform` (the concrete Transform added further below) can be
removed if you reuse `ApplyTransform` and store per-column transforms as the
`arguments` of `ClusterByTransform` itself — `cluster_by(c1, upper(c2),
bucket(4, c3))`. That gives correct `describe()` / `toString` / `arguments()`
for free, fixes the DESCRIBE / SHOW CREATE TABLE round-trip, and matches
`PARTITIONED BY`'s wire shape. The `argumentIndex` field here is also dead
state today, because `fromExpressions` forces the column reference to position
0. See the body summary, pillar 1.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableClusterBySuiteBase.scala:
##########
@@ -43,6 +44,17 @@ trait AlterTableClusterBySuiteBase extends QueryTest with
DDLCommandTestUtils {
def validateClusterBy(tableName: String, clusteringColumns: Seq[String]):
Unit
+ /**
+ * Validates clustering columns and their associated transforms.
+ * @param expectedTransforms per-column transforms, where None means a plain
column reference
+ * and Some(transform) means an expression-based
clustering column.
+ * Must have the same length as clusteringColumns.
+ */
+ def validateClusterBy(
+ tableName: String,
+ clusteringColumns: Seq[String],
+ expectedTransforms: Seq[Option[Transform]]): Unit
Review Comment:
This new abstract `validateClusterBy(... expectedTransforms)` is implemented
in all four v1/v2 Create/Alter suites but never called from any test — no test
issues `CREATE TABLE … CLUSTER BY (func(col))` or the ALTER equivalent and
verifies the transform survives the parser → spec → catalog → read-back
round-trip. The only coverage of the new feature is `ClusterBySpecSuite`
exercising helpers directly. Please add at least one happy-path test per v1/v2
catalog (and the corresponding ones in `CreateTableClusterBySuiteBase` at
:47-56).
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -725,6 +725,12 @@
],
"sqlState" : "0A000"
},
+ "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE": {
+ "message" : [
+ "CLUSTER BY expression <expressionType> has either no column reference,
or a column reference in an unsupported argument position."
Review Comment:
This error is thrown for three distinct conditions in
`ClusterBySpec.fromExpressions` (`interface.scala:392-407`): (a) no column
reference, (b) more than one column reference, and (c) the column is not at
argument position 0. The message describes only (a) and (c), so a `CLUSTER BY
(concat(c1, c2))` failure — case (b) — reads as if it were about argument position.
Either split into separate error classes or rewrite the message to cover all
three, and document the position-0 constraint somewhere user-facing.
##########
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##########
@@ -546,8 +546,17 @@ replaceTableHeader
: (CREATE OR)? REPLACE TABLE identifierReference
;
+expressionOrMultipartIdentifier
+ : expression
+ | multipartIdentifier
+ ;
Review Comment:
Consider reusing the existing `transform` rule from line 1251 (used by
`PARTITIONED BY`) instead of inventing `expressionOrMultipartIdentifier`. The
general `expression` rule accepts arbitrary inputs (e.g. `1+1`, `CASE … END`)
that then fail post-parse with a generic `IllegalStateException` from
`fromExpressions`; the dedicated `applyTransform: identifier '('
transformArgument,... ')'` rule rejects those at parse time with a targeted
error. See the body summary, pillar 1.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -340,12 +466,17 @@ object ClusterBySpec {
normalizedColumns.map(_.toString),
resolver)
- ClusterBySpec(normalizedColumns)
+ ClusterBySpec(normalizedColumns, clusterBySpec.clusteringColumnTransforms)
Review Comment:
`clusterBySpec.clusteringColumnTransforms` is passed through unchanged — the
`FieldReference` inside each transform's `arguments` is not normalized against
the schema, so after normalization the column case/path in `transforms` can
disagree with the case/path in `normalizedColumns`.
##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java:
##########
@@ -277,8 +279,10 @@ static TableChange deleteColumn(String[] fieldNames,
Boolean ifExists) {
* field names.
* @return a TableChange for this assignment
*/
- static TableChange clusterBy(NamedReference[] clusteringColumns) {
- return new ClusterBy(clusteringColumns);
+ static TableChange clusterBy(
+ NamedReference[] clusteringColumns,
+ Optional<Transform>[] transforms) {
+ return new ClusterBy(clusteringColumns, transforms);
}
Review Comment:
This replaces the previous `clusterBy(NamedReference[])` factory with a new
two-arg one — source- and binary-incompatible for any external connector that
constructs `TableChange.ClusterBy` through this factory today. The PR description says the change is
backward-compatible for connectors, but that's only true for receivers of the
TableChange. Please add a backward-compatible single-arg overload (delegating
with an empty `Optional<Transform>[]`).
Also, the Javadoc above only documents `@param clusteringColumns`; please
add `@param transforms`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]