Hmm, I'm a little confused here. What ordering exactly do you expect? It
seems that you want all the elements in the RDD to be sorted first by
timestamp and then by user_id. If that's the case, then you can simply do this:
rawData.map { case (user, time, amount) =>
  (time, user) -> amount
}.sortByKey().aggregate(…)
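(The reason a composite key sorts the way you want: the implicit Ordering for Tuple2 compares lexicographically, first by timestamp and then by user_id. A local sketch with made-up values, using the same ordering sortByKey would use on an RDD[((Long, Long), Float)]:)

```scala
// Tuple2 keys order lexicographically: by timestamp first, then user_id.
// This is the same implicit Ordering that sortByKey picks up.
val records = Seq(
  ((20L, 2L), 5.0f),
  ((10L, 9L), 1.0f),
  ((10L, 3L), 2.0f))
val sorted = records.sortBy(_._1)
// keys in order: (10,3), (10,9), (20,2)
```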
On 10/17/14 10:44 PM, Michael Misiewicz wrote:
Thank you for sharing this Cheng! This is fantastic. I was able to
implement it and it seems like it's working quite well. I'm definitely
on the right track now!
I'm still having a small problem with the rows inside each partition
being out of order - but I suspect this is because in the current code
I sortByKey and then use RangePartitioner (which I think does not
maintain row order within each partition, due to the shuffle in
RangePartitioner). I suspect I can work around this by doing the
operations in this order:
- RangePartitioner
- mapValues to sort each partition in memory, maintaining partitioning
- aggregate
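For step 2, mapPartitions (rather than mapValues) would be needed, since whole key-value pairs have to be reordered and mapValues only sees one value at a time; passing preservesPartitioning = true keeps the partitioner. A self-contained sketch of the per-partition sort, assuming the (timestamp, (user_id, amount)) record type from the sample code below (the RDD call is shown as a comment):

```scala
// In Spark this would be:
//   val sortedWithin = partitionedByTimestamp.mapPartitions(
//     sortPartition, preservesPartitioning = true)
// The per-partition sort itself is plain Scala:
def sortPartition(
    iter: Iterator[(Long, (Long, Float))]): Iterator[(Long, (Long, Float))] =
  iter.toArray.sortBy(_._1).iterator

// exercise it locally on one mock partition
val partition = Iterator((30L, (1L, 5f)), (10L, (2L, 1f)), (20L, (1L, 2f)))
val sorted = sortPartition(partition).toList
// keys come back in order: 10, 20, 30
```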
Michael
On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
RDD.aggregate doesn’t require the RDD elements to be pairs, so you
don’t need to make user_id the key of the RDD. For example, you can
use an empty Map as the zero value of the aggregation: the key of the
Map is the user_id extracted from each tuple, and the value is the
aggregated amount.
keyByTimestamp.aggregate(Map.empty[Long, Float])({ (agg, rec) =>
  // fold one (timestamp, (user_id, amount)) record into the map
  val (time, (user, amount)) = rec
  agg.updated(user, agg.getOrElse(user, 0.0f) + amount)
}, { (lhs, rhs) =>
  // merge the per-partition maps
  lhs.foldLeft(rhs) { case (combined, (user, amount)) =>
    combined.updated(user, combined.getOrElse(user, 0.0f) + amount)
  }
})
Of course, you may use a mutable Map for better performance. One
thing to note: foldByKey is a transformation, while aggregate is
an action. The final result of the code above is a single Map
object rather than an RDD. If this map can be very large (say you
have billions of users), then aggregate may OOM.
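To make the mutable-Map variant concrete, here is a local sketch of the same seqOp/combOp pair that aggregate would call, exercised without Spark (the sample records and names are illustrative; user_id is a Long as in the sample code):

```scala
import scala.collection.mutable

type Acc = mutable.Map[Long, Float]
// zero value: empty map that answers 0.0f for unseen users
def zero: Acc = mutable.Map.empty[Long, Float].withDefaultValue(0.0f)

// seqOp: fold one (timestamp, (user_id, amount)) record into the map
def seqOp(agg: Acc, rec: (Long, (Long, Float))): Acc = {
  val (_, (user, amount)) = rec
  agg(user) += amount // relies on the 0.0f default for new users
  agg
}

// combOp: merge two per-partition maps in place
def combOp(lhs: Acc, rhs: Acc): Acc = {
  rhs.foreach { case (user, amount) => lhs(user) += amount }
  lhs
}

val records = Seq((10L, (1L, 100f)), (20L, (2L, 50f)), (30L, (1L, 25f)))
val result = records.foldLeft(zero)(seqOp)
// result: user 1 -> 125.0, user 2 -> 50.0
```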
On 10/17/14 12:01 AM, Michael Misiewicz wrote:
Thanks for the suggestion! That does look really helpful, I see
what you mean about it being more general than fold. I think I
will replace my fold with aggregate - it should give me more
control over the process.
I think the problem will still exist, though: I can't get the
partitioning I need. When I change my key to user_id, I lose the
timestamp partitioning. My problem is that I'm trying to retain a
parent RDD's partitioning in an RDD that no longer has the same keys
as its parent.
On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
Hi Michael,
I'm not sure I fully understood your question, but I think
RDD.aggregate can be helpful in your case. You can see it as
a more general version of fold.
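A local illustration of "more general than fold" (the helper and sample values here are made up, not Spark API): fold requires the result type to equal the element type, while aggregate takes a zero value of any type plus separate per-partition and merge operators, so you can, say, compute a count and a sum of Ints in one pass:

```scala
// Mock of RDD.aggregate's semantics over locally "partitioned" data:
// fold each chunk with seqOp, then merge chunk results with combOp.
def localAggregate[T, U](chunks: Seq[Seq[T]])(zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U =
  chunks.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

// elements are Int, but the result is a (count: Int, sum: Long) pair
val chunks = Seq(Seq(1, 2, 3), Seq(4, 5))
val (count, sum) = localAggregate(chunks)((0, 0L))(
  { case ((c, s), x) => (c + 1, s + x) },
  { case ((c1, s1), (c2, s2)) => (c1 + c2, s1 + s2) })
// count == 5, sum == 15
```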
Cheng
On 10/16/14 11:15 PM, Michael Misiewicz wrote:
Hi,
I'm working on a problem where I'd like to sum items in an
RDD /in order/ (approximately). I am currently trying to
implement this using a fold, but I'm having some issues
because the sorting key of my data is not the same as the
folding key for my data. I have data that looks like this:
user_id, transaction_timestamp, transaction_amount
And I'm interested in doing a foldByKey on user_id to sum
transaction amounts - taking care to note approximately when
a user surpasses a total transaction threshold. I'm using
RangePartitioner to make sure that data is ordered
sequentially between partitions, and I'd also make sure that
data is sorted within partitions, though I'm not sure how to
do this exactly (I was going to look at the code for
sortByKey to figure this out - I believe sorting in place in
a mapPartitions should work). What do you think about the
approach? Here's some sample code that demonstrates what I'm
thinking:
def myFold(v1: Float, v2: Float): Float = {
  val partialSum = v1 + v2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  partialSum
}
val rawData = sc.textFile("hdfs://path/to/data").map { x =>
  // parse each line into (user_id: Long, transaction_timestamp: Long,
  // transaction_amount: Float)
  val l = x.split(",")
  (l(0).toLong, l(1).toLong, l(2).toFloat)
}
// rearrange to make timestamp the key (for sorting); this gives a PairRDD
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500,
    rdd = sortedByTimestamp)).persist()
// By this point, the RDD should be sorted and partitioned according to the
// timestamp. However, I now need to make user_id the key, because the output
// must be per user. Since I change the keys of the PairRDD, I understand that
// I lose the partitioning; the consequence is that I can no longer be sure in
// my fold function that the ordering is retained.
val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(zeroValue = 0.0f)(myFold)
finalResult.saveAsTextFile("hdfs://...")
The problem, as you'd expect, takes place in the folding
function, after I've re-arranged my RDD to no longer be
keyed by timestamp (when I produce keyByUser, I lose the
correct partitioning). As I've read in the
documentation, partitioning is not preserved when keys are
changed (which makes sense).
Reading this thread:
https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4
it appears that one possible solution might be to subclass
RDD (à la MappedValuesRDD) to define my own RDD that retains
the partitions of its parent. This seems simple enough, but
I've never done anything like that before and I'm not sure
where to start. I'm also willing to write my own custom
partitioner class, but it appears that the
getPartition method only accepts a "key" argument - and since
the value I need to partition on in the final step (the
timestamp) would be in the value, my partitioner class
doesn't have the data it needs to make the right decision. I
cannot have the timestamp in my key.
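For what it's worth, getPartition can still see the timestamp if the timestamp is folded into a composite key, which is the direction Cheng's composite-key suggestion at the top of this thread takes. A self-contained sketch using a stand-in trait (names are illustrative; a real implementation would extend org.apache.spark.Partitioner):

```scala
// Stand-in for org.apache.spark.Partitioner, just so this compiles alone.
trait SimplePartitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Partitions composite (timestamp, user_id) keys by timestamp range only,
// so rows stay time-ordered across partitions while user_id rides along.
class TimestampRangePartitioner(val numPartitions: Int, maxTs: Long)
    extends SimplePartitioner {
  def getPartition(key: Any): Int = key match {
    case (ts: Long, _) =>
      math.min((ts * numPartitions / (maxTs + 1)).toInt, numPartitions - 1)
    case _ => 0
  }
}

val p = new TimestampRangePartitioner(4, maxTs = 99L)
// timestamps 0-24 land in partition 0, ..., 75-99 in partition 3
```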
Alternatively, has anyone else encountered a problem like
this (i.e. an approximately ordered sum), and did they find a
good solution? Does my approach of subclassing RDD make
sense? Would there be some way to finagle a custom
partitioner into making this work? Perhaps this might be a
job for some other tool, like Spark Streaming?
Thanks,
Michael