Re: Limit Query Performance Suggestion

2017-01-18 Thread Liang-Chi Hsieh

Hi zhenhua,

Thanks for the idea.

Actually, I think we can completely avoid shuffling the data in a limit
operation, whether it is a LocalLimit or a GlobalLimit.



wangzhenhua (G) wrote
> How about this:
> 1. we can make LocalLimit shuffle to multiple partitions, i.e. create a new
> partitioner to uniformly dispatch the data
> 
> import org.apache.spark.Partitioner
> 
> // Round-robin partitioner: dispatches rows uniformly across partitions,
> // ignoring the key.
> class LimitUniformPartitioner(partitions: Int) extends Partitioner {
> 
>   def numPartitions: Int = partitions
> 
>   // Running counter; each call hands the next row to the next partition.
>   var num = 0
> 
>   def getPartition(key: Any): Int = {
>     num = num + 1
>     num % partitions
>   }
> 
>   override def equals(other: Any): Boolean = other match {
>     case p: LimitUniformPartitioner =>
>       p.numPartitions == numPartitions
>     case _ =>
>       false
>   }
> 
>   override def hashCode: Int = numPartitions
> }
> 
> 2. then in GlobalLimit, we only take the first
> limit_number/num_of_shufflepartitions elements in each partition.
> 
> One issue left is how to decide the shuffle partition number.
> We can have a config for the maximum number of elements each
> GlobalLimit task should process,
> then factor the limit to get a per-task share closest to that config.
> E.g., if the config is 2000:
> if limit = 10000, then 10000 = 2000 * 5, so we shuffle to 5 partitions
> if limit = …, then … = … * 9, so we shuffle to 9 partitions
> if limit is a prime number, we just fall back to a single partition
> 
> best regards,
> -zhenhua
> 
> 
> -----Original Message-----
> From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
> Sent: January 18, 2017 15:48
> To: dev@spark.apache.org
> Subject: Re: Limit Query Performance Suggestion
> 
> 
> Hi Sujith,
> 
> I saw your updated post. It makes sense to me now.
> 
> If you use a very big limit number, the shuffling before `GlobalLimit`
> would be a performance bottleneck, of course, even if it can eventually
> shuffle enough data to the single partition.
> 
> Unlike `CollectLimit`, I actually think there is no reason, with respect
> to query execution, that `GlobalLimit` must shuffle all the limited data
> from all partitions to one single machine. In other words, I think we can
> avoid shuffling data in `GlobalLimit`.
> 
> I have an idea to improve this and may update here later if I can make it
> work.
> 
> 
> sujith71955 wrote
>> Dear Liang,
>> 
>> Thanks for your valuable feedback.
>> 
>> There was a mistake in the previous post; I have corrected it. As you
>> mentioned, in `GlobalLimit` we only take the required number of rows
>> from the input iterator, which really pulls data from local blocks and
>> remote blocks.
>> But if the limit value is very high (>= 1000), a shuffle exchange still
>> happens between `GlobalLimit` and `LocalLimit` to retrieve data from all
>> partitions into one partition, and since the limit value is very large
>> the performance bottleneck still exists.
>>
>> Soon, in my next post, I will publish a test report with sample data
>> and also work out a solution for this problem.
>>
>> Please let me know if you have any clarifications or suggestions
>> regarding this issue.
>> 
>> Regards,
>> Sujith
> 
> 
> 
> 
> 
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/




-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Re: Limit Query Performance Suggestion

2017-01-18 Thread wangzhenhua (G)
How about this:
1. we can make LocalLimit shuffle to multiple partitions, i.e. create a new
partitioner to uniformly dispatch the data

import org.apache.spark.Partitioner

// Round-robin partitioner: dispatches rows uniformly across partitions,
// ignoring the key.
class LimitUniformPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  // Running counter; each call hands the next row to the next partition.
  var num = 0

  def getPartition(key: Any): Int = {
    num = num + 1
    num % partitions
  }

  override def equals(other: Any): Boolean = other match {
    case p: LimitUniformPartitioner =>
      p.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

2. then in GlobalLimit, we only take the first 
limit_number/num_of_shufflepartitions elements in each partition.
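
A hypothetical sketch of what that GlobalLimit side could look like (not real
operator code; `globalLimitShare` and `numShufflePartitions` are made-up names,
and the limit is assumed to divide exactly, which is what the partition-count
choice below aims for):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical: after the uniform shuffle, each GlobalLimit task keeps only
// its equal share of the limit instead of the full limit.
def globalLimitShare(
    shuffled: RDD[InternalRow],
    limit: Int,
    numShufflePartitions: Int): RDD[InternalRow] = {
  val rowsPerPartition = limit / numShufflePartitions
  shuffled.mapPartitions(_.take(rowsPerPartition))
}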

One issue left is how to decide the shuffle partition number.
We can have a config for the maximum number of elements each GlobalLimit
task should process,
then factor the limit to get a per-task share closest to that config.
E.g., if the config is 2000:
if limit = 10000, then 10000 = 2000 * 5, so we shuffle to 5 partitions
if limit = …, then … = … * 9, so we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition
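
A rough sketch of that partition-count selection (purely illustrative;
`choosePartitionCount` and `maxRowsPerTask` are made-up names standing in for
the proposed config, and the divisor search is just one way to do the
"factorization"):

// Pick a shuffle partition count so that each GlobalLimit task processes at
// most ~maxRowsPerTask rows, following the "factor the limit" idea above.
def choosePartitionCount(limit: Long, maxRowsPerTask: Long): Int = {
  require(limit > 0 && maxRowsPerTask > 0)
  if (limit <= maxRowsPerTask) {
    1
  } else {
    // Smallest partition count that keeps limit / partitions within the cap.
    val minPartitions = (limit + maxRowsPerTask - 1) / maxRowsPerTask
    // Search for a divisor of the limit at or above that count; a proper
    // divisor is at most limit / 2, so stop there.
    (minPartitions to limit / 2)
      .find(p => limit % p == 0)
      .map(_.toInt)
      .getOrElse(1) // no suitable divisor (e.g. the limit is prime): one partition
  }
}

// With the config value 2000 from the example:
//   choosePartitionCount(10000L, 2000L) == 5   since 10000 = 2000 * 5
//   choosePartitionCount(10007L, 2000L) == 1   a prime limit falls back to one partition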

best regards,
-zhenhua


-----Original Message-----
From: Liang-Chi Hsieh [mailto:vii...@gmail.com]
Sent: January 18, 2017 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion


Hi Sujith,

I saw your updated post. It makes sense to me now.

If you use a very big limit number, the shuffling before `GlobalLimit` would be
a performance bottleneck, of course, even if it can eventually shuffle enough
data to the single partition.

Unlike `CollectLimit`, I actually think there is no reason, with respect to
query execution, that `GlobalLimit` must shuffle all the limited data from all
partitions to one single machine. In other words, I think we can avoid
shuffling data in `GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.


sujith71955 wrote
> Dear Liang,
> 
> Thanks for your valuable feedback.
> 
> There was a mistake in the previous post; I have corrected it. As you
> mentioned, in `GlobalLimit` we only take the required number of rows
> from the input iterator, which really pulls data from local blocks and
> remote blocks.
> But if the limit value is very high (>= 1000), a shuffle exchange still
> happens between `GlobalLimit` and `LocalLimit` to retrieve data from all
> partitions into one partition, and since the limit value is very large
> the performance bottleneck still exists.
>
> Soon, in my next post, I will publish a test report with sample data
> and also work out a solution for this problem.
>
> Please let me know if you have any clarifications or suggestions
> regarding this issue.
> 
> Regards,
> Sujith





-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/



Re: Limit Query Performance Suggestion

2017-01-17 Thread sujith71955
Dear Liang,

Thanks for your valuable feedback.

There was a mistake in the previous post; I have corrected it. As you
mentioned, in `GlobalLimit` we only take the required number of rows from the
input iterator, which really pulls data from local blocks and remote blocks.
But if the limit value is very high (>= 1000), a shuffle exchange still
happens between `GlobalLimit` and `LocalLimit` to retrieve data from all
partitions into one partition, and since the limit value is very large the
performance bottleneck still exists.

Soon, in my next post, I will publish a test report with sample data and also
work out a solution for this problem.

Please let me know if you have any clarifications or suggestions regarding
this issue.

Regards,
Sujith






Re: Limit Query Performance Suggestion

2017-01-15 Thread Liang-Chi Hsieh

Hi Sujith,

Thanks for the suggestion.

The code you quoted is from `CollectLimitExec`, which will be in the plan
if a logical `Limit` is the final operator in a logical plan. But in the
physical plan you showed, there are `GlobalLimit` and `LocalLimit` for the
logical `Limit` operation, so the `doExecute` method of `CollectLimitExec`
will not be executed.

In the case where `CollectLimitExec` is the final physical operation, its
`executeCollect` will be executed and delegate to `SparkPlan.executeTake`,
which is optimized to retrieve only the required number of rows back to the
driver. So using `limit n` with a huge partition number should not be a
problem there.

In the case where `GlobalLimit` and `LocalLimit` are the final physical
operations, your concern is that when returning `n` rows from each of `N`
partitions and `N` is huge, the total `n * N` rows will cause heavy memory
pressure on the driver. I am not sure if you have really observed this problem
or just think it might be a problem. In this case, there will be a shuffle
exchange between `GlobalLimit` and `LocalLimit` to retrieve data from all
partitions into one partition. In `GlobalLimit` we will only take the required
number of rows from the input iterator, which really pulls data from local
blocks and remote blocks. Because of this iterator approach, I think once we
have enough rows in `GlobalLimit`, we won't continue to consume the input
iterator and pull more data back. So I don't think your concern will be a
problem.
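
As a side note, a quick way to see which of these two cases a query falls into
is to compare the physical plans. A rough spark-shell sketch, assuming the
`src` / `src_temp` tables from your example (output abridged):

// A limit that is the final operator of the collected query is planned as
// CollectLimitExec, whose executeCollect() goes through SparkPlan.executeTake:
spark.sql("SELECT * FROM src LIMIT 10").explain()
// == Physical Plan ==
// CollectLimit 10
// +- ...

// A limit that feeds another operator, like the CTAS in your plan, is planned
// as GlobalLimit / LocalLimit with a shuffle down to a single partition:
spark.sql("EXPLAIN CREATE TABLE src_temp AS SELECT * FROM src LIMIT 10").show(false)
// ExecutedCommand
//    +- CreateHiveTableAsSelectCommand ...
//          +- GlobalLimit 10
//             +- LocalLimit 10
//                +- ...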



sujith71955 wrote
> When a limit is added at the terminal of the physical plan, there is a
> possibility of a memory bottleneck if the limit value is too large, since
> the system will try to aggregate all of the per-partition limited rows
> into a single partition.
> Description:
> Eg:
> create table src_temp as select * from src limit n;
> == Physical Plan ==
> ExecutedCommand
>    +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
>          +- GlobalLimit 2
>             +- LocalLimit 2
>                +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
>                   +- SubqueryAlias hive
>                      +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv
> 
> As shown in the above plan, when the limit comes at the terminal there can
> be two types of performance bottlenecks:
> scenario 1: the partition count is very high and the limit value is small
> scenario 2: the limit value is very large
> 
> protected override def doExecute(): RDD[InternalRow] = {
>   val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
>   val shuffled = new ShuffledRowRDD(
>     ShuffleExchange.prepareShuffleDependency(
>       locallyLimited, child.output, SinglePartition, serializer))
>   shuffled.mapPartitionsInternal(_.take(limit))
> }
> 
> As per my understanding, the current algorithm first creates a
> MapPartitionsRDD by applying the limit to each partition, then a
> ShuffledRowRDD is created by grouping the data from all partitions into a
> single partition. This can create overhead, since every partition will
> return up to `limit` rows, so while grouping there will be N partitions *
> limit rows, which can be very large. In both scenarios mentioned above this
> logic can be a bottleneck.
> 
> My suggestion for handling scenario 1, where there is a large number of
> partitions and the limit value is small: the driver can create an
> accumulator and send it to all partitions, and all executors will update
> the accumulator value based on the data fetched.
> E.g., with number of partitions = 100 and number of cores = 10,
> tasks will be launched in groups of 10 (10 * 10 = 100). Once the first
> group finishes its tasks, the driver will check whether the accumulator
> value has reached the limit value;
> if it has, no further tasks will be launched on the executors and the
> result will be returned.
> 
> Let me know if you have any further suggestions or solutions.
> 
> Thanks in advance,
> Sujith





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 



Limit Query Performance Suggestion

2017-01-12 Thread sujith chacko
When a limit is added at the terminal of the physical plan, there is a
possibility of a memory bottleneck if the limit value is too large, since the
system will try to aggregate all of the per-partition limited rows into a
single partition.
Description:
Eg:
create table src_temp as select * from src limit n;
== Physical Plan ==
ExecutedCommand
   +- CreateHiveTableAsSelectCommand [Database:spark}, TableName: t2, InsertIntoHiveTable]
         +- GlobalLimit 2
            +- LocalLimit 2
               +- Project [imei#101, age#102, task#103L, num#104, level#105, productdate#106, name#107, point#108]
                  +- SubqueryAlias hive
                     +- Relation[imei#101,age#102,task#103L,num#104,level#105,productdate#106,name#107,point#108] csv

As shown in the above plan, when the limit comes at the terminal there can be
two types of performance bottlenecks:
scenario 1: the partition count is very high and the limit value is small
scenario 2: the limit value is very large

protected override def doExecute(): RDD[InternalRow] = {
  // Take at most `limit` rows from every partition locally.
  val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
  // Shuffle all of those locally limited rows into a single partition...
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      locallyLimited, child.output, SinglePartition, serializer))
  // ...and take the first `limit` rows of the combined result.
  shuffled.mapPartitionsInternal(_.take(limit))
}

As per my understanding, the current algorithm first creates a
MapPartitionsRDD by applying the limit to each partition, then a ShuffledRowRDD
is created by grouping the data from all partitions into a single partition.
This can create overhead, since every partition will return up to `limit` rows,
so while grouping there will be N partitions * limit rows, which can be very
large (for example, 2,000 partitions with limit = 1,000,000 means up to
2 * 10^9 rows shuffled into one task). In both scenarios mentioned above this
logic can be a bottleneck.

My suggestion for handling scenario 1, where there is a large number of
partitions and the limit value is small: the driver can create an accumulator
and send it to all partitions, and all executors will update the accumulator
value based on the data fetched.
E.g., with number of partitions = 100 and number of cores = 10,
tasks will be launched in groups of 10 (10 * 10 = 100). Once the first group
finishes its tasks, the driver will check whether the accumulator value has
reached the limit value;
if it has, no further tasks will be launched on the executors and the result
will be returned.
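
A rough, illustrative sketch of that idea (not the real limit operator;
`batchedTake` and `waveSize` are made-up names, and the wave scheduling via
`runJob` is just one way to express "launch tasks in groups"):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Launch tasks one wave at a time; each task takes at most `limit` rows and
// reports its count through an accumulator, and the driver stops scheduling
// further waves once the accumulator shows the limit has been reached.
def batchedTake[T: ClassTag](rdd: RDD[T], limit: Int, waveSize: Int): Array[T] = {
  val sc = rdd.sparkContext
  val fetched = sc.longAccumulator("rows fetched so far")
  val results = scala.collection.mutable.ArrayBuffer.empty[T]
  var nextPartition = 0

  // Keep launching waves only while the accumulator has not reached the limit.
  while (fetched.value < limit && nextPartition < rdd.getNumPartitions) {
    val wave = nextPartition until
      math.min(nextPartition + waveSize, rdd.getNumPartitions)
    val perPartition = sc.runJob(rdd, (it: Iterator[T]) => {
      val rows = it.take(limit).toArray // no task ever returns more than `limit` rows
      fetched.add(rows.length)          // executor-side update of the shared counter
      rows
    }, wave)
    results ++= perPartition.flatten
    nextPartition = wave.end
  }
  results.take(limit).toArray
}

(For comparison, `SparkPlan.executeTake`, mentioned above for the
`CollectLimitExec` case, already scans partitions incrementally in a similar
spirit.)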

Let me know if you have any further suggestions or solutions.

Thanks in advance,
Sujith