justinuang commented on issue #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#issuecomment-476386515

@carsonwang we found a bug in production when AQE is turned on: the ShuffleQueryStageInputs feeding a Union node end up with differing numbers of partitions when we explicitly repartition each side. Here is a repro:

```
val sparkSession = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

val dataset1 = sparkSession.range(1000)
val dataset2 = sparkSession.range(1001)

val compute = dataset1.repartition(505, dataset1.col("id"))
  .union(dataset2.repartition(105, dataset2.col("id")))

compute.show()
compute.explain()
```

```
== Parsed Logical Plan ==
Union
:- AnalysisBarrier RepartitionByExpression [id#152L], 505
+- AnalysisBarrier RepartitionByExpression [id#155L], 105

== Analyzed Logical Plan ==
id: bigint
Union
:- RepartitionByExpression [id#152L], 505
:  +- Range (0, 1000, step=1, splits=Some(2))
+- RepartitionByExpression [id#155L], 105
   +- Range (0, 1001, step=1, splits=Some(2))

== Optimized Logical Plan ==
Union
:- RepartitionByExpression [id#152L], 505
:  +- Range (0, 1000, step=1, splits=Some(2))
+- RepartitionByExpression [id#155L], 105
   +- Range (0, 1001, step=1, splits=Some(2))

== Physical Plan ==
*Union
:- *Exchange hashpartitioning(id#152L, 505)
:  +- *Range (0, 1000, step=1, splits=2)
+- *Exchange hashpartitioning(id#155L, 105)
   +- *Range (0, 1001, step=1, splits=2)

assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
java.lang.AssertionError: assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.estimatePartitionStartIndices(ExchangeCoordinator.scala:119)
	at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:104)
	at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:138)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3262)
	...
```

My immediate thought was to get rid of the assert and instead skip the automatic repartitioning when the pre-shuffle partition counts differ. There might be a better way to fix this, though; I haven't given it much thought yet.
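For concreteness, here is a minimal standalone sketch of that idea. This is not Spark's actual ExchangeCoordinator code: the object name, the simplified signature, and the size-based coalescing loop are all assumptions for illustration. The point is the fallback: when the registered exchanges disagree on the pre-shuffle partition count, return None (i.e. leave the post-shuffle partitions as-is) instead of asserting.

```
// Hypothetical sketch, not Spark internals.
object CoalesceSketch {
  /** sizesPerExchange(i)(j) = bytes written for pre-shuffle partition j of exchange i. */
  def estimatePartitionStartIndices(
      sizesPerExchange: Seq[Array[Long]],
      targetPostShuffleSize: Long): Option[Array[Int]] = {
    val partitionCounts = sizesPerExchange.map(_.length).distinct
    if (partitionCounts.length != 1) {
      // Differing pre-shuffle partition counts (e.g. the Union of repartition(505)
      // and repartition(105) above): skip coalescing rather than fail an assertion.
      None
    } else {
      val numPartitions = partitionCounts.head
      val startIndices = scala.collection.mutable.ArrayBuffer(0)
      var runningSize = 0L
      for (p <- 0 until numPartitions) {
        // Total size of partition p across all exchanges.
        val partitionSize = sizesPerExchange.map(_(p)).sum
        if (runningSize > 0 && runningSize + partitionSize > targetPostShuffleSize) {
          // Current bucket is full: start a new post-shuffle partition at p.
          startIndices += p
          runningSize = partitionSize
        } else {
          runningSize += partitionSize
        }
      }
      Some(startIndices.toArray)
    }
  }

  def main(args: Array[String]): Unit = {
    // Matching partition counts coalesce by size as usual.
    val matching = Seq(Array(10L, 10L, 300L, 5L), Array(20L, 20L, 20L, 20L))
    println(estimatePartitionStartIndices(matching, 64L).map(_.mkString(",")))
    // prints Some(0,2,3): partitions 0-1 merge, 2 and 3 stay separate

    // Mismatched counts (505 vs 105, as in the repro) fall back to None.
    val mismatched = Seq(new Array[Long](505), new Array[Long](105))
    println(estimatePartitionStartIndices(mismatched, 64L))
    // prints None: coalescing is skipped instead of throwing
  }
}
```

Under this scheme the Union above would simply run with 505 + 105 post-shuffle partitions; whether that is preferable to, say, coordinating the two exchanges separately is exactly the open question.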
