[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17140071#comment-17140071 ]

L. C. Hsieh commented on SPARK-19355:
-------------------------------------

I think this can now be implemented on top of AQE. The open question is that the shuffle statistics still do not include the number of rows. I am not sure we want to add it, since it could be an extra cost when sending statistics.

> Use map output statistices to improve global limit's parallelism
> ----------------------------------------------------------------
>
>                 Key: SPARK-19355
>                 URL: https://issues.apache.org/jira/browse/SPARK-19355
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: L. C. Hsieh
>            Assignee: L. C. Hsieh
>            Priority: Major
>
> A logical Limit is actually performed by two physical operations, LocalLimit
> and GlobalLimit.
> Most of the time, before GlobalLimit, we perform a shuffle exchange to
> shuffle the data to a single partition. When the limit number is very big, we
> shuffle a lot of data to a single partition and significantly reduce
> parallelism, in addition to paying the cost of shuffling.
> This change tries to perform GlobalLimit without shuffling data to a single
> partition. Instead, we perform the map stage of the shuffle and collect
> statistics on the number of rows in each partition. The shuffled data are
> then all retrieved locally, without fetching from remote executors.
> Once we get the number of output rows in each partition, we take only the
> required number of rows from the locally shuffled data.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
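
For illustration only, the last step of the issue description (deciding how many rows to take from each partition once the per-partition row counts are known) might be sketched as below. This is not the code from any of the pull requests. The function name `rows_to_take` and the greedy, partition-order strategy are assumptions made for this sketch; the description does not say how the limit is split across partitions.

```python
from typing import List


def rows_to_take(rows_per_partition: List[int], limit: int) -> List[int]:
    """Return how many rows to keep from each partition to satisfy `limit`.

    Hypothetical helper: walks the partitions in order and takes as many
    rows as each one has until the limit is used up. Other strategies
    (for example, spreading the limit evenly) are equally possible.
    """
    if limit < 0:
        raise ValueError("limit must be non-negative")

    remaining = limit
    takes = []
    for count in rows_per_partition:
        # Never take more than the partition holds or than is still needed.
        take = min(count, remaining)
        takes.append(take)
        remaining -= take
    return takes


if __name__ == "__main__":
    # Three partitions with 5, 3 and 10 rows, and LIMIT 7.
    print(rows_to_take([5, 3, 10], 7))
```

With this plan each partition truncates its own locally available rows, so no single partition has to receive all of the data. If the limit exceeds the total row count, every partition simply keeps all of its rows.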
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623099#comment-16623099 ]

Apache Spark commented on SPARK-19355:
--------------------------------------

User 'rxin' has created a pull request for this issue:
https://github.com/apache/spark/pull/22456

> Use map output statistices to improve global limit's parallelism
> ----------------------------------------------------------------
>
>                 Key: SPARK-19355
>                 URL: https://issues.apache.org/jira/browse/SPARK-19355
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Liang-Chi Hsieh
>            Assignee: Liang-Chi Hsieh
>            Priority: Major
>             Fix For: 2.4.0
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622058#comment-16622058 ]

Wenchen Fan commented on SPARK-19355:
-------------------------------------

We have `SparkContext.submitMapStage`, but this API is too low-level and is far from ideal. We do need a better API and thanks for working on it!
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16621965#comment-16621965 ]

Liang-Chi Hsieh commented on SPARK-19355:
-----------------------------------------

[~cloud_fan] For this, I think we should first have the API we discussed to retrieve data statistics back to driver for an RDD. I will create another ticket for that.
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602990#comment-16602990 ]

Apache Spark commented on SPARK-19355:
--------------------------------------

User 'jiangxb1987' has created a pull request for this issue:
https://github.com/apache/spark/pull/22330
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593080#comment-16593080 ]

Apache Spark commented on SPARK-19355:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/22239
[jira] [Commented] (SPARK-19355) Use map output statistices to improve global limit's parallelism
[ https://issues.apache.org/jira/browse/SPARK-19355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836924#comment-15836924 ]

Apache Spark commented on SPARK-19355:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/16677