tgravescs commented on a change in pull request #29134:
URL: https://github.com/apache/spark/pull/29134#discussion_r455958613
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
##########
@@ -145,33 +150,56 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
}
}
- test("inject columnar") {
+ test("inject columnar AQE on") {
+ testInjectColumnar(true)
+ }
+
+ test("inject columnar AQE off") {
+ testInjectColumnar(false)
+ }
+
+ private def testInjectColumnar(adaptiveEnabled: Boolean) {
+
+ def collectPlanSteps(plan: SparkPlan): Seq[Int] = plan match {
+ case a: AdaptiveSparkPlanExec =>
+ assert(a.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+ collectPlanSteps(a.executedPlan)
+ case _ => plan.collect {
+ case _: ReplacedRowToColumnarExec => 1
+ case _: ColumnarProjectExec => 10
+ case _: ColumnarToRowExec => 100
+ case s: QueryStageExec => collectPlanSteps(s.plan).sum
+ case _: MyShuffleExchangeExec => 1000
+ case _: MyBroadcastExchangeExec => 10000
+ }
+ }
+
val extensions = create { extensions =>
extensions.injectColumnar(session =>
MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
withSession(extensions) { session =>
- // The ApplyColumnarRulesAndInsertTransitions rule is not applied when
enable AQE
- session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED,
false)
+ session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED,
adaptiveEnabled)
assert(session.sessionState.columnarRules.contains(
MyColumarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
import session.sqlContext.implicits._
- // repartitioning avoids having the add operation pushed up into the
LocalTableScan
- val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1)
- val df = data.selectExpr("vals + 1")
+ // perform a join to inject a broadcast exchange
+ val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
+ val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
+ val data = left.join(right, $"l1" === $"r1")
+ // repartitioning avoids having the add operation pushed up into the
LocalTableScan
+ .repartition(1)
+ val df = data.selectExpr("l2 + r2")
+ // execute the plan so that the final adaptive plan is available when
AQE is on
+ df.collect()
// Verify that both pre and post processing of the plan worked.
- val found = df.queryExecution.executedPlan.collect {
- case rep: ReplacedRowToColumnarExec => 1
- case proj: ColumnarProjectExec => 10
- case c2r: ColumnarToRowExec => 100
- }.sum
- assert(found == 111)
-
+ val found = collectPlanSteps(df.queryExecution.executedPlan).sum
+ assert(found == 11121)
Review comment:
It might be nice to add a comment explaining what 11121 represents in terms of
the execs (MyBroadcastExchangeExec, etc.).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]