Hi Michael,
I'm not sure I fully understood your question, but I think RDD.aggregate
can be helpful in your case. You can see it as a more general version of
fold.
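For example, here is a quick generic sketch (not tied to your data) of summing an RDD[Float] while also counting its elements - with aggregate, the accumulator type can differ from the element type, which plain fold doesn't allow:

val nums = sc.parallelize(Seq(1.0f, 2.0f, 3.0f))  // RDD[Float]
val (sum, count) = nums.aggregate((0.0f, 0L))(
  (acc, x) => (acc._1 + x, acc._2 + 1),      // seqOp: fold one element into a partition-local accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))      // combOp: merge accumulators across partitions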
Cheng
On 10/16/14 11:15 PM, Michael Misiewicz wrote:
Hi,
I'm working on a problem where I'd like to sum items in an RDD in order (approximately). I am currently trying to implement this
using a fold, but I'm having some issues because the sorting key of my
data is not the same as the folding key for my data. I have data that
looks like this:
user_id, transaction_timestamp, transaction_amount
And I'm interested in doing a foldByKey on user_id to sum transaction amounts - taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also like to make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out - I believe sorting in place in a mapPartitions should work; there's a rough sketch of that idea after the sample code). What do you think about the approach? Here's some sample code that demonstrates what I'm thinking:
def myFold(v1: Float, v2: Float): Float = {
  val partialSum = v1 + v2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  partialSum
}
val rawData = sc.textFile("hdfs://path/to/data").map { x => // load data
  val l = x.split(",")
  // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
  (l(0).toLong, l(1).toLong, l(2).toFloat)
}
// rearrange to make timestamp the key (for sorting), convert to PairRDD
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
// By this point, the RDD should be sorted and partitioned according to the timestamp.
// However, I now need to make user_id the key, because the output must be per user.
// At this point, since I change the keys of the PairRDD, I understand that I lose the
// partitioning; the consequence of this is that I can no longer be sure in my fold
// function that the ordering is retained.
val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(0f)(myFold)
finalResult.saveAsTextFile("hdfs://...")
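As an aside, here is roughly what I had in mind above for the within-partition sort via mapPartitions (untested, and assuming the timestamp is still the key at that point):

val sortedWithinPartitions = partitionedByTimestamp.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator,  // sort each partition's rows by timestamp, no shuffle
  preservesPartitioning = true)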
The problem, as you'd expect, takes place in the folding function, after I've rearranged my RDD to no longer be keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense).
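For example, as I understand it, map (which can change keys) drops the partitioner, while mapValues keeps it - which unfortunately doesn't help me, since I do need to change the key:

partitionedByTimestamp.map(identity).partitioner        // None - partitioner is dropped
partitionedByTimestamp.mapValues(identity).partitioner  // Some(...) - the RangePartitioner is kept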
Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDD that retains the partitions of its parent. This seems simple enough, but I've never done anything like that before and I'm not sure where to start.
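Something like this is the shape I imagine it would take (a very rough, untested sketch of a map that keeps the parent's partitioner; the class name is mine):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class PartitionPreservingMappedRDD[T, U: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {
  override val partitioner = prev.partitioner  // keep the parent's partitioning
  override def getPartitions: Array[Partition] = prev.partitions
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split, context).map(f)
}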
I'm also willing to write my own custom partitioner class, but it appears that the getPartition method only accepts a "key" argument - and since the value I need to partition on in the final step (the timestamp) would be in the value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have the timestamp in my key.
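For concreteness, here's the skeleton I mean (hypothetical, and exactly where I get stuck - only the key ever reaches getPartition):

import org.apache.spark.Partitioner

class UserIdPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val userId = key.asInstanceOf[Long]
    // only user_id is visible here; the timestamp is in the value and out of reach
    (userId % numPartitions).toInt  // assumes non-negative user ids
  }
}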
Alternatively, has anyone else encountered a problem like this (i.e. an approximately ordered sum), and did they find a good solution? Does my approach of subclassing RDD make sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like Spark Streaming?
Thanks,
Michael