RDD.aggregate doesn’t require the RDD elements to be pairs, so you don’t need to make user_id the key of the RDD. For example, you can use an empty Map as the zero value of the aggregation: the key of the Map is the user_id extracted from each tuple, and the value is the aggregated amount.

keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0.0f))({ (agg, rec) =>
  // seqOp: fold one (timestamp, (user, amount)) record into the partition's Map
  val (time, (user, amount)) = rec
  agg.updated(user, agg(user) + amount)
}, { (lhs, rhs) =>
  // combOp: merge the Maps produced by two partitions
  lhs.keys.foldLeft(rhs) { (combined, user) =>
    combined.updated(user, lhs(user) + rhs(user))
  }
})

Of course, you may use a mutable Map for better performance. One thing to note: foldByKey is a transformation, while aggregate is an action. The final result of the code above is a single Map object collected to the driver rather than an RDD. If this map can be very large (say you have billions of users), then aggregate may cause an OOM.
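
For instance, a mutable version might look like the following sketch (untested); Spark allows both seqOp and combOp to modify and return their first argument, which avoids building a new Map for every record:

import scala.collection.mutable

keyByTimestamp.aggregate(mutable.Map.empty[Long, Float])({ (agg, rec) =>
  // seqOp: update the partition-local Map in place
  val (_, (user, amount)) = rec
  agg(user) = agg.getOrElse(user, 0.0f) + amount
  agg
}, { (lhs, rhs) =>
  // combOp: merge one partition's Map into the other in place
  rhs.foreach { case (user, amount) =>
    lhs(user) = lhs.getOrElse(user, 0.0f) + amount
  }
  lhs
})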

On 10/17/14 12:01 AM, Michael Misiewicz wrote:

Thanks for the suggestion! That does look really helpful; I see what you mean about it being more general than fold. I think I will replace my fold with aggregate - it should give me more control over the process.

I think the problem will still exist, though: I can't get the partitioning I need. When I change my key to user_id, I lose the timestamp partitioning. My problem is that I'm trying to retain a parent RDD's partitioning in an RDD that no longer has the same keys as its parent.
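
To be concrete, something along these lines is roughly what I'm after (untested sketch, using the partitionedByTimestamp RDD from my original message below); mapPartitions never shuffles, so the timestamp ordering within each partition survives even though Spark no longer tracks a partitioner for the user_id keys:

val perPartition = partitionedByTimestamp.mapPartitions { iter =>
  // accumulate running per-user sums in timestamp order within this partition
  val sums = scala.collection.mutable.Map.empty[Long, Float]
  iter.foreach { case (_, (user, amount)) =>
    val total = sums.getOrElse(user, 0.0f) + amount
    if (total >= 500) {
      // make a note of it, do things
    }
    sums(user) = total
  }
  sums.iterator // (user_id, partial sum), one per user per partition
}

The part I still can't see is how to combine those per-partition partials across partitions in timestamp order.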

On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

    Hi Michael,

    I'm not sure I fully understood your question, but I think
    RDD.aggregate can be helpful in your case. You can see it as a
    more general version of fold.

    Cheng



    On 10/16/14 11:15 PM, Michael Misiewicz wrote:
    Hi,

    I'm working on a problem where I'd like to sum items in an RDD
    in order (approximately). I am currently trying to implement
    this using a fold, but I'm having some issues because the sorting
    key of my data is not the same as the folding key for my data. I
    have data that looks like this:

    user_id, transaction_timestamp, transaction_amount

    And I'm interested in doing a foldByKey on user_id to sum
    transaction amounts - taking care to note approximately when a
    user surpasses a total transaction threshold. I'm using
    RangePartitioner to make sure that data is ordered sequentially
    between partitions, and I'd also like to make sure that data is sorted
    within partitions, though I'm not sure how to do this exactly (I
    was going to look at the code for sortByKey to figure this out -
    I believe sorting in place in a mapPartitions should work). What
    do you think about the approach? Here's some sample code that
    demonstrates what I'm thinking:

    def myFold(v1: Float, v2: Float): Float = {
      val partialSum = v1 + v2
      if (partialSum >= 500) {
        // make a note of it, do things
      }
      partialSum
    }

    val rawData = sc.textFile("hdfs://path/to/data").map { x =>
      // load data; assuming comma-separated fields
      val l = x.split(",")
      // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
      (l(0).toLong, l(1).toLong, l(2).toFloat)
    }
    // rearrange to make timestamp the key (for sorting), convert to a PairRDD
    val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
    val sortedByTimestamp = keyByTimestamp.sortByKey()
    val partitionedByTimestamp = sortedByTimestamp.partitionBy(
      new org.apache.spark.RangePartitioner(partitions = 500,
        rdd = sortedByTimestamp)).persist()
    // By this point, the RDD should be sorted and partitioned according to the
    // timestamp. However, I now need to make user_id the key, because the output
    // must be per user. Since I change the keys of the PairRDD, I understand that
    // I lose the partitioning; the consequence is that I can no longer be sure in
    // my fold function that the ordering is retained.

    val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
    val finalResult = keyByUser.foldByKey(zeroValue = 0.0f)(myFold)
    finalResult.saveAsTextFile("hdfs://...")

    The problem, as you'd expect, takes place in the folding function,
    after I've rearranged my RDD so it is no longer keyed by timestamp
    (when I produce keyByUser, I lose the partitioning I set up). As
    I've read in the documentation, partitioning is not preserved
    when keys are changed (which makes sense).
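
    For example (just to illustrate what I mean; the partitioner values are
    shown as comments, not verified), map() drops the parent's partitioner
    while mapValues() keeps it - but mapValues can't change the key, which
    is exactly the bind I'm in:

    partitionedByTimestamp.partitioner                      // Some(RangePartitioner)
    partitionedByTimestamp.map(identity).partitioner        // None
    partitionedByTimestamp.mapValues(identity).partitioner  // Some(RangePartitioner)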

    Reading this thread:
    https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4
    it appears that one possible solution might be to subclass RDD (à
    la MappedValuesRDD) to define my own RDD that retains the
    partitioning of its parent. This seems simple enough, but I've
    never done anything like that before and I'm not sure where to
    start. I'm also willing to write my own custom partitioner class,
    but it appears that the getPartition method only accepts a "key"
    argument - and since the value I need to partition on in the
    final step (the timestamp) would be in the value, my partitioner
    class doesn't have the data it needs to make the right decision.
    I cannot have the timestamp in my key.
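
    Just to make that constraint concrete, any partitioner I write only ever
    sees the key (illustrative sketch only, with a made-up class name):

    class ThresholdPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        // only `key` (the user_id) is visible here; the timestamp lives in
        // the value, so there is nothing to route on besides the user
        val raw = key.hashCode % partitions
        if (raw < 0) raw + partitions else raw
      }
    }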

    Alternatively, has anyone else encountered a problem like this
    (i.e. an approximately ordered sum), and did they find a good
    solution? Does my approach of subclassing RDD make sense? Would
    there be some way to finagle a custom partitioner into making
    this work? Perhaps this might be a job for some other tool, like
    Spark Streaming?

    Thanks,
    Michael

