My goal is for rows to be partitioned according to timestamp bins (e.g.
with each partition representing an even interval of time), and then
ordered by timestamp *within* each partition. Ordering by user ID is not
important. In the seqOp of my aggregate function I added a check for this,
and I'm seeing that rows within partitions are not in order - but I think
this should be easy to solve by sorting each partition with
mapPartitions(preservesPartitioning = true) prior to aggregate(), which
should maintain the evenly spaced ranges produced by RangePartitioner.
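
Roughly what I have in mind (just a sketch, reusing the partitionedByTimestamp
RDD from my sample code further down in this thread - the sortBy assumes the
key is still the timestamp):

val sortedWithinPartitions = partitionedByTimestamp.mapPartitions(
  iter => iter.toList.sortBy(_._1).iterator, // sort each partition in memory by timestamp
  preservesPartitioning = true)              // keep the ranges produced by RangePartitioner
// ...then run aggregate() over sortedWithinPartitions as before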

On Fri, Oct 17, 2014 at 11:04 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

>  Hm, a little confused here. What ordering exactly do you expect? It
> seems that you want all the elements in the RDD to be sorted first by
> timestamp and then by user_id. If this is true, then you can simply do this:
>
> rawData.map { case (user, time, amount) => (time, user) -> amount
> }.sortByKey().aggregate(…)
>
> On 10/17/14 10:44 PM, Michael Misiewicz wrote:
>
>   Thank you for sharing this, Cheng! This is fantastic. I was able to
> implement it and it seems to be working quite well. I'm definitely on
> the right track now!
>
>  I'm still having a small problem with the rows inside each partition
> being out of order - but I suspect this is because in my current code I
> sortByKey, then use RangePartitioner (which I think does not maintain row
> order within each partition, due to the shuffle in RangePartitioner). I
> suspect I can work around this by doing operations in this order:
>
>  - RangePartitioner
> - mapPartitions to sort each partition in memory, preserving partitioning
> - aggregate
>
>  Michael
>
> On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian <lian.cs....@gmail.com>
> wrote:
>
>>  RDD.aggregate doesn’t require the RDD elements to be pairs, so you
>> don’t need to make user_id the key of the RDD. For example, you can use an
>> empty Map as the zero value of the aggregation. The key of the Map is the
>> user_id you extracted from each tuple, and the value is the aggregated
>> amount.
>>
>> keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0f))({ (agg, rec) =>
>>   val (time, (user, amount)) = rec
>>   agg.updated(user, agg(user) + amount)
>> }, { (lhs, rhs) =>
>>   lhs.keys.foldLeft(rhs) { (combined, user) =>
>>     combined.updated(user, lhs(user) + rhs(user))
>>   }
>> })
>>
>> Of course, you may use a mutable Map for better performance. One thing to
>> note: foldByKey is a transformation, while aggregate is an action. The
>> final result of the code above is a single Map object rather than an RDD.
>> If this map can be very large (say you have billions of users), then
>> aggregate may OOM.
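>>
>> For example, a mutable version might look roughly like this (just a sketch;
>> aggregate's seqOp and combOp are allowed to modify and return their first
>> argument instead of creating a new accumulator):
>>
>> import scala.collection.mutable
>>
>> keyByTimestamp.aggregate(mutable.Map.empty[Long, Float])({ (agg, rec) =>
>>   val (time, (user, amount)) = rec
>>   agg(user) = agg.getOrElse(user, 0f) + amount // update in place instead of copying the Map
>>   agg
>> }, { (lhs, rhs) =>
>>   rhs.foreach { case (user, amount) =>
>>     lhs(user) = lhs.getOrElse(user, 0f) + amount
>>   }
>>   lhs
>> })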
>>
>> On 10/17/14 12:01 AM, Michael Misiewicz wrote:
>>
>> Thanks for the suggestion! That does look really helpful, I see what you
>> mean about it being more general than fold. I think I will replace my fold
>> with aggregate - it should give me more control over the process.
>>
>> I think the problem will still exist, though: I can't get the partitioning
>> I need. When I change my key to user_id, I lose the timestamp partitioning.
>> My problem is that I'm trying to retain a parent RDD's partitioning in an
>> RDD that no longer has the same keys as its parent.
>>
>> On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com>
>> wrote:
>>
>>>  Hi Michael,
>>>
>>> I'm not sure I fully understood your question, but I think RDD.aggregate
>>> can be helpful in your case. You can see it as a more general version of
>>> fold.
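>>>
>>> For reference, the signature is roughly this (a ClassTag bound on U is
>>> omitted):
>>>
>>> def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
>>>
>>> Unlike fold, the result type U does not have to match the element type T,
>>> so the accumulator can be a Map or any other structure you build up.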
>>>
>>> Cheng
>>>
>>>
>>>
>>> On 10/16/14 11:15 PM, Michael Misiewicz wrote:
>>>
>>> Hi,
>>>
>>>  I'm working on a problem where I'd like to sum items in an RDD *in
>>> order* (approximately). I am currently trying to implement this using a
>>> fold, but I'm having some issues because the sorting key of my data is not
>>> the same as the folding key. I have data that looks like this:
>>>
>>>  user_id, transaction_timestamp, transaction_amount
>>>
>>>  And I'm interested in doing a foldByKey on user_id to sum transaction
>>> amounts - taking care to note approximately when a user surpasses a total
>>> transaction threshold. I'm using RangePartitioner to make sure that data
>>> is ordered sequentially between partitions, and I'd also like to make sure
>>> that data is sorted within partitions, though I'm not sure how to do this
>>> exactly (I was going to look at the code for sortByKey to figure this out -
>>> I believe sorting in place in a mapPartitions should work). What do you
>>> think of this approach? Here's some sample code that demonstrates what I'm
>>> thinking:
>>>
>>> def myFold(v1: Float, v2: Float): Float = {
>>>   val partialSum = v1 + v2
>>>   if (partialSum >= 500) {
>>>     // make a note of it, do things
>>>   }
>>>   partialSum
>>> }
>>>
>>> // load the data: user_id:Long, transaction_timestamp:Long, transaction_amount:Float
>>> val rawData = sc.textFile("hdfs://path/to/data").map { x =>
>>>   val l = x.split("\\s+") // split on whitespace; adjust to the actual delimiter
>>>   (l(0).toLong, l(1).toLong, l(2).toFloat)
>>> }
>>> // rearrange to make timestamp the key (for sorting), converting to a PairRDD
>>> val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
>>> val sortedByTimestamp = keyByTimestamp.sortByKey()
>>> val partitionedByTimestamp = sortedByTimestamp.partitionBy(
>>>   new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
>>> // By this point, the RDD should be sorted and partitioned according to the
>>> // timestamp. However, I need to now make user_id the key, because the output
>>> // must be per user. At this point, since I change the keys of the PairRDD, I
>>> // understand that I lose the partitioning; the consequence of this is that I
>>> // can no longer be sure in my fold function that the ordering is retained.
>>>
>>> val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
>>> val finalResult = keyByUser.foldByKey(zeroValue = 0f)(myFold)
>>> finalResult.saveAsTextFile("hdfs://...")
>>>
>>>  The problem, as you'd expect, takes place in the folding function, after
>>> I've rearranged my RDD so it is no longer keyed by timestamp (when I produce
>>> keyByUser, I lose the correct partitioning). As I've read in the
>>> documentation, partitioning is not preserved when keys are changed (which
>>> makes sense).
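>>>
>>> For example, checking .partitioner directly shows this (just a quick sketch,
>>> using the partitionedByTimestamp RDD from the code above):
>>>
>>> partitionedByTimestamp.partitioner
>>>   // Some(RangePartitioner) - set by partitionBy
>>> partitionedByTimestamp.map(x => (x._2._1, x._2._2)).partitioner
>>>   // None - map may change the keys, so Spark drops the partitioner
>>> partitionedByTimestamp.mapValues(_._2).partitioner
>>>   // Some(RangePartitioner) - mapValues keeps the keys, so it is preserved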
>>>
>>>  Reading this thread:
>>> https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it appears
>>> that one possible solution might be to subclass RDD (à la MappedValuesRDD)
>>> to define my own RDD that retains the partitions of its parent. This seems
>>> simple enough, but I've never done anything like that before and I'm not
>>> sure where to start. I'm also willing to write my own custom partitioner
>>> class, but it appears that the getPartition method only accepts a "key"
>>> argument - and since the value I need to partition on in the final step
>>> (the timestamp) would be in the value, my partitioner class doesn't have
>>> the data it needs to make the right decision. I cannot have the timestamp
>>> in my key.
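>>>
>>> To illustrate, any custom partitioner would have to look something like this
>>> (just a sketch - the class name is made up); getPartition only ever sees the
>>> key, never the value:
>>>
>>> class TimestampBinPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
>>>   def numPartitions: Int = partitions
>>>   def getPartition(key: Any): Int = {
>>>     // only the key (user_id) is visible here; the timestamp lives in the
>>>     // value, so there is nothing to bin on
>>>     ???
>>>   }
>>> }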
>>>
>>>  Alternatively, has anyone else encountered a problem like this (i.e. an
>>> approximately ordered sum) and, if so, found a good solution? Does my
>>> approach of subclassing RDD make sense? Would there be some way to finagle
>>> a custom partitioner into making this work? Or perhaps this is a job for
>>> some other tool, like Spark Streaming?
>>>
>>>  Thanks,
>>> Michael
>>>
>>>
>>>
>>
>
>
