Hi Michael,

I'm not sure I fully understood your question, but I think RDD.aggregate can be helpful in your case. You can see it as a more general version of fold.
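For example, here is a minimal sketch with made-up numbers: seqOp folds elements into an accumulator within each partition, and combOp merges the per-partition accumulators, so the accumulator type can differ from the element type, which plain fold does not allow.

val amounts = sc.parallelize(Seq(100.0f, 250.0f, 300.0f, 75.0f))

// accumulator: (running sum, element count)
val (total, count) = amounts.aggregate((0.0f, 0))(
  (acc, amount) => (acc._1 + amount, acc._2 + 1), // seqOp: fold one element into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)            // combOp: merge two per-partition accumulators
)

If you need the result per user, aggregateByKey on a pair RDD takes the same two functions.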

Cheng


On 10/16/14 11:15 PM, Michael Misiewicz wrote:
Hi,

I'm working on a problem where I'd like to sum items in an RDD in order (approximately). I am currently trying to implement this using a fold, but I'm having some issues because the sorting key of my data is not the same as the folding key for my data. I have data that looks like this:

user_id, transaction_timestamp, transaction_amount

And I'm interested in doing a foldByKey on user_id to sum transaction amounts, taking care to note approximately when a user surpasses a total transaction threshold. I'm using RangePartitioner to make sure that data is ordered sequentially between partitions, and I'd also like to make sure that data is sorted within partitions, though I'm not sure how to do this exactly (I was going to look at the code for sortByKey to figure this out; I believe sorting in place in a mapPartitions should work, see the sketch after the code below). What do you think of this approach? Here's some sample code that demonstrates what I'm thinking:

def myFold(v1: Float, v2: Float): Float = {
  val partialSum = v1 + v2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  partialSum
}

val rawData = sc.textFile("hdfs://path/to/data").map { x => // load data
  val l = x.split(",") // fields are comma-separated
  (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
}
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3))) // rearrange to make timestamp the key (for sorting), convert to a pair RDD
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
// By this point, the RDD should be sorted and partitioned according to the timestamp.
// However, I now need to make user_id the key, because the output must be per user.
// Since I change the keys of the pair RDD, I understand that I lose the partitioning;
// the consequence is that I can no longer be sure in my fold function that the ordering is retained.

val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(zeroValue = 0f)(myFold)
finalResult.saveAsTextFile("hdfs://...")
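
For the within-partition sort mentioned above, this is roughly what I had in mind, though it's untested and assumes each partition fits in memory:

val sortedWithinPartitions = partitionedByTimestamp.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator, // in-memory sort of each partition by timestamp
  preservesPartitioning = true
)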

The problem, as you'd expect, takes place in the folding function, after I've re-arranged my RDD so it is no longer keyed by timestamp (when I produce keyByUser, I lose the correct partitioning). As I've read in the documentation, partitioning is not preserved when keys are changed (which makes sense).
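
For instance, if I check the partitioners in the shell, I expect something like this:

partitionedByTimestamp.partitioner // Some(RangePartitioner), set by partitionBy above
keyByUser.partitioner              // None, because map() drops the partitioner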

Reading this thread: https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears that one possible solution might be to subclass RDD (à la MappedValuesRDD) to define my own RDD that retains the partitions of its parent. This seems simple enough, but I've never done anything like that before and I'm not sure where to start. I'm also willing to write my own custom partitioner class, but it appears that the getPartition method only accepts a "key" argument - and since the value I need to partition on in the final step (the timestamp) would be in the value, my partitioner class doesn't have the data it needs to make the right decision. I cannot have the timestamp in my key.
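
From skimming MappedValuesRDD, I imagine the subclass would look roughly like this (completely untested, and the class name is made up):

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Apply a function to each partition while claiming the parent's partitioner.
class PartitionerPreservingRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    f: Iterator[T] => Iterator[U])
  extends RDD[U](prev) {

  // keep the parent's partitioner even though the element (and key) type may change
  override val partitioner: Option[Partitioner] = prev.partitioner

  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(prev.iterator(split, context))
}

Does that look like the right direction?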

Alternatively, has anyone else encountered a problem like this (i.e. an approximately ordered sum), and if so, did you find a good solution? Does my approach of subclassing RDD make sense? Would there be some way to finagle a custom partitioner into making this work? Perhaps this might be a job for some other tool, like Spark Streaming?

Thanks,
Michael
