c21 commented on code in PR #37290:
URL: https://github.com/apache/spark/pull/37290#discussion_r932846383


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala:
##########
@@ -117,20 +117,26 @@ object V1WritesUtils {
       outputColumns: Seq[Attribute],
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
-      options: Map[String, String]): Seq[SortOrder] = {
+      options: Map[String, String],
+      numStaticPartitions: Int = 0): Seq[SortOrder] = {
+    assert(partitionColumns.size >= numStaticPartitions)

Review Comment:
   ditto.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:
##########
@@ -107,8 +108,10 @@ object FileFormatWriter extends Logging {
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       statsTrackers: Seq[WriteJobStatsTracker],
-      options: Map[String, String])
+      options: Map[String, String],
+      numStaticPartitions: Int = 0)
     : Set[String] = {
+    assert(partitionColumns.size >= numStaticPartitions)

Review Comment:
   nit: would `require()` be better?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala:
##########
@@ -214,4 +216,34 @@ class V1WriteCommandSuite extends V1WriteCommandSuiteBase with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-37194: Avoid unnecessary sort in v1 write if it's not dynamic partition") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, value STRING) USING PARQUET
+            |PARTITIONED BY (p1 INT, p2 STRING)
+            |""".stripMargin)
+
+        // partition columns are static
+        executeAndCheckOrdering(hasLogicalSort = false, orderingMatched = true) {
+          sql(
+            """
+              |INSERT INTO t PARTITION(p1=1, p2='a')
+              |SELECT key, value FROM testData
+              |""".stripMargin)
+        }
+
+        // one static partition column and one dynamic partition column
+        executeAndCheckOrdering(hasLogicalSort = enabled, orderingMatched = enabled) {
+          sql(
+            """
+              |INSERT INTO t PARTITION(p1=1, p2)
+              |SELECT key, value, value FROM testData
+              |""".stripMargin)
+        }
+      }
+    }

Review Comment:
   would it be good to have one more unit test covering the case with no static partition columns?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:
##########
@@ -83,6 +83,7 @@ object FileFormatWriter extends Logging {
    */
   private[sql] var outputOrderingMatched: Boolean = false
 
+  // scalastyle:off argcount

Review Comment:
   nit: we can pass in a wrapper class `PartitionSpec(partitionColumns: 
Seq[Attribute], numStaticPartitions: Int)` to avoid this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to