EnricoMi commented on code in PR #39431:
URL: https://github.com/apache/spark/pull/39431#discussion_r1063464674


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala:
##########
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
             |PARTITIONED BY (k STRING)
             |""".stripMargin)
         executeAndCheckOrdering(
-          hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = 
enabled) {
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = 
enabled) {
           sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
         }
       }
     }
   }
 
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string 
partition column") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withPlannedWrite { enabled =>
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(b INT, value STRING) USING PARQUET
+              |PARTITIONED BY (key INT)
+              |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = 
true) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT b, value, key
+                |FROM testData JOIN testData2 ON key = a
+                |SORT BY key, value
+                |""".stripMargin)
+          }
+
+          // inspect the actually executed plan (that is different to 
executeAndCheckOrdering)
+          assert(FileFormatWriter.executedPlan.isDefined)
+          val executedPlan = FileFormatWriter.executedPlan.get
+
+          val plan = if (enabled) {
+            assert(executedPlan.isInstanceOf[WriteFilesExec])
+            executedPlan.asInstanceOf[WriteFilesExec].child
+          } else {
+            executedPlan.transformDown {
+              case a: AdaptiveSparkPlanExec => a.executedPlan
+            }
+          }
+
+          // assert the outer most sort in the executed plan
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _),
+            SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
+        }
+      }
+    }
+  }
+
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - string 
partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, b INT) USING PARQUET
+            |PARTITIONED BY (value STRING)
+            |""".stripMargin)
+        executeAndCheckOrdering(
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = 
enabled) {
+          sql(
+            """
+              |INSERT INTO t
+              |SELECT key, b, value
+              |FROM testData JOIN testData2 ON key = a
+              |SORT BY value, key
+              |""".stripMargin)
+        }
+
+        // inspect the actually executed plan (that is different to 
executeAndCheckOrdering)
+        assert(FileFormatWriter.executedPlan.isDefined)
+        val executedPlan = FileFormatWriter.executedPlan.get
+
+        val plan = if (enabled) {
+          assert(executedPlan.isInstanceOf[WriteFilesExec])
+          executedPlan.asInstanceOf[WriteFilesExec].child
+        } else {
+          executedPlan.transformDown {
+            case a: AdaptiveSparkPlanExec => a.executedPlan
+          }
+        }
+
+        // assert the outer most sort in the executed plan
+        assert(plan.collectFirst {
+          case s: SortExec => s
+        }.map(s => (enabled, s)).exists {
+          case (false, SortExec(Seq(
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, 
NullsFirst, _),
+          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _)
+          ), false, _, _)) => true
+
+          // SPARK-40885: this bug removes the in-partition sort, which 
manifests here
+          case (true, SortExec(Seq(
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, 
NullsFirst, _)

Review Comment:
   that is correctness bug SPARK-40885 discussed in #38356



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to