cloud-fan commented on code in PR #40522:
URL: https://github.com/apache/spark/pull/40522#discussion_r1144851788
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -561,34 +562,30 @@ case class AdaptiveSparkPlanExec(
}
private def newQueryStage(plan: SparkPlan): QueryStageExec = {
- val optimizedPlan = plan match {
- case e: Exchange =>
- e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage =
false)))
- case _ => plan
- }
- val newPlan = applyPhysicalRules(
- optimizedPlan,
- postStageCreationRules(outputsColumnar = plan.supportsColumnar),
- Some((planChangeLogger, "AQE Post Stage Creation")))
val queryStage = plan match {
- case s: ShuffleExchangeLike =>
- if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
- throw SparkException.internalError(
- "Custom columnar rules cannot transform shuffle node to something
else.")
- }
- ShuffleQueryStageExec(currentStageId, newPlan, s.canonicalized)
- case b: BroadcastExchangeLike =>
- if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
- throw SparkException.internalError(
- "Custom columnar rules cannot transform broadcast node to
something else.")
+ case e: Exchange =>
+ val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child,
isFinalStage = false)))
+ val newPlan = applyPhysicalRules(
+ optimized,
+ postStageCreationRules(outputsColumnar = plan.supportsColumnar),
+ Some((planChangeLogger, "AQE Post Stage Creation")))
+ if (e.isInstanceOf[ShuffleExchangeLike]) {
+ if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
+ throw SparkException.internalError(
+ "Custom columnar rules cannot transform shuffle node to
something else.")
+ }
+ ShuffleQueryStageExec(currentStageId, newPlan, e.canonicalized)
+ } else {
+ assert(e.isInstanceOf[BroadcastExchangeLike])
+ if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
+ throw SparkException.internalError(
+ "Custom columnar rules cannot transform broadcast node to
something else.")
+ }
+ BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
}
- BroadcastQueryStageExec(currentStageId, newPlan, b.canonicalized)
case i: InMemoryTableScanExec =>
- if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {
Review Comment:
A small cleanup: we don't need to run any rules for `InMemoryTableScanExec`,
as it's a leaf node and supports both columnar and row-based output.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]