[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646898#comment-17646898 ]
Rui Wang commented on SPARK-41512:
----------------------------------

cc [~cloud_fan]

> Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-41512
>                 URL: https://issues.apache.org/jira/browse/SPARK-41512
>             Project: Spark
>          Issue Type: Task
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>
> h3. Problem Statement
> In the current Spark optimizer, a single-partition shuffle may be created for a limit when the limit is not the last non-action operation (e.g. a filter follows the limit). The output partitions feeding into this limit may already be sorted. The single-partition shuffle approach has a correctness bug in that case: shuffle read can return partitions out of partition order, and the limit exec simply takes the first `limit` rows, which loses the order and produces a wrong result. The shuffle itself is also relatively costly. Meanwhile, the naive fix for this bug is to sort all the data fed into the limit again, which adds yet another overhead.
> h3. Proposed idea
> We propose a row-count-based AQE algorithm that improves this in two ways:
> 1. Avoid the extra sort on the shuffle read side (or in the limit exec) while still producing the correct result.
> 2. Avoid reading all shuffle data from the mappers for this single-partition shuffle, reducing shuffle cost.
> Note that 1. applies only to the sorted-partition case, while 2. applies to the general single-partition shuffle + limit case.
>
> The algorithm works as follows:
> 1. Each mapper records a row count while writing shuffle data.
> 2. Since this is the single-shuffle-partition case, there is only one partition but N mappers.
> 3.
> An AccumulatorV2 is implemented to collect a list of tuples recording the mapping between each mapper id and the number of rows written by that mapper (a row count metric).
> 4. The AQE framework detects a plan shape of a shuffle followed by a global limit.
> 5. The AQE framework reads only the necessary data from the mappers, based on the limit. For example, if mapper 1 writes 200 rows, mapper 2 writes 300 rows, and the limit is 500, AQE creates a shuffle read node that reads from mappers 1 and 2 only, skipping the remaining mappers.
> 6. This is correct for a limit over both sorted and non-sorted partitions.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
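The mapper-selection logic in steps 4-5 above can be sketched as follows. This is a minimal illustration, not Spark's actual implementation (which lives in the AQE Scala code); the function name `select_mappers` and the plain-list representation of per-mapper row counts are assumptions, and mapper ids here are 0-indexed:

```python
def select_mappers(row_counts, limit):
    """Given per-mapper row counts (indexed by mapper id, in mapper-id
    order) and a global limit, return the shortest prefix of mapper ids
    whose cumulative row count reaches the limit. Reading only these
    mappers, in order, yields enough rows for the limit while preserving
    the original partition order."""
    selected = []
    total = 0
    for mapper_id, count in enumerate(row_counts):
        selected.append(mapper_id)
        total += count
        if total >= limit:
            break  # enough rows collected; skip the remaining mappers
    return selected

# Mapper 0 wrote 200 rows and mapper 1 wrote 300: a limit of 500 is
# satisfied by reading mappers 0 and 1 and skipping the rest.
print(select_mappers([200, 300, 400, 100], 500))  # [0, 1]
```

Because the prefix is taken in mapper-id order, this also explains step 6: for sorted input, consuming mappers in order keeps the rows in partition order, so no extra sort is needed.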
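The row-count collection in step 3 could look roughly like the following simplified stand-in. Real Spark accumulators subclass `org.apache.spark.util.AccumulatorV2` in Scala; the class name, method shapes, and driver-side merge flow below are hypothetical, kept only close enough to show why a list of (mapper id, row count) tuples merges cleanly:

```python
class RowCountAccumulator:
    """Simplified, hypothetical stand-in for a Spark AccumulatorV2 that
    collects (mapper_id, row_count) tuples from shuffle map tasks."""

    def __init__(self):
        self._counts = []

    def add(self, mapper_id, row_count):
        # Called once per mapper after it finishes writing shuffle data.
        self._counts.append((mapper_id, row_count))

    def merge(self, other):
        # Per-task accumulators are merged on the driver; concatenation
        # works because each mapper reports exactly one tuple.
        self._counts.extend(other._counts)

    def value(self):
        # Sort by mapper id so cumulative reads preserve partition order.
        return sorted(self._counts)
```

The merged, sorted value is exactly the per-mapper row-count list that the mapper-selection step consumes.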