RDD.aggregate doesn't require the RDD elements to be pairs, so you don't
need to make user_id the key of the RDD. For example, you can use an
empty Map as the zero value of the aggregation. The keys of the Map are
the user_ids you extract from each tuple, and the values are the
aggregated amounts.
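```scala
// keyByTimestamp: RDD[(Long, (Long, Float))], i.e. (timestamp, (user_id, amount))
val totals: Map[Long, Float] =
  keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0f))(
    // seqOp: fold one record into the partition's running map
    { (agg, rec) =>
      val (_, (user, amount)) = rec
      agg.updated(user, agg(user) + amount)
    },
    // combOp: merge two partial maps, summing amounts for users in both
    { (lhs, rhs) =>
      lhs.keys.foldLeft(rhs) { (combined, user) =>
        combined.updated(user, lhs(user) + combined.getOrElse(user, 0f))
      }
    })
```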
Of course, you could use a mutable Map instead for better performance.
One thing to note: foldByKey is a transformation, while aggregate is an
action. The final result of the code above is a single Map object on the
driver rather than an RDD. If this map can be very large (say you have
billions of users), aggregate may run out of memory (OOM).
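As a minimal sketch of the mutable-Map variant, the snippet below keeps the same seqOp/combOp shape but updates a `scala.collection.mutable.HashMap` in place. The Spark documentation for aggregate allows both functions to modify and return their first argument. To stay runnable without Spark, it works on plain Scala collections: the hypothetical helper `localAggregate` stands in for RDD.aggregate by folding each simulated partition from a fresh zero value and then merging the partial results. The sample records are made up.

```scala
import scala.collection.mutable

// Records shaped like keyByTimestamp: (timestamp, (userId, amount)).
type Rec = (Long, (Long, Float))

// seqOp: add one record's amount to its user's running total, in place.
def seqOp(agg: mutable.HashMap[Long, Float], rec: Rec): mutable.HashMap[Long, Float] = {
  val (_, (user, amount)) = rec
  agg(user) = agg.getOrElse(user, 0f) + amount
  agg
}

// combOp: merge rhs into lhs in place and return lhs.
def combOp(lhs: mutable.HashMap[Long, Float],
           rhs: mutable.HashMap[Long, Float]): mutable.HashMap[Long, Float] = {
  for ((user, amount) <- rhs) lhs(user) = lhs.getOrElse(user, 0f) + amount
  lhs
}

// Hypothetical local stand-in for RDD.aggregate: each "partition" starts from a
// fresh zero value, and the partial results are merged into another fresh one.
def localAggregate[T, U](partitions: Seq[Seq[T]])(zero: () => U)(
    seq: (U, T) => U, comb: (U, U) => U): U =
  partitions.map(_.foldLeft(zero())(seq)).foldLeft(zero())(comb)

val partitions: Seq[Seq[Rec]] = Seq(
  Seq((1L, (7L, 100f)), (2L, (8L, 50f))),
  Seq((3L, (7L, 450f)), (4L, (9L, 20f)))
)

val totals = localAggregate(partitions)(() => mutable.HashMap.empty[Long, Float])(seqOp, combOp)
// totals: 7 -> 550.0, 8 -> 50.0, 9 -> 20.0
```

A fresh zero per partition matters here: a shared mutable zero would accumulate across partitions. In Spark, each task works on its own deserialized copy of the zero value.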
On 10/17/14 12:01 AM, Michael Misiewicz wrote:
Thanks for the suggestion! That does look really helpful; I see what
you mean about it being more general than fold. I think I will replace
my fold with aggregate, since it should give me more control over the process.
I think the problem will still exist, though: I can't
get the partitioning I need. When I change my key to user_id,
I lose the timestamp partitioning. My problem is that I'm trying to
retain a parent RDD's partitioning in an RDD that no longer has the
same keys as its parent.
On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
Hi Michael,
I'm not sure I fully understood your question, but I think
RDD.aggregate can be helpful in your case. You can see it as a
more general version of fold.
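To illustrate what "more general than fold" means: fold needs the zero value, the elements and the result to share one type, while aggregate lets the result type differ from the element type at the cost of a second function for merging partial results. The sketch below models both on local Scala collections. `localFold` and `localAggregate` are hypothetical stand-ins for RDD.fold and RDD.aggregate, and the amounts are made up.

```scala
// Hypothetical local stand-ins; each inner Seq plays the role of one partition.
// The zero value is applied once per partition and again when merging,
// so it should be a neutral element (0 for a sum).
def localFold[T](partitions: Seq[Seq[T]])(zero: T)(op: (T, T) => T): T =
  partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

def localAggregate[T, U](partitions: Seq[Seq[T]])(zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U =
  partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

val amounts: Seq[Seq[Float]] = Seq(Seq(100f, 50f), Seq(450f, 20f))

// fold: the result must be a Float, like the elements.
val total = localFold(amounts)(0f)(_ + _)

// aggregate: the result is a (sum, count) pair, a different type from the elements.
val (sum, count) = localAggregate(amounts)((0f, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
```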
Cheng
On 10/16/14 11:15 PM, Michael Misiewicz wrote:
Hi,
I'm working on a problem where I'd like to sum items in an RDD
in order (approximately). I am currently trying to implement
this using a fold, but I'm having some issues because the sorting
key of my data is not the same as the folding key for my data. I
have data that looks like this:
user_id, transaction_timestamp, transaction_amount
And I'm interested in doing a foldByKey on user_id to sum
transaction amounts, taking care to note approximately when a
user surpasses a total transaction threshold. I'm using
RangePartitioner to make sure that data is ordered sequentially
between partitions, and I'd also like to make sure that data is sorted
within partitions, though I'm not sure exactly how to do this (I
was going to look at the code for sortByKey to figure this out;
I believe sorting in place in a mapPartitions should work). What
do you think of this approach? Here's some sample code that
demonstrates what I'm thinking:
```scala
// Assumes spark-shell, where `sc` (the SparkContext) is predefined.
def myFold(v1: Float, v2: Float): Float = {
  val partialSum = v1 + v2
  if (partialSum >= 500) {
    // make a note of it, do things
  }
  partialSum
}

val rawData = sc.textFile("hdfs://path/to/data").map { x => // load data
  val l = x.split(",").map(_.trim) // assumes comma-separated fields
  // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
  (l(0).toLong, l(1).toLong, l(2).toFloat)
}

// Rearrange to make timestamp the key (for sorting), convert to PairRDD.
val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
val sortedByTimestamp = keyByTimestamp.sortByKey()
// Note: partitionBy may reshuffle the data, so the within-partition order
// produced by sortByKey is not guaranteed to survive this step.
val partitionedByTimestamp = sortedByTimestamp.partitionBy(
  new org.apache.spark.RangePartitioner(partitions = 500,
    rdd = sortedByTimestamp)).persist()

// By this point, the RDD should be sorted and partitioned according to the
// timestamp. However, I now need to make user_id the key, because the output
// must be per user. Since I change the keys of the PairRDD, I understand that
// I lose the partitioning; as a consequence, I can no longer be sure in my
// fold function that the ordering is retained.
val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
val finalResult = keyByUser.foldByKey(zeroValue = 0f)(myFold)
finalResult.saveAsTextFile("hdfs://...")
```
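Regarding sorting within partitions: Spark's sortByKey already range-partitions by key and sorts each partition (for example `keyByTimestamp.sortByKey(numPartitions = 500)`), so a separate partitionBy is not needed for that. If the partitions come from somewhere else, the mapPartitions idea can be sketched as below, again on local collections so it runs without Spark. `sortWithinPartition` is a hypothetical helper name, and it assumes a single partition fits in memory.

```scala
type Rec = (Long, (Long, Float)) // (timestamp, (userId, amount))

// Hypothetical helper: the kind of function one could pass to
// rdd.mapPartitions(sortWithinPartition, preservesPartitioning = true).
// It buffers the whole partition, so the partition must fit in memory.
def sortWithinPartition(it: Iterator[Rec]): Iterator[Rec] =
  it.toArray.sortBy(_._1).iterator

// Two made-up partitions that are range-partitioned but unsorted inside.
val partitions: Seq[Seq[Rec]] = Seq(
  Seq((2L, (8L, 50f)), (1L, (7L, 100f))),
  Seq((4L, (9L, 20f)), (3L, (7L, 450f)))
)

val sorted = partitions.map(p => sortWithinPartition(p.iterator).toList)
// partition 0 timestamps: 1, 2; partition 1 timestamps: 3, 4
```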
As you'd expect, the problem occurs in the folding function,
after I've rearranged my RDD so that it is no longer keyed by timestamp
(when I produce keyByUser, I lose the correct partitioning). As
I've read in the documentation, partitioning is not preserved
when keys are changed (which makes sense).
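One way around re-keying, offered here as a swapped-in technique rather than anything proposed in the thread, is to keep the timestamp partitioning, compute per-user partial sums inside each partition, and then walk those partial results in partition order. The sketch runs on local collections; in Spark the first step could be a mapPartitions over the range-partitioned RDD, and collect() returns the per-partition results in partition order. The threshold of 500 comes from myFold, `partialSums` and `crossingPartition` are hypothetical names, and the answer only locates each crossing to within a partition, which matches the "approximately" in the question.

```scala
type Rec = (Long, (Long, Float)) // (timestamp, (userId, amount))

val threshold = 500f // same threshold as myFold

// Step 1 (per partition): per-user partial sums. Partitions are assumed to
// hold increasing timestamp ranges, as after range partitioning.
def partialSums(part: Seq[Rec]): Map[Long, Float] =
  part.foldLeft(Map.empty[Long, Float]) { case (acc, (_, (user, amount))) =>
    acc.updated(user, acc.getOrElse(user, 0f) + amount)
  }

// Step 2 (driver side): scan the partial maps in partition order, keeping
// running totals and recording the first partition index where each user
// reaches the threshold.
def crossingPartition(partials: Seq[Map[Long, Float]]): Map[Long, Int] = {
  val start = (Map.empty[Long, Float], Map.empty[Long, Int])
  val (_, crossed) = partials.zipWithIndex.foldLeft(start) {
    case ((running, found), (partial, idx)) =>
      partial.foldLeft((running, found)) { case ((r, f), (user, amount)) =>
        val total = r.getOrElse(user, 0f) + amount
        val f2 = if (total >= threshold && !f.contains(user)) f.updated(user, idx) else f
        (r.updated(user, total), f2)
      }
  }
  crossed
}

val partitions: Seq[Seq[Rec]] = Seq(
  Seq((1L, (7L, 100f)), (2L, (8L, 50f))),
  Seq((3L, (7L, 450f)), (4L, (9L, 20f))),
  Seq((5L, (8L, 460f)))
)

val crossings = crossingPartition(partitions.map(partialSums))
// user 7 reaches 550 in partition 1, user 8 reaches 510 in partition 2, user 9 never does
```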
Reading this thread:
https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4
it appears that one possible solution might be to subclass RDD (à
la MappedValuesRDD) to define my own RDD that retains the
partitions of its parent. This seems simple enough, but I've
never done anything like that before and I'm not sure where to
start. I'm also willing to write my own custom partitioner class,
but it appears that the getPartition method only accepts a "key"
argument, and since the value I need to partition on in the
final step (the timestamp) would be in the value, my
partitioner class doesn't have the data it needs to make the
right decision. I cannot have the timestamp in my key.
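For reference, a Spark Partitioner exposes `numPartitions` and `getPartition(key: Any): Int`, so the routing decision can use nothing but the key. The sketch below mirrors that shape in plain Scala (it does not extend `org.apache.spark.Partitioner`, so it runs without Spark) with a hypothetical range partitioner over timestamp keys. It uses fixed, made-up upper bounds instead of the sampling RangePartitioner performs on its `rdd` argument. Once the key becomes user_id, a partitioner of this shape has no timestamp to look at, which is the limitation described above.

```scala
// Hypothetical partitioner with the same shape as Spark's Partitioner contract:
// numPartitions plus getPartition(key: Any): Int. Only the key is visible.
class TimestampRangePartitioner(upperBounds: Array[Long]) {
  // Upper bounds are sorted and inclusive; keys above the last bound
  // go to the last partition.
  def numPartitions: Int = upperBounds.length + 1

  def getPartition(key: Any): Int = {
    val ts = key.asInstanceOf[Long]
    val i = java.util.Arrays.binarySearch(upperBounds, ts)
    // binarySearch returns the index if found, else -(insertionPoint) - 1
    if (i >= 0) i else -(i + 1)
  }
}

val p = new TimestampRangePartitioner(Array(100L, 200L))
val placements = Seq(50L, 100L, 150L, 250L).map(p.getPartition)
// placements: 0, 0, 1, 2
```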
Alternatively, has anyone else encountered a problem like this
(i.e. an approximately ordered sum) and found a good
solution? Does my approach of subclassing RDD make sense? Would
there be some way to finagle a custom partitioner into making
this work? Perhaps this might be a job for some other tool, like
Spark Streaming?
Thanks,
Michael