Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Silvio Fiorito
The initializer is a tuple, (0, 0); it looks like you just have 0.
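A minimal PySpark sketch of that fix (variable names here are just illustrative; rdd1 is the (date, value) pair RDD shown further down). With a (sum, count) tuple as the zero value, a[0] and a[1] in the lambdas are valid, which is exactly what the TypeError below is complaining about:

  sums_and_counts = rdd1.aggregateByKey(
      (0.0, 0),                                  # zero value: (runningSum, runningCount)
      lambda a, b: (a[0] + b, a[1] + 1),         # within a partition: fold the next value into (sum, count)
      lambda a, b: (a[0] + b[0], a[1] + b[1]))   # across partitions: merge two (sum, count) tuples
  avg_by_key = sums_and_counts.mapValues(lambda sc: sc[0] / sc[1])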

From: "subscripti...@prismalytics.io<mailto:subscripti...@prismalytics.io>"
Organization: PRISMALYTICS, LLC.
Reply-To: "subscripti...@prismalytics.io<mailto:subscripti...@prismalytics.io>"
Date: Tuesday, April 28, 2015 at 1:28 PM
To: Silvio Fiorito, Todd Nist
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Thank you Todd, Silvio...

I had to stare at Silvio's answer for a while.

If I'm interpreting the aggregateByKey() statement correctly ...
   (Within-Partition Reduction Step)
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

  (Cross-Partition Reduction Step)
  a: is a TUPLE that holds: (runningSum, runningCount).
  b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Under that interpretation, I tried to write & run the Python equivalent, like so:

  rdd1.aggregateByKey(0,
      lambda a, b: (a[0] + b, a[1] + 1),
      lambda a, b: (a[0] + b[0], a[1] + b[1]))

Sadly, it didn't work, yielding the following exception which indicates that 
the indexing above is incorrect:
lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
File "", line 1, in 
TypeError: 'int' object has no attribute '__getitem__'

Sidenote: Surprisingly, there isn't much documentation -- at least not for 
Python -- for this useful aggregateByKey()
method and use case; although I will be sure to write a gist today, once I get 
this working. :)

I think I'm nearly there though, so...
(1) Is my written interpretation above of what (a, b) are correct?
(2) If yes, what, then, is getting passed in the Python case?

I guess I'm looking for the Python equivalent to the first statement in 
Silvio's answer (below). But my
reasoning to deconstruct and reconstruct is missing something.

Thanks again!




On 04/28/2015 11:26 AM, Silvio Fiorito wrote:
If you need to keep the keys, you can use aggregateByKey to calculate an avg of 
the values:

val step1 = data.aggregateByKey((0.0, 0))(
  (a, b) => (a._1 + b, a._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and count, 
then summing each pair of values and counting the number of values. The last 
argument is to combine the results of each partition, if the data was spread 
across partitions. The result is a tuple of sum and count for each key.

Use mapValues to keep your partitioning by keys intact and minimize a full 
shuffle for downstream keyed operations. It just calculates the avg for each 
key.
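For a concrete illustration, taking just the two u'2013-10-10' pairs visible in the sample below and assuming they land in different partitions: the first function folds each into (9.322709163346612, 1) and (28.264462809917358, 1); the second function merges those into (37.58717197326397, 2); mapValues then yields 37.58717197326397 / 2 ≈ 18.79 for that key.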

From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: "subscripti...@prismalytics.io<mailto:subscripti...@prismalytics.io>"
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Can you simply apply the
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd
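In PySpark the rough equivalent would be rdd1.values().stats(), which returns a single StatCounter over all values rather than one per key. A hedged sketch of keeping the keys by folding a StatCounter per key through aggregateByKey -- assuming PySpark's pyspark.statcounter.StatCounter, whose merge() and mergeStats() update the counter and return it (an API assumption, not something stated in this thread):

  from pyspark.statcounter import StatCounter

  # one StatCounter per key: merge() folds in a value, mergeStats() merges two counters
  stats_by_key = rdd1.aggregateByKey(StatCounter(),
                                     lambda s, v: s.merge(v),
                                     lambda s1, s2: s1.mergeStats(s2))
  avg_by_key = stats_by_key.mapValues(lambda s: s.mean())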

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io wrote:
Hello Friends:

I generated a Pair RDD with K/V pairs, like so:

>>>
>>> rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for 
each KEY.
I can do so as shown here, which does work:

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())
>>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e., the SUM).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e., the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles this common use case.

Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread subscripti...@prismalytics.io

Thank you Todd, Silvio...

I had to stare at Silvio's answer for a while.

If I'm interpreting the aggregateByKey() statement correctly ...
   (Within-Partition Reduction Step)
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

  (Cross-Partition Reduction Step)
  a: is a TUPLE that holds: (runningSum, runningCount).
  b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

Under that interpretation, I tried to write & run the Python equivalent, like so:

  rdd1.aggregateByKey(0,
      lambda a, b: (a[0] + b, a[1] + 1),
      lambda a, b: (a[0] + b[0], a[1] + b[1]))


Sadly, it didn't work, yielding the following exception which indicates 
that the indexing above is incorrect:

lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
File "", line 1, in 
TypeError: 'int' object has no attribute '__getitem__'

Sidenote: Surprisingly, there isn't much documentation -- at least not for Python -- for this useful aggregateByKey() method and use case; although I will be sure to write a gist today, once I get this working. :)

I think I'm nearly there though, so...
(1) Is my written interpretation above of what (a, b) are correct?
(2) If yes, what, then, is getting passed in the Python case?

I guess I'm looking for the Python equivalent to the first statement in 
Silvio's answer (below). But my

reasoning to deconstruct and reconstruct is missing something.

Thanks again!




On 04/28/2015 11:26 AM, Silvio Fiorito wrote:
If you need to keep the keys, you can use aggregateByKey to calculate 
an avg of the values:


val step1 = data.aggregateByKey((0.0, 0))(
  (a, b) => (a._1 + b, a._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))

val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and 
count, then summing each pair of values and counting the number of 
values. The last argument is to combine the results of each partition, 
if the data was spread across partitions. The result is a tuple of sum 
and count for each key.


Use mapValues to keep your partitioning by keys intact and minimize a 
full shuffle for downstream keyed operations. It just calculates the 
avg for each key.


From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: "subscripti...@prismalytics.io <mailto:subscripti...@prismalytics.io>"
Cc: "user@spark.apache.org <mailto:user@spark.apache.org>"
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) 
RDD ...


Can you simply apply the 
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter 
to this?  You should be able to do something like this:


val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io wrote:


Hello Friends:

I generated a Pair RDD with K/V pairs, like so:

>>>
>>> rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of
the VALUES for each KEY.
I can do so as shown here, which does work:
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())
>>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e., the SUM).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e., the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv




--
Sincerely yours,
Team PRISMALYTICS

PRISMALYTICS, LLC. | www.prismalytics.com
P: 212.882.1276 | subscripti...@prismalytics.io
Follow Us: https://www.LinkedIn.com/company/prismalytics




Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Silvio Fiorito
If you need to keep the keys, you can use aggregateByKey to calculate an avg of 
the values:

val step1 = data.aggregateByKey((0.0, 0))(
  (a, b) => (a._1 + b, a._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val avgByKey = step1.mapValues(i => i._1 / i._2)

Essentially, what this is doing is passing an initializer for sum and count, 
then summing each pair of values and counting the number of values. The last 
argument is to combine the results of each partition, if the data was spread 
across partitions. The result is a tuple of sum and count for each key.

Use mapValues to keep your partitioning by keys intact and minimize a full 
shuffle for downstream keyed operations. It just calculates the avg for each 
key.

From: Todd Nist
Date: Tuesday, April 28, 2015 at 10:20 AM
To: "subscripti...@prismalytics.io<mailto:subscripti...@prismalytics.io>"
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

Can you simply apply the
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io wrote:
Hello Friends:

I generated a Pair RDD with K/V pairs, like so:

>>>
>>> rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the VALUES for 
each KEY.
I can do so as shown here, which does work:

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())
>>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e., the SUM).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e., the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv



Re: Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread Todd Nist
Can you simply apply the
https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.util.StatCounter
to this?  You should be able to do something like this:

val stats = RDD.map(x => x._2).stats()

-Todd

On Tue, Apr 28, 2015 at 10:00 AM, subscripti...@prismalytics.io wrote:

>  Hello Friends:
>
> I generated a Pair RDD with K/V pairs, like so:
>
> >>>
> >>> rdd1.take(10) # Show a small sample.
>  [(u'2013-10-09', 7.60117302052786),
>  (u'2013-10-10', 9.322709163346612),
>  (u'2013-10-10', 28.264462809917358),
>  (u'2013-10-07', 9.664429530201343),
>  (u'2013-10-07', 12.461538461538463),
>  (u'2013-10-09', 20.76923076923077),
>  (u'2013-10-08', 11.842105263157894),
>  (u'2013-10-13', 32.32514177693762),
>  (u'2013-10-13', 26.246),
>  (u'2013-10-13', 10.693069306930692)]
>
> Now from the above RDD, I would like to calculate an average of the VALUES
> for each KEY.
> I can do so as shown here, which does work:
>
> >>> import operator
> >>> countsByKey = sc.broadcast(rdd1.countByKey())
> >>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
> >>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e., the SUM).
> >>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e., the COUNT).
> >>> print(rdd1.collect())
>   [(u'2013-10-09', 11.235365503035176),
>    (u'2013-10-07', 23.39500642456595),
>    ... snip ...
>   ]
>
> But I wonder if the above semantics/approach is the optimal one; or
> whether perhaps there is a single API call that handles this common use case.
>
> Improvement thoughts welcome. =:)
>
> Thank you,
> nmv
>


Calculating the averages for each KEY in a Pairwise (K,V) RDD ...

2015-04-28 Thread subscripti...@prismalytics.io

Hello Friends:

I generated a Pair RDD with K/V pairs, like so:

>>>
>>> rdd1.take(10) # Show a small sample.
 [(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.246),
 (u'2013-10-13', 10.693069306930692)]

Now from the above RDD, I would like to calculate an average of the 
VALUES for each KEY.

I can do so as shown here, which does work:
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())
>>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerator (i.e., the SUM).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e., the COUNT).

>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

But I wonder if the above semantics/approach is the optimal one; or whether perhaps there is a single API call that handles this common use case.

Improvement thoughts welcome. =:)

Thank you,
nmv
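For comparison, one common single-pass alternative that avoids the separate countByKey() plus broadcast -- only a sketch, and not necessarily the "single API call" being asked about -- builds a (sum, count) pair per key with one reduceByKey and then divides:

  # build a (sum, count) pair per key in a single shuffle, then divide
  sum_count = rdd1.mapValues(lambda v: (v, 1)) \
                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
  avg_by_key = sum_count.mapValues(lambda sc: sc[0] / sc[1])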