Re: *ByKey aggregations: performance + order

2015-01-15 Thread Sean Owen
I'm interested too and don't know for sure, but I do not think this case is
optimized this way. However, if you know your keys aren't split across
partitions and your partitions are small enough, you can implement the same
grouping with mapPartitions and plain Scala.
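
A minimal sketch of that idea (assuming an rdd: RDD[(Long, MyData)] as in the
original question, and that all values for a key already sit in a single,
small-enough partition):

  // Sketch only: group within each partition using plain Scala, no shuffle.
  // This materializes the whole partition in memory, hence "small partitions".
  val grouped = rdd.mapPartitions { iter =>
    iter.toSeq
      .groupBy(_._1)              // Scala groupBy on this partition's pairs
      .mapValues(_.map(_._2))     // keep only the MyData values
      .iterator
  }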
On Jan 15, 2015 1:27 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Sean,

 thanks for your message.

 On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:
  OK, it seems like even on a local machine (with no network overhead),
 the
  groupByKey version is about 5 times slower than any of the other
  (reduceByKey, combineByKey etc.) functions...

 Even without network overhead, you're still paying the cost of setting
 up the shuffle and serialization.


 Can I pick an appropriate scheduler ahead of time so that Spark knows that
 all items with the same key are on the same host? (Or enforce this?)

 Thanks
 Tobias





Re: *ByKey aggregations: performance + order

2015-01-14 Thread Tobias Pfeiffer
Sean,

thanks for your message.

On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
  OK, it seems like even on a local machine (with no network overhead), the
  groupByKey version is about 5 times slower than any of the other
  (reduceByKey, combineByKey etc.) functions...

 Even without network overhead, you're still paying the cost of setting
 up the shuffle and serialization.


Can I pick an appropriate scheduler ahead of time so that Spark knows that
all items with the same key are on the same host? (Or enforce this?)
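
(As a sketch of one way I could imagine enforcing this: pre-partition by key
with a HashPartitioner, so that all pairs with the same key land in the same
partition; *ByKey operations that reuse that partitioner then avoid a further
shuffle. The partition count below is arbitrary.)

  import org.apache.spark.HashPartitioner

  // After this, all values for a given key live in one partition.
  val partitioned = rdd.partitionBy(new HashPartitioner(16)).cache()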

Thanks
Tobias


Re: *ByKey aggregations: performance + order

2015-01-14 Thread Sean Owen
On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp wrote:
 Now I don't know (yet) if all of the functions I want to compute can be
 expressed in this way and I was wondering about *how much* more expensive we
 are talking about.


 OK, it seems like even on a local machine (with no network overhead), the
 groupByKey version is about 5 times slower than any of the other
 (reduceByKey, combineByKey etc.) functions...

Even without network overhead, you're still paying the cost of setting
up the shuffle and serialization.

Yes, the overhead is that groupByKey has to move all the data around.
If an aggregation is really what's needed, then most of the reducing /
combining can happen locally before the result ever goes anywhere else
on the network.
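
To make that concrete, a rough sketch of the local pre-aggregation that
reduceByKey does for you automatically (hypothetical pairs: RDD[(Long, Int)]):

  // Partial sums are built inside each partition, so only one record per key
  // per partition crosses the network; groupByKey instead ships every value.
  val localSums = pairs.mapPartitions { iter =>
    val acc = scala.collection.mutable.Map.empty[Long, Int]
    iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0) + v }
    acc.iterator
  }
  val totals = localSums.reduceByKey(_ + _)   // final combine after the shuffle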

There's also the issue of keys with very large values.




Re: *ByKey aggregations: performance + order

2015-01-13 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 14, 2015 at 12:11 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Now I don't know (yet) if all of the functions I want to compute can be
 expressed in this way and I was wondering about *how much* more expensive
 we are talking about.


OK, it seems like even on a local machine (with no network overhead), the
groupByKey version is about 5 times slower than any of the other
(reduceByKey, combineByKey etc.) functions...

  val rdd = sc.parallelize(1 to 500)
  val withKeys = rdd.zipWithIndex.map(kv => (kv._2/10, kv._1))
  withKeys.cache()
  withKeys.count

  // around 850-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.reduceByKey(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 800-1100 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.combineByKey((x: Int) => x, (x: Int, y: Int) => x + y,
      (x: Int, y: Int) => x + y).count()
    System.currentTimeMillis - start
  }

  // around 1500-1900 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.foldByKey(0)(_ + _).count()
    System.currentTimeMillis - start
  }

  // around 1400-1800 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.aggregateByKey(0)(_ + _, _ + _).count()
    System.currentTimeMillis - start
  }

  // around 5500-6200 ms
  for (i <- 1 to 5) yield {
    val start = System.currentTimeMillis
    withKeys.groupByKey().mapValues(_.reduceLeft(_ + _)).count()
    System.currentTimeMillis - start
  }

Tobias


*ByKey aggregations: performance + order

2015-01-13 Thread Tobias Pfeiffer
Hi,

I have an RDD[(Long, MyData)] where I want to compute various functions on
lists of MyData items with the same key (these will in general be rather
short lists, around 10 items per key).

Naturally I was thinking of groupByKey() but was a bit intimidated by the
warning: "This operation may be very expensive. If you are grouping in
order to perform an aggregation (such as a sum or average) over each key,
using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will
provide much better performance."
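
For reference, the kind of aggregation the docs have in mind (say, a per-key
average) might look roughly like this with aggregateByKey; the numeric field
`value` on MyData is hypothetical:

  // Sketch: per-key average without groupByKey.
  val avgPerKey = rdd.aggregateByKey((0.0, 0L))(
    (acc, d) => (acc._1 + d.value, acc._2 + 1),   // fold one MyData into (sum, count)
    (a, b)   => (a._1 + b._1, a._2 + b._2)        // merge per-partition accumulators
  ).mapValues { case (sum, count) => sum / count }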

Now I don't know (yet) if all of the functions I want to compute can be
expressed in this way and I was wondering about *how much* more expensive
we are talking about. Say I have something like

  rdd.zipWithIndex.map(kv => (kv._2/10, kv._1)).groupByKey(),

i.e., items that will be grouped together will in 99% of cases live in the
same partition (do they?). Does this change the performance?

Also, if my operations depend on the order in the original RDD (say, string
concatenation), how could I make sure the order of the original RDD is
respected?
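
(One approach I can think of, as a sketch: tag each element with its original
position via zipWithIndex, carry that index through the grouping, and sort on
it before concatenating. Not sure whether that is the idiomatic way.)

  // Sketch: keep the original RDD order within each group by sorting on the
  // zipWithIndex position after grouping.
  val ordered = rdd.zipWithIndex
    .map { case ((key, data), idx) => (key, (idx, data)) }
    .groupByKey()
    .mapValues(_.toSeq.sortBy(_._1).map(_._2))   // values in original order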

Thanks
Tobias