Re: Limit Query Performance Suggestion
Hi zhenhua,

Thanks for the idea. Actually, I think we can completely avoid shuffling the data in a limit operation, no matter whether it is LocalLimit or GlobalLimit.

wangzhenhua (G) wrote
> How about this:
> 1. We can make LocalLimit shuffle to multiple partitions, i.e. create a
> new partitioner to uniformly dispatch the data.
> 2. Then in GlobalLimit, we only take the first
> limit_number / num_of_shuffle_partitions elements in each partition.
> One issue left is how to decide the shuffle partition number.

-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20657.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
Re: Limit Query Performance Suggestion
How about this:

1. We can make LocalLimit shuffle to multiple partitions, i.e. create a new partitioner to uniformly dispatch the data:

class LimitUniformPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  var num = 0

  def getPartition(key: Any): Int = {
    num = num + 1
    num % partitions
  }

  override def equals(other: Any): Boolean = other match {
    case p: LimitUniformPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

2. Then in GlobalLimit, we only take the first limit_number / num_of_shuffle_partitions elements in each partition.

One issue left is how to decide the shuffle partition number. We can have a config for the maximum number of elements each GlobalLimit task should process, then do a factorization to get a partition count closest to that config. E.g. if the config is 2000:

if limit = 10000, 10000 = 2000 * 5, we shuffle to 5 partitions
if limit = 18000, 18000 = 2000 * 9, we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition

best regards,
-zhenhua

-----Original Message-----
From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
Sent: 2017-01-18 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion

Hi Sujith,

I saw your updated post. Seems it makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would be a performance bottleneck, of course, even though it can eventually shuffle enough data to the single partition.

Unlike `CollectLimit`, actually I think there is no reason `GlobalLimit` must shuffle all limited data from all partitions to one single machine with respect to query execution. In other words, I think we can avoid shuffling data in `GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.
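The factorization scheme zhenhua describes (pick a shuffle partition count so that each GlobalLimit task handles at most a configured number of rows, falling back to a single partition for primes) can be sketched in plain Scala. This is only an illustration of the idea; the object and function names, and the brute-force divisor search, are my own, not Spark code.

```scala
object LimitPartitioning {
  // Pick a shuffle partition count so that each GlobalLimit task
  // processes at most `maxPerTask` rows. Searches the proper divisors
  // of `limit`; if none fits (e.g. `limit` is prime), fall back to a
  // single partition, as the proposal suggests.
  def choosePartitions(limit: Int, maxPerTask: Int): Int = {
    if (limit <= maxPerTask) 1
    else {
      // proper divisors d of `limit` with limit / d <= maxPerTask
      val fits = (2 until limit).filter(d => limit % d == 0 && limit / d <= maxPerTask)
      if (fits.isEmpty) 1 else fits.min // fewest partitions that fit
    }
  }
}
```

With the config value 2000 from the example, `choosePartitions(10000, 2000)` gives 5 and `choosePartitions(18000, 2000)` gives 9, while a prime limit such as 7919 falls back to 1.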
Re: Limit Query Performance Suggestion
Dear Liang,

Thanks for your valuable feedback.

There was a mistake in the previous post; I have corrected it. As you mentioned, in `GlobalLimit` we will only take the required number of rows from the input iterator, which really pulls data from local blocks and remote blocks. But if the limit value is very high (>= 1000), a shuffle exchange happens between `GlobalLimit` and `LocalLimit` to retrieve data from all partitions into one partition; since the limit value is very large, the performance bottleneck still exists.

Soon, in the next post, I will publish a test report with sample data, along with a solution for this problem.

Please let me know of any clarifications or suggestions regarding this issue.

Regards,
Sujith
Re: Limit Query Performance Suggestion
Hi Sujith,

Thanks for the suggestion.

The code you quoted is from `CollectLimitExec`, which will be in the plan if a logical `Limit` is the final operator in a logical plan. But in the physical plan you showed, there are `GlobalLimit` and `LocalLimit` for the logical `Limit` operation, so the `doExecute` method of `CollectLimitExec` will not be executed.

In the case where `CollectLimitExec` is the final physical operation, its `executeCollect` will be executed and delegate to `SparkPlan.executeTake`, which is optimized to retrieve only the required number of rows back to the driver. So using `limit n` with a huge partition number should not be a problem.

In the case where `GlobalLimit` and `LocalLimit` are the final physical operations, your concern is that when returning `n` rows from each of `N` partitions and `N` is huge, the total of `n * N` rows will cause heavy memory pressure on the driver. I am not sure if you have really observed this problem or just think it might be one.

In this case, there will be a shuffle exchange between `GlobalLimit` and `LocalLimit` to retrieve data from all partitions into one partition. In `GlobalLimit` we will only take the required number of rows from the input iterator, which really pulls data from local blocks and remote blocks. Because of the iterator approach, once we have enough rows in `GlobalLimit`, we won't continue to consume the input iterator and pull more data back. So I don't think your concern will be a problem.

sujith71955 wrote
> When limit is added at the terminal of the physical plan, there is a
> possibility of a memory bottleneck if the limit value is too large, since
> the system will try to aggregate all partitions' limited rows into a
> single partition.
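The point above about the iterator approach can be demonstrated with plain Scala iterators (no Spark involved): `take(n)` is lazy, so once enough elements have been produced, the underlying source is not consumed any further. This stands in for `GlobalLimit` not pulling more shuffle data once it has `limit` rows; the counter and names here are my own illustration.

```scala
object LazyTakeDemo {
  def demo(): (List[Int], Int) = {
    var pulled = 0 // counts how many elements the source actually produced
    // an unbounded source, instrumented to record each element pulled
    val source = Iterator.from(1).map { x => pulled += 1; x }
    // analogue of GlobalLimit taking `limit` rows from the input iterator
    val limited = source.take(5).toList
    (limited, pulled) // only 5 elements were ever pulled from the source
  }
}
```

Running `demo()` yields `(List(1, 2, 3, 4, 5), 5)`: the infinite source is consumed exactly five times.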
Limit Query Performance Suggestion
When a limit is added at the terminal of the physical plan, there is a possibility of a memory bottleneck if the limit value is too large, since the system will try to aggregate all the partitions' limited rows into a single partition.

Description:

Eg: create table src_temp as select * from src limit n;

== Physical Plan ==
ExecutedCommand
+- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
   +- GlobalLimit 2
      +- LocalLimit 2
         +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
            +- SubqueryAlias hive
               +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv

As shown in the above plan, when the limit comes at the terminal, there can be two types of performance bottlenecks:

scenario 1: when the partition count is very high and the limit value is small
scenario 2: when the limit value is very large

protected override def doExecute(): RDD[InternalRow] = {
  val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      locallyLimited, child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

As per my understanding, the current algorithm first creates a MapPartitionsRDD by applying the limit to each partition; then a ShuffledRowRDD is created by grouping the data from all partitions into a single partition. This can create overhead, since every partition will return up to `limit` rows, so while grouping there can be N partitions * limit rows, which can be very large. In both scenarios mentioned above, this logic can be a bottleneck.

My suggestion for handling scenario 1, where there is a large number of partitions and the limit value is small: the driver can create an accumulator and send it to all partitions, and every executor will update the accumulator based on the data fetched.

Eg: number of partitions = 100, number of cores = 10. Tasks will be launched in groups of 10 (10 * 10 = 100). Once the first group of tasks finishes, the driver will check whether the accumulator value has reached the limit; if it has, no further tasks will be launched to the executors and the result will be returned.

Let me know of any further suggestions or solutions.

Thanks in advance,
Sujith
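The wave-based scheduling idea above might be sketched like this in plain Scala, with local collections standing in for partitions and a simple counter standing in for the accumulator. The function and its shape are hypothetical illustrations, not Spark's scheduler API.

```scala
object WaveLimitSketch {
  // Launch "tasks" in waves of `cores`; after each wave the driver checks
  // the accumulated row count and stops launching once `limit` is reached.
  // Returns the collected rows and how many tasks were actually launched.
  def collectWithEarlyStop(partitions: Seq[Seq[Int]], limit: Int, cores: Int): (Seq[Int], Int) = {
    val collected = scala.collection.mutable.ArrayBuffer.empty[Int]
    var tasksLaunched = 0
    val waves = partitions.grouped(cores) // one group of tasks per wave
    while (collected.size < limit && waves.hasNext) {
      for (part <- waves.next()) {        // run one wave of tasks
        tasksLaunched += 1
        collected ++= part.take(limit)    // each task applies the local limit
      }
      // the driver checks the "accumulator" (collected.size) between waves
    }
    (collected.take(limit).toSeq, tasksLaunched)
  }
}
```

With 100 single-row partitions, a limit of 5, and 10 cores, only the first wave of 10 tasks runs instead of all 100, which is the early-stop behaviour described above.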