cloud-fan commented on a change in pull request #26813: [SPARK-30188][SQL][WIP]
Enable adaptive query execution by default
URL: https://github.com/apache/spark/pull/26813#discussion_r362051735
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -423,48 +426,54 @@ class PlannerSuite extends SharedSparkSession {
test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " +
"if SortExec is followed by RoundRobinPartitioning") {
- val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) ::
Nil)
- val partitioning = RoundRobinPartitioning(5)
- assert(!partitioning.satisfies(distribution))
-
- val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
- global = true,
- child = ShuffleExchangeExec(
- partitioning,
- DummySparkPlan(outputPartitioning = partitioning)))
- val outputPlan =
EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
- assert(outputPlan.find {
- case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
- case _ => false
- }.isEmpty,
- "RoundRobinPartitioning should be changed to RangePartitioning")
-
- val query = testData.select('key, 'value).repartition(2).sort('key.asc)
- assert(query.rdd.getNumPartitions == 2)
- assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ // When AQE is enabled, the post partition number is changed.
+ val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending)
:: Nil)
+ val partitioning = RoundRobinPartitioning(5)
+ assert(!partitioning.satisfies(distribution))
+
+ val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
+ global = true,
+ child = ShuffleExchangeExec(
+ partitioning,
+ DummySparkPlan(outputPartitioning = partitioning)))
+ val outputPlan =
EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+ assert(outputPlan.find {
+ case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
+ case _ => false
+ }.isEmpty,
+ "RoundRobinPartitioning should be changed to RangePartitioning")
+
+ val query = testData.select('key, 'value).repartition(2).sort('key.asc)
+ assert(query.rdd.getNumPartitions == 2)
+ assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
Review comment:
We only need to wrap the above 3 lines (the `query` definition and its two assertions) with `withSQLConf`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]