huaxingao commented on a change in pull request #34914:
URL: https://github.com/apache/spark/pull/34914#discussion_r779185599



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
##########
@@ -104,24 +104,29 @@ private[sql] final case class BucketTransform(
     columns: Seq[NamedReference],
     sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {
 
-  override val name: String = "bucket"
+  override val name: String = if (sortedColumns.nonEmpty) "sortedBucket" else "bucket"
 
   override def references: Array[NamedReference] = {
     arguments.collect { case named: NamedReference => named }
   }
 
-  override def arguments: Array[Expression] = numBuckets +: columns.toArray
-
-  override def toString: String =
+  override def arguments: Array[Expression] = {
     if (sortedColumns.nonEmpty) {
-      s"bucket(${arguments.map(_.describe).mkString(", ")}," +
-        s" ${sortedColumns.map(_.describe).mkString(", ")})"
+      (columns.toArray :+ numBuckets) ++ sortedColumns
     } else {
-      s"bucket(${arguments.map(_.describe).mkString(", ")})"
+      numBuckets +: columns.toArray
     }
+  }
+
+  override def toString: String = s"$name(${arguments.map(_.describe).mkString(", ")})"
 
   override def withReferences(newReferences: Seq[NamedReference]): Transform = {
-    this.copy(columns = newReferences)
+    if (sortedColumns.isEmpty) {
+      this.copy(columns = newReferences)
+    } else {
+      val splits = newReferences.grouped(columns.length).toList
+      this.copy(columns = splits(0), sortedColumns = splits(1))

Review comment:
       Changed. Thanks!

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
##########
@@ -104,24 +104,29 @@ private[sql] final case class BucketTransform(
     columns: Seq[NamedReference],
     sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {
 
-  override val name: String = "bucket"
+  override val name: String = if (sortedColumns.nonEmpty) "sortedBucket" else "bucket"

Review comment:
       Added a new class `SortedBucketTransform`. Thanks!

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
##########
@@ -1566,18 +1566,22 @@ class DataSourceV2SQLSuite
   test("create table using - with sorted bucket") {
     val identifier = "testcat.table_name"
     withTable(identifier) {
-      sql(s"CREATE TABLE $identifier (a int, b string, c int) USING $v2Source PARTITIONED BY (c)" +
-        s" CLUSTERED BY (b) SORTED by (a) INTO 4 BUCKETS")

Review comment:
       I just want to make sure that multiple columns and sortedColumns work correctly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to