justinuang commented on issue #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#issuecomment-476386515
 
 
   @carsonwang we found a bug in production when AQE is turned on:
   
    The ShuffleQueryStageInputs feeding a Union node can have differing numbers of partitions when we explicitly repartition each side to a different count.
    
    Here is a repro:
    ```
          import org.apache.spark.sql.SparkSession

          val sparkSession = SparkSession.builder()
            .master("local[2]")
            .config("spark.sql.autoBroadcastJoinThreshold", "-1")
            .config("spark.sql.adaptive.enabled", "true")
            .getOrCreate()
    
          val dataset1 = sparkSession.range(1000)
          val dataset2 = sparkSession.range(1001)
    
          // The two sides of the union are repartitioned to different counts.
          val compute = dataset1.repartition(505, dataset1.col("id"))
            .union(dataset2.repartition(105, dataset2.col("id")))
    
          compute.show()
          compute.explain()
    ```
   
   ```
   == Parsed Logical Plan ==
   Union
   :- AnalysisBarrier RepartitionByExpression [id#152L], 505
   +- AnalysisBarrier RepartitionByExpression [id#155L], 105
   
   == Analyzed Logical Plan ==
   id: bigint
   Union
   :- RepartitionByExpression [id#152L], 505
   :  +- Range (0, 1000, step=1, splits=Some(2))
   +- RepartitionByExpression [id#155L], 105
      +- Range (0, 1001, step=1, splits=Some(2))
   
   == Optimized Logical Plan ==
   Union
   :- RepartitionByExpression [id#152L], 505
   :  +- Range (0, 1000, step=1, splits=Some(2))
   +- RepartitionByExpression [id#155L], 105
      +- Range (0, 1001, step=1, splits=Some(2))
   
   == Physical Plan ==
   *Union
   :- *Exchange hashpartitioning(id#152L, 505)
   :  +- *Range (0, 1000, step=1, splits=2)
   +- *Exchange hashpartitioning(id#155L, 105)
      +- *Range (0, 1001, step=1, splits=2)
   
    assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
    java.lang.AssertionError: assertion failed: There should be only one distinct value of the number pre-shuffle partitions among registered Exchange operator.
         at scala.Predef$.assert(Predef.scala:170)
         at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.estimatePartitionStartIndices(ExchangeCoordinator.scala:119)
         at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:104)
         at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:138)
         at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3262)
    ...
   ```
   
    My immediate thought is to get rid of the assert and instead skip the automatic post-shuffle coalescing when the registered exchanges report different numbers of pre-shuffle partitions. There might be a better way to fix this, though; I haven't given it much thought yet.
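    To make the idea concrete, here is a minimal sketch of that fallback. `partitionStartIndices` is a hypothetical stand-in for the coalescing decision inside `ExchangeCoordinator.estimatePartitionStartIndices`, not the actual Spark API; it returns `None` (skip coalescing) instead of asserting when the pre-shuffle partition counts differ:
    
    ```scala
    object CoordinatorSketch {
      // Hypothetical helper: coalesce only when every registered exchange
      // reports the same number of pre-shuffle partitions; otherwise skip
      // automatic repartitioning instead of throwing an AssertionError.
      def partitionStartIndices(
          preShufflePartitionCounts: Seq[Int],
          targetPostShufflePartitions: Int): Option[Array[Int]] = {
        val distinctCounts = preShufflePartitionCounts.distinct
        if (distinctCounts.size != 1) {
          // e.g. Seq(505, 105) from the Union above: do not coalesce
          None
        } else {
          // Naive even grouping of the n pre-shuffle partitions into at
          // most targetPostShufflePartitions post-shuffle partitions.
          val n = distinctCounts.head
          val step = math.max(1, n / targetPostShufflePartitions)
          Some((0 until n by step).toArray)
        }
      }
    
      def main(args: Array[String]): Unit = {
        println(partitionStartIndices(Seq(505, 105), 200)) // prints None
        println(partitionStartIndices(Seq(200, 200), 4).map(_.mkString(",")))
      }
    }
    ```
    
    The real fix would of course have to plug into the coordinator and preserve the original output partitioning of each exchange when it bails out; this only illustrates the "skip instead of assert" behavior.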
