EnricoMi commented on code in PR #39431:
URL: https://github.com/apache/spark/pull/39431#discussion_r1063464674
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala:
##########
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with
SharedSparkSession with V1Write
|PARTITIONED BY (k STRING)
|""".stripMargin)
executeAndCheckOrdering(
- hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null =
enabled) {
+ hasLogicalSort = true, orderingMatched = true, hasEmpty2Null =
enabled) {
sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
}
}
}
}
+ test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string
partition column") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(b INT, value STRING) USING PARQUET
+ |PARTITIONED BY (key INT)
+ |""".stripMargin)
+ executeAndCheckOrdering(hasLogicalSort = true, orderingMatched =
true) {
+ sql(
+ """
+ |INSERT INTO t
+ |SELECT b, value, key
+ |FROM testData JOIN testData2 ON key = a
+ |SORT BY key, value
+ |""".stripMargin)
+ }
+
+        // inspect the actually executed plan (which is different from
executeAndCheckOrdering)
+ assert(FileFormatWriter.executedPlan.isDefined)
+ val executedPlan = FileFormatWriter.executedPlan.get
+
+ val plan = if (enabled) {
+ assert(executedPlan.isInstanceOf[WriteFilesExec])
+ executedPlan.asInstanceOf[WriteFilesExec].child
+ } else {
+ executedPlan.transformDown {
+ case a: AdaptiveSparkPlanExec => a.executedPlan
+ }
+ }
+
+        // assert the outermost sort in the executed plan
+ assert(plan.collectFirst {
+ case s: SortExec => s
+ }.exists {
+ case SortExec(Seq(
+ SortOrder(AttributeReference("key", IntegerType, _, _), Ascending,
NullsFirst, _),
+ SortOrder(AttributeReference("value", StringType, _, _),
Ascending, NullsFirst, _)
+ ), false, _, _) => true
+ case _ => false
+ }, plan)
+ }
+ }
+ }
+ }
+
+ test("SPARK-41914: v1 write with AQE and in-partition sorted - string
partition column") {
+ withPlannedWrite { enabled =>
+ withTable("t") {
+ sql(
+ """
+ |CREATE TABLE t(key INT, b INT) USING PARQUET
+ |PARTITIONED BY (value STRING)
+ |""".stripMargin)
+ executeAndCheckOrdering(
+ hasLogicalSort = true, orderingMatched = true, hasEmpty2Null =
enabled) {
+ sql(
+ """
+ |INSERT INTO t
+ |SELECT key, b, value
+ |FROM testData JOIN testData2 ON key = a
+ |SORT BY value, key
+ |""".stripMargin)
+ }
+
+      // inspect the actually executed plan (which is different from
executeAndCheckOrdering)
+ assert(FileFormatWriter.executedPlan.isDefined)
+ val executedPlan = FileFormatWriter.executedPlan.get
+
+ val plan = if (enabled) {
+ assert(executedPlan.isInstanceOf[WriteFilesExec])
+ executedPlan.asInstanceOf[WriteFilesExec].child
+ } else {
+ executedPlan.transformDown {
+ case a: AdaptiveSparkPlanExec => a.executedPlan
+ }
+ }
+
+      // assert the outermost sort in the executed plan
+ assert(plan.collectFirst {
+ case s: SortExec => s
+ }.map(s => (enabled, s)).exists {
+ case (false, SortExec(Seq(
+ SortOrder(AttributeReference("value", StringType, _, _), Ascending,
NullsFirst, _),
+ SortOrder(AttributeReference("key", IntegerType, _, _), Ascending,
NullsFirst, _)
+ ), false, _, _)) => true
+
+ // SPARK-40885: this bug removes the in-partition sort, which
manifests here
+ case (true, SortExec(Seq(
+ SortOrder(AttributeReference("value", StringType, _, _), Ascending,
NullsFirst, _)
Review Comment:
   That is the correctness bug SPARK-40885, discussed in #38356.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]