If you need to keep the keys, you can use aggregateByKey to calculate the 
average of the values for each key:

val step1 = data.aggregateByKey((0.0, 0))(
  (a, b) => (a._1 + b, a._2 + 1),          // add each value to the running (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))    // merge partial (sum, count) pairs across partitions
val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer of (0.0, 0) for the 
running sum and count. The first function adds each value to the running sum 
and increments the count within a partition; the second function combines the 
partial (sum, count) results from different partitions, if the data is spread 
across partitions. The result is a (sum, count) tuple for each key.
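
For example, with a couple of the rows from your sample below (an untested 
sketch, just to show the shape of the intermediate result; "data" stands for 
your pair RDD):

    val data = sc.parallelize(Seq(
      ("2013-10-10", 9.322709163346612),
      ("2013-10-10", 28.264462809917358),
      ("2013-10-09", 7.60117302052786)))

    // step1 then holds a (sum, count) pair per key, roughly
    // ("2013-10-10", (37.5872, 2)) and ("2013-10-09", (7.6012, 1)).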

Using mapValues (rather than map) keeps your partitioning by key intact, so 
downstream keyed operations can avoid a full shuffle. It simply divides the 
sum by the count to get the average for each key.
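
If you also want the count, stddev, etc. per key, you could combine the same 
aggregateByKey pattern with the StatCounter Todd mentions below. Something 
along these lines should work (untested sketch; statsByKey is just an 
illustrative name):

    import org.apache.spark.util.StatCounter

    val statsByKey = data.aggregateByKey(new StatCounter())(
      (acc, v) => acc.merge(v),          // fold each value into the per-key StatCounter
      (acc1, acc2) => acc1.merge(acc2))  // combine partial StatCounters from different partitions
    val avgByKey2 = statsByKey.mapValues(_.mean)  // or .stdev, .count, etc.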

From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: "subscripti...@prismalytics.io"
Cc: "user@spark.apache.org"
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Can you simply apply the
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()
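
That returns a single StatCounter over all of the values, from which you can 
read off whatever you need, e.g.:

    println(stats.mean)
    println(stats.stdev)
    println(stats.count)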

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io wrote:
Hello Friends:

I generated a Pair RDD with K/V pairs, like so:

>>> rdd1.take(10)  # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.249999999999996),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for 
each KEY.
I can do so as shown here, which does work:

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())  # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e. the SUM).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e. the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one, or whether 
perhaps there is a single API call that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv
