cloud-fan commented on code in PR #55327:
URL: https://github.com/apache/spark/pull/55327#discussion_r3400003574


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -270,12 +271,47 @@ case class CatalogTablePartition(
 /**
  * A container for clustering information.
  *
- * @param columnNames the names of the columns used for clustering.
+ * @param entries the clustering entries, each either an `IdentityTransform` 
(plain column)
+ *                or an `ApplyTransform` (expression like `upper(col)`).
  */
-case class ClusterBySpec(columnNames: Seq[NamedReference]) {
+case class ClusterBySpec(entries: Seq[Transform]) {
+  def columnNames: Seq[NamedReference] = entries.map {
+    case IdentityTransform(ref) => ref
+    case t => t.references.head
+  }
+
   override def toString: String = toJson
 
-  def toJson: String = 
ClusterBySpec.mapper.writeValueAsString(columnNames.map(_.fieldNames))
+  def toJson: String = {
+    val hasTransforms = entries.exists(_.isInstanceOf[ApplyTransform])
+    if (hasTransforms) {
+      // New structured JSON format
+      val jsonEntries = entries.map {
+        case IdentityTransform(ref) =>
+          JObject("col" -> JArray(ref.fieldNames().toList.map(JString)))
+        case a: ApplyTransform =>
+          val argsJson = a.args.collect {

Review Comment:
   This encoding can't represent what the parser now accepts: it keeps one 
column ref plus only the literal args, and `fromProperty` rebuilds 
`ApplyTransform(name, colRef +: literals)` (line 382). Two concrete losses: 
`CLUSTER BY (date_trunc('day', ts))` reads back as `date_trunc(ts, 'day')` 
(argument order scrambled), and `CLUSTER BY (concat(c1, c2))` reads back as 
`concat(c1)` (extra column refs silently dropped) — `visitClusterBySpec` 
deliberately allows multiple refs at any position.
   
   Serialize the full ordered argument list instead, each element tagged as a 
column or a literal, e.g. `"args": [{"value": "day", "type": "string"}, {"col": 
["ts"]}]` — then the separate `col` field can go away.
   
   The new round-trip tests only use column-first single-ref transforms, which 
is exactly the subset this encoding preserves; please add literal-first and 
multi-ref cases.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -287,21 +323,131 @@ object ClusterBySpec {
     ret
   }
 
+  /** Factory that wraps plain column references as IdentityTransforms. */
+  def ofColumns(columnNames: Seq[NamedReference]): ClusterBySpec =
+    new ClusterBySpec(columnNames.map(IdentityTransform(_)))
+
   /**
    * Converts the clustering column property to a ClusterBySpec.
+   * Detects format: if top-level elements are arrays, it's old format;
+   * if objects, it's new structured format.
    */
   def fromProperty(columns: String): ClusterBySpec = {
-    
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+    val parsed = parse(columns)
+    parsed match {
+      case JArray(elements) if elements.nonEmpty =>
+        elements.head match {
+          case _: JArray =>
+            // Old format: Seq[Seq[String]]
+            val colArrays = mapper.readValue[Seq[Seq[String]]](columns)
+            new ClusterBySpec(
+              colArrays.map(names => IdentityTransform(FieldReference(names))))
+          case _: JObject =>
+            // New structured format
+            new ClusterBySpec(elements.map(parseJsonEntry))
+          case _ =>
+            // Fallback: try old format
+            val colArrays = mapper.readValue[Seq[Seq[String]]](columns)
+            new ClusterBySpec(
+              colArrays.map(names => IdentityTransform(FieldReference(names))))
+        }
+      case _ =>
+        // Empty or unexpected -- return empty
+        new ClusterBySpec(Seq.empty[Transform])
+    }
+  }
+
+  private def parseJsonEntry(entry: JValue): Transform = entry match {
+    case JObject(fields) =>
+      val fieldMap = fields.toMap
+      val colParts = fieldMap("col") match {
+        case JArray(parts) => parts.map { case JString(s) => s; case other =>
+          throw new IllegalStateException(s"Unexpected col part: $other") }
+        case other => throw new IllegalStateException(s"Unexpected col value: 
$other")
+      }
+      val colRef = FieldReference(colParts)
+      fieldMap.get("transform") match {
+        case None => IdentityTransform(colRef)
+        case Some(JObject(tFields)) =>
+          val tMap = tFields.toMap
+          val funcName = tMap("name") match {
+            case JString(s) => s
+            case other => throw new IllegalStateException(
+              s"Unexpected transform name: $other")
+          }
+          val litArgs: Seq[V2Expression] = tMap.get("args") match {
+            case Some(JArray(args)) => args.map(jsonToLiteral)
+            case _ => Nil
+          }
+          ApplyTransform(funcName, colRef +: litArgs)
+        case other =>
+          throw new IllegalStateException(s"Unexpected transform value: 
$other")
+      }
+    case other =>
+      throw new IllegalStateException(s"Unexpected JSON entry in 
ClusterBySpec: $other")
+  }
+
+  private[catalog] def literalToJson(lit: LiteralValue[_]): JValue = {
+    val typeStr = lit.dataType.typeName
+    val valueJson = lit.value match {
+      case null => JNull
+      case s: UTF8String => JString(s.toString)
+      case i: Int => JInt(i)
+      case l: Long => JLong(l)
+      case d: Double => JDouble(d)
+      case b: Boolean => JBool(b)
+      case s: Short => JInt(s.toInt)
+      case b: Byte => JInt(b.toInt)
+      case f: Float => JDouble(f.toDouble)
+      case bd: java.math.BigDecimal => JDecimal(BigDecimal(bd))
+      case bd: BigDecimal => JDecimal(bd)
+      case other => JString(other.toString)

Review Comment:
   This fallback plus the JSON-node-type dispatch in `jsonToLiteral` doesn't 
preserve the catalyst value type across the round-trip: a `DateType` literal 
(an `Int`) comes back as a `Long`; a `DecimalType` value (catalyst `Decimal`) 
falls through here as `value.toString` and is read back as a raw `String`; a 
binary literal serializes as `"[B@1a2b3c"`.
   
   Key the decode off the stored `type` field rather than the JSON node type, 
and reject unsupported literal types loudly at write time instead of 
`toString`-ing them.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -287,21 +323,131 @@ object ClusterBySpec {
     ret
   }
 
+  /** Factory that wraps plain column references as IdentityTransforms. */
+  def ofColumns(columnNames: Seq[NamedReference]): ClusterBySpec =
+    new ClusterBySpec(columnNames.map(IdentityTransform(_)))
+
   /**
    * Converts the clustering column property to a ClusterBySpec.
+   * Detects format: if top-level elements are arrays, it's old format;
+   * if objects, it's new structured format.
    */
   def fromProperty(columns: String): ClusterBySpec = {
-    
ClusterBySpec(mapper.readValue[Seq[Seq[String]]](columns).map(FieldReference(_)))
+    val parsed = parse(columns)
+    parsed match {
+      case JArray(elements) if elements.nonEmpty =>
+        elements.head match {
+          case _: JArray =>
+            // Old format: Seq[Seq[String]]
+            val colArrays = mapper.readValue[Seq[Seq[String]]](columns)
+            new ClusterBySpec(
+              colArrays.map(names => IdentityTransform(FieldReference(names))))
+          case _: JObject =>
+            // New structured format
+            new ClusterBySpec(elements.map(parseJsonEntry))
+          case _ =>
+            // Fallback: try old format
+            val colArrays = mapper.readValue[Seq[Seq[String]]](columns)
+            new ClusterBySpec(
+              colArrays.map(names => IdentityTransform(FieldReference(names))))
+        }
+      case _ =>
+        // Empty or unexpected -- return empty

Review Comment:
   Before this PR a malformed `clusteringColumns` property failed with a 
Jackson error; now it silently reads as "no clustering". Corrupted table 
metadata losing its clustering info with no signal is worse than an error — 
please throw on unrecognized input.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -4874,12 +4874,30 @@ class AstBuilder extends DataTypeAstBuilder
   }
 
   /**
-   * Create a [[ClusterBySpec]].
+   * Create a [[ClusterBySpec]] by reusing the `transform` grammar rule.
+   * Each entry is converted to an IdentityTransform (plain column) or 
ApplyTransform
+   * (function call). For CLUSTER BY, we require at least one column reference 
in each
+   * ApplyTransform.
    */
   override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec = 
withOrigin(ctx) {
-    val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
-      .map(typedVisit[Seq[String]]).map(FieldReference(_)).toSeq
-    ClusterBySpec(columnNames)
+    val entries = ctx.transform.asScala.map { transformCtx =>
+      transformCtx match {
+        case identityCtx: IdentityTransformContext =>
+          IdentityTransform(FieldReference(
+            typedVisit[Seq[String]](identityCtx.qualifiedName))): Transform
+        case applyCtx: ApplyTransformContext =>
+          val funcName = applyCtx.identifier.getText

Review Comment:
   `PARTITIONED BY (years(ts))` is special-cased into `YearsTransform` in 
`visitPartitionTransform` (line 5154), but the same syntax here always produces 
a generic `ApplyTransform("years")`. Connectors that pattern-match the typed 
transforms will see different shapes for identical syntax across the two 
clauses — intentional?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala:
##########
@@ -160,31 +163,70 @@ private[sql] object BucketTransform {
 /**
  * This class represents a transform for `ClusterBySpec`. This is used to 
bundle
  * ClusterBySpec in CreateTable's partitioning transforms to pass it down to 
analyzer.
+ *
+ * Each entry is either an `IdentityTransform` (plain column) or an 
`ApplyTransform`
+ * (function call like `upper(col)`).
  */
-final case class ClusterByTransform(
-    columnNames: Seq[NamedReference]) extends RewritableTransform {
+private[sql] final case class ClusterByTransform(

Review Comment:
   Two visibility changes here look unintended. This class was public and 
becomes `private[sql]`, while the companion stays public and returns the 
now-private type from `ofColumns` — peers pair a `private[sql]` class with a 
`private[sql]` object (e.g. `BucketTransform`). And `LiteralValue` (line 432) 
goes `private[sql]` → public with no stated need: `Expressions.literal` and the 
public `Literal` interface already cover construction and consumption.
   
   If both are deliberate, please call them out in the PR description; 
otherwise revert.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ClusterBySpecSuite.scala:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
FieldReference, IdentityTransform, LiteralValue}
+import org.apache.spark.sql.types.{IntegerType, StringType, TimestampNTZType, 
TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
+
+class ClusterBySpecSuite extends SparkFunSuite with SQLHelper {
+
+  // -- plain column roundtrip tests
+  test("plain columns roundtrip through toJson and fromProperty") {
+    val spec = ClusterBySpec.ofColumns(Seq(FieldReference(Seq("a")), 
FieldReference(Seq("b", "c"))))
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+    assert(roundtripped.columnNames === spec.columnNames)
+    assert(roundtripped.entries.forall(_.isInstanceOf[IdentityTransform]))
+  }
+
+  // -- ApplyTransform roundtrip tests
+  test("ApplyTransform entry roundtrips through toJson and fromProperty") {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("variant_get", Seq(
+        FieldReference(Seq("col1")),
+        LiteralValue(UTF8String.fromString("$.foo"), StringType),
+        LiteralValue(UTF8String.fromString("STRING"), StringType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    assert(roundtripped.columnNames === Seq(FieldReference(Seq("col1"))))
+    assert(roundtripped.entries.length === 1)
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.name === "variant_get")
+    assert(rt.args.length === 3)
+    assert(rt.args(0).asInstanceOf[FieldReference].fieldNames().toSeq === 
Seq("col1"))
+  }
+
+  test("mixed plain and expression entries roundtrip") {
+    val spec = new ClusterBySpec(Seq(
+      IdentityTransform(FieldReference(Seq("id"))),
+      ApplyTransform("upper", Seq(FieldReference(Seq("name"))))))
+
+    val roundtripped = ClusterBySpec.fromProperty(spec.toJson)
+
+    assert(roundtripped.columnNames ===
+      Seq(FieldReference(Seq("id")), FieldReference(Seq("name"))))
+    assert(roundtripped.entries(0).isInstanceOf[IdentityTransform])
+    val rt = roundtripped.entries(1).asInstanceOf[ApplyTransform]
+    assert(rt.name === "upper")
+  }
+
+  test("fromProperty is backward compatible with the old plain 
Seq[Seq[String]] format") {
+    // Old format: just column name arrays
+    val oldFormat = """[["id"],["data"]]"""
+    val spec = ClusterBySpec.fromProperty(oldFormat)
+    assert(spec.columnNames === Seq(FieldReference(Seq("id")), 
FieldReference(Seq("data"))))
+    assert(spec.entries.forall(_.isInstanceOf[IdentityTransform]))
+  }
+
+  test("integer literal in expression roundtrips correctly") {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("some_func", Seq(
+        FieldReference(Seq("col")),
+        LiteralValue(42, IntegerType)))))
+
+    val roundtripped = ClusterBySpec.fromProperty(spec.toJson)
+    assert(roundtripped.columnNames === Seq(FieldReference(Seq("col"))))
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.name === "some_func")
+    assert(rt.args.length === 2)
+  }
+
+  // -- string literal round-trip tests
+  test("string literal values are preserved through toJson/fromProperty 
round-trip") {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("variant_get", Seq(
+        FieldReference(Seq("col")),
+        LiteralValue(UTF8String.fromString("$.foo"), StringType),
+        LiteralValue(UTF8String.fromString("STRING"), StringType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.name === "variant_get")
+    assert(rt.args.length === 3)
+
+    val lit1 = rt.args(1).asInstanceOf[LiteralValue[_]]
+    assert(lit1.dataType === StringType)
+    assert(lit1.value.toString === "$.foo")
+
+    val lit2 = rt.args(2).asInstanceOf[LiteralValue[_]]
+    assert(lit2.dataType === StringType)
+    assert(lit2.value.toString === "STRING")
+  }
+
+  test("string literal with embedded single quotes round-trips correctly") {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("some_func", Seq(
+        FieldReference(Seq("col")),
+        LiteralValue(UTF8String.fromString("it's a test"), StringType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    val lit = rt.args(1).asInstanceOf[LiteralValue[_]]
+    assert(lit.dataType === StringType)
+    assert(lit.value.toString === "it's a test")
+  }
+
+  test("string literal that looks like a column name round-trips as string, 
not column ref") {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("some_func", Seq(
+        FieldReference(Seq("col")),
+        LiteralValue(UTF8String.fromString("my_column"), StringType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.args(1).isInstanceOf[LiteralValue[_]])
+    val lit = rt.args(1).asInstanceOf[LiteralValue[_]]
+    assert(lit.dataType === StringType)
+    assert(lit.value.toString === "my_column")
+  }
+
+  // -- timestamp literal round-trip tests
+  private def testTimestampRoundTrip(microseconds: Long): Unit = {
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("date_trunc", Seq(
+        FieldReference(Seq("ts_col")),
+        LiteralValue(microseconds, TimestampType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.name === "date_trunc")
+    assert(rt.args.length === 2)
+    val lit = rt.args(1).asInstanceOf[LiteralValue[_]]
+    assert(lit.dataType === TimestampType)
+    assert(lit.value == microseconds)
+  }
+
+  test("timestamp literal round-trips correctly") {
+    withSQLConf("spark.sql.session.timeZone" -> "UTC") {
+      testTimestampRoundTrip(1672531200000000L)
+    }
+  }
+
+  test("timestamp literal round-trip with non-UTC timezone") {
+    withSQLConf("spark.sql.session.timeZone" -> "America/Toronto") {
+      testTimestampRoundTrip(1672531200000000L)
+    }
+  }
+
+  test("timestamp_ntz literal round-trips correctly") {
+    val microseconds = 1672531200000000L
+    val spec = new ClusterBySpec(Seq(
+      ApplyTransform("date_trunc", Seq(
+        FieldReference(Seq("ts_col")),
+        LiteralValue(microseconds, TimestampNTZType)))))
+
+    val json = spec.toJson
+    val roundtripped = ClusterBySpec.fromProperty(json)
+
+    val rt = roundtripped.entries.head.asInstanceOf[ApplyTransform]
+    assert(rt.args.length === 2)
+    val lit = rt.args(1).asInstanceOf[LiteralValue[_]]
+    assert(lit.dataType === TimestampNTZType)
+    assert(lit.value == microseconds)
+  }
+
+  // -- backward-compatible factory tests
+  test("apply(Seq[NamedReference]) creates IdentityTransform entries") {

Review Comment:
   The factory under test is `ofColumns`, not `apply`:
   ```suggestion
     test("ofColumns(Seq[NamedReference]) creates IdentityTransform entries") {
   ```



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -848,6 +848,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE": {

Review Comment:
   This is the `SparkThrowableSuite."Error conditions are correctly formatted"` 
failure in this PR's CI: the file must be sorted by key (`CLUSTER_BY_*` sorts 
*after* `CLUSTERING_*` because `_` > `I` in string order), and the key needs 
the file's `" : {"` spacing. Move the entry below `CLUSTERING_NOT_SUPPORTED` 
and add the space before the colon.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala:
##########
@@ -160,31 +163,70 @@ private[sql] object BucketTransform {
 /**
  * This class represents a transform for `ClusterBySpec`. This is used to 
bundle
  * ClusterBySpec in CreateTable's partitioning transforms to pass it down to 
analyzer.
+ *
+ * Each entry is either an `IdentityTransform` (plain column) or an 
`ApplyTransform`
+ * (function call like `upper(col)`).
  */
-final case class ClusterByTransform(
-    columnNames: Seq[NamedReference]) extends RewritableTransform {
+private[sql] final case class ClusterByTransform(
+    entries: Seq[Transform]) extends RewritableTransform {
 
   override val name: String = "cluster_by"
 
-  override def references: Array[NamedReference] = columnNames.toArray
+  def columnNames: Seq[NamedReference] = entries.map {
+    case IdentityTransform(ref) => ref
+    case t => t.references.head

Review Comment:
   `entries` has an undocumented-but-load-bearing invariant: each entry must be 
an `IdentityTransform` or an `ApplyTransform` with at least one column 
reference. Nothing enforces it off the SQL path — `TableChange.clusterBy` / 
`LogicalExpressions.clusterBy(Seq)` accept anything — and the failure modes are 
a bare `NoSuchElementException` here (and at `ClusterBySpec.columnNames` / 
`normalizeClusterBySpec`), an `IllegalStateException` at property-write time, 
or — worst — silent degradation to a plain column in the legacy `toJson` branch 
(interface.scala:308-311) for e.g. a `YearsTransform` entry.
   
   A `require` in one shared factory would turn all of these into a single 
clear error and make the parser-side check redundant.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -330,22 +473,40 @@ object ClusterBySpec {
       return clusterBySpec
     }
 
-    val normalizedColumns = clusterBySpec.columnNames.map { columnName =>
-      val position = SchemaUtils.findColumnPosition(
-        columnName.fieldNames().toImmutableArraySeq, schema, resolver)
-      FieldReference(SchemaUtils.getColumnName(position, schema))
+    val normalizedEntries = clusterBySpec.entries.map {
+      case IdentityTransform(ref) =>
+        val position = SchemaUtils.findColumnPosition(
+          ref.fieldNames().toImmutableArraySeq, schema, resolver)
+        IdentityTransform(FieldReference(
+          SchemaUtils.getColumnName(position, schema))): Transform
+      case a: ApplyTransform =>
+        val newArgs: Seq[V2Expression] = a.args.map {
+          case ref: NamedReference =>
+            val position = SchemaUtils.findColumnPosition(
+              ref.fieldNames().toImmutableArraySeq, schema, resolver)
+            FieldReference(SchemaUtils.getColumnName(position, schema))
+          case other => other
+        }
+        ApplyTransform(a.name, newArgs): Transform
+      case other => other
+    }
+
+    val normalizedColumns = normalizedEntries.map {

Review Comment:
   This dedups on each entry's source column, so `CLUSTER BY (variant_get(data, 
'$.a'), variant_get(data, '$.b'))` — the PR's motivating use case — fails with 
a duplicate-column error on the session-catalog path. Pure-v2 CREATE never 
reaches this normalize, so the same statement succeeds there: a v1/v2 
divergence.
   
   Compare whole normalized entries instead of their head columns. (This 
predates round 2 — my miss in the first review.)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala:
##########
@@ -160,31 +163,70 @@ private[sql] object BucketTransform {
 /**
  * This class represents a transform for `ClusterBySpec`. This is used to 
bundle
  * ClusterBySpec in CreateTable's partitioning transforms to pass it down to 
analyzer.
+ *
+ * Each entry is either an `IdentityTransform` (plain column) or an 
`ApplyTransform`
+ * (function call like `upper(col)`).
  */
-final case class ClusterByTransform(
-    columnNames: Seq[NamedReference]) extends RewritableTransform {
+private[sql] final case class ClusterByTransform(
+    entries: Seq[Transform]) extends RewritableTransform {
 
   override val name: String = "cluster_by"
 
-  override def references: Array[NamedReference] = columnNames.toArray
+  def columnNames: Seq[NamedReference] = entries.map {
+    case IdentityTransform(ref) => ref
+    case t => t.references.head
+  }
 
-  override def arguments: Array[Expression] = columnNames.toArray
+  override def references: Array[NamedReference] = 
entries.flatMap(_.references).toArray
 
-  override def toString: String = 
s"$name(${arguments.map(_.describe).mkString(", ")})"
+  override def arguments: Array[Expression] = entries.toArray
+
+  override def describe: String = 
s"cluster_by(${entries.map(_.describe).mkString(", ")})"
+
+  override def toString: String = describe
 
   override def withReferences(newReferences: Seq[NamedReference]): Transform = 
{
-    this.copy(columnNames = newReferences)
+    var refIdx = 0
+    val newEntries = entries.map {
+      case _: IdentityTransform =>
+        val e = IdentityTransform(newReferences(refIdx))
+        refIdx += 1
+        e
+      case a: ApplyTransform =>
+        val newArgs = a.args.map {
+          case _: NamedReference =>
+            val r = newReferences(refIdx)
+            refIdx += 1
+            r
+          case other => other
+        }
+        ApplyTransform(a.name, newArgs)
+      case other =>
+        refIdx += other.references.length
+        other
+    }
+    this.copy(entries = newEntries)
   }
 }
 
 /**
- * Convenience extractor for ClusterByTransform.
+ * Companion object for ClusterByTransform with extractor.
  */
 object ClusterByTransform {
-  def unapply(transform: Transform): Option[Seq[NamedReference]] =
+  /** Factory that wraps plain column references as IdentityTransforms. */
+  def ofColumns(columnNames: Seq[NamedReference]): ClusterByTransform =
+    new ClusterByTransform(columnNames.map(IdentityTransform(_)))
+
+  def unapply(transform: Transform): Option[Seq[Transform]] =
     transform match {
+      case ct: ClusterByTransform => Some(ct.entries)
       case NamedTransform("cluster_by", arguments) =>
-        Some(arguments.map(_.asInstanceOf[NamedReference]))
+        Some(arguments.map {
+          case ref: NamedReference => IdentityTransform(ref)
+          case t: Transform => t
+          case other => throw new IllegalArgumentException(
+            s"Unexpected argument type in cluster_by: ${other.getClass}")

Review Comment:
   Throwing from an extractor means any `case ClusterByTransform(...)` arm 
crashes on this input instead of falling through to the next case. Return 
`None` for unexpected argument types and let the caller decide.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java:
##########
@@ -273,12 +273,12 @@ static TableChange deleteColumn(String[] fieldNames, 
Boolean ifExists) {
   /**
    * Create a TableChange for changing clustering columns for a table.
    *
-   * @param clusteringColumns clustering columns to change to. Each clustering 
column represents
-   *                          field names.
+   * @param entries clustering entries. Each entry is either an 
IdentityTransform (plain column)

Review Comment:
   This documents the public API contract in terms of 
`IdentityTransform`/`ApplyTransform`, which are `private[sql]` Scala classes a 
connector can't reference. Describe the contract via the public surface instead 
(`Expressions.identity(...)` / `Expressions.apply(name, args)`).
   
   And from the round-1 thread, still open: please keep a 
`clusterBy(NamedReference[])` overload and a derived `clusteringColumns()` 
accessor so existing connectors compile — the description's 
backward-compatibility claim currently doesn't hold.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -4874,12 +4874,30 @@ class AstBuilder extends DataTypeAstBuilder
   }
 
   /**
-   * Create a [[ClusterBySpec]].
+   * Create a [[ClusterBySpec]] by reusing the `transform` grammar rule.
+   * Each entry is converted to an IdentityTransform (plain column) or 
ApplyTransform
+   * (function call). For CLUSTER BY, we require at least one column reference 
in each
+   * ApplyTransform.
    */
   override def visitClusterBySpec(ctx: ClusterBySpecContext): ClusterBySpec = 
withOrigin(ctx) {
-    val columnNames = ctx.multipartIdentifierList.multipartIdentifier.asScala
-      .map(typedVisit[Seq[String]]).map(FieldReference(_)).toSeq
-    ClusterBySpec(columnNames)
+    val entries = ctx.transform.asScala.map { transformCtx =>
+      transformCtx match {
+        case identityCtx: IdentityTransformContext =>
+          IdentityTransform(FieldReference(
+            typedVisit[Seq[String]](identityCtx.qualifiedName))): Transform
+        case applyCtx: ApplyTransformContext =>
+          val funcName = applyCtx.identifier.getText
+          val arguments = 
applyCtx.argument.asScala.map(visitTransformArgument).toSeq
+          val refs = arguments.collect { case ref: FieldReference => ref }
+          if (refs.isEmpty) {
+            throw new AnalysisException(
+              errorClass = "CLUSTER_BY_EXPRESSION_INCORRECT_COLUMN_REFERENCE",

Review Comment:
   Three small things on this error: (1) this 2-arg `AnalysisException` 
constructor carries no position — use the `(errorClass, messageParameters, 
origin)` overload (you're inside `withOrigin(ctx)`, so `CurrentOrigin.get` has 
it) so the error points at the offending expression; (2) the parameter is named 
`expressionType` but is filled with the function *name*; (3) the condition name 
says `INCORRECT_COLUMN_REFERENCE` but the only case that fires is a *missing* 
column reference — `CLUSTER_BY_EXPRESSION_MISSING_COLUMN_REFERENCE` reads 
truer, and renaming is free before this merges.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableClusterBySuite.scala:
##########
@@ -35,7 +35,45 @@ class AlterTableClusterBySuite extends 
command.AlterTableClusterBySuiteBase
       .loadTable(Identifier.of(Array(namespace), table))
       .asInstanceOf[InMemoryTable]
     assert(partTable.partitioning ===
-      Array(ClusterByTransform(clusteringColumns.map(FieldReference(_)))))
+      
Array(ClusterByTransform.ofColumns(clusteringColumns.map(FieldReference(_)))))
+  }
+
+  // The V2 in-memory test catalog does not apply ClusterBy changes via 
alterTable,

Review Comment:
   This comment contradicts the body — the `Some` branch below does validate 
transforms — and the `case None =>` branch is assertion-free, so the helper 
silently passes if clustering is dropped entirely. Please make the `None` case 
fail (or assert the expected absence explicitly) and reword the comment.
   
   More importantly, this helper still has zero call sites across all four 
suites (round-1 thread), so nothing end-to-end exercises `CREATE/ALTER ... 
CLUSTER BY (func(col))`. One note for those tests: `InMemoryBaseTable.getKey` 
clusters rows by the raw source column for expression entries, so only metadata 
(not data layout) is verifiable in the in-memory catalog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to