Hm, a little confused here. What ordering do you expect, exactly? It seems that you want all the elements in the RDD to be sorted first by timestamp and then by user_id. If that is the case, then you can simply do this:

rawData.map { case (user, time, amount) =>
  (time, user) -> amount
}.sortByKey().aggregate(…)

On 10/17/14 10:44 PM, Michael Misiewicz wrote:

Thank you for sharing this Cheng! This is fantastic. I was able to implement it and it seems like it's working quite well. I'm definitely on the right track now!

I'm still having a small problem with the rows inside each partition being out of order - but I suspect this is because in the current code I sortByKey and then use RangePartitioner (which I think does not maintain row order within each partition, due to the shuffle in RangePartitioner). I suspect I can work around this by doing the operations in this order (a rough sketch follows the list):

- RangePartitioner
- mapPartitions (with preservesPartitioning = true) to sort each partition in memory, maintaining the partitioning
- aggregate
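
Here is a rough, untested sketch of that order. It reuses the keyByTimestamp RDD of (timestamp, (user_id, amount)) pairs from my earlier code and the Map-based aggregation from Cheng's example below; names and the partition count are just placeholders:

val partitioner = new org.apache.spark.RangePartitioner(500, keyByTimestamp)
val partitioned = keyByTimestamp.partitionBy(partitioner)

// Sort each partition in memory by timestamp, keeping the partitioning intact.
val sortedWithinPartitions = partitioned.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator,
  preservesPartitioning = true)

// Then run the per-user aggregation over the now-ordered data.
val totals = sortedWithinPartitions.aggregate(Map.empty[Long, Float].withDefaultValue(0.0f))(
  { case (agg, (_, (user, amount))) => agg.updated(user, agg(user) + amount) },
  { (lhs, rhs) => lhs.keys.foldLeft(rhs)((combined, user) => combined.updated(user, lhs(user) + rhs(user))) })

Note that the combine function only merges per-partition maps, so the "approximately in order" property holds within each partition rather than globally - which I think is acceptable for an approximate threshold check.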

Michael

On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

    RDD.aggregate doesn’t require the RDD elements to be pairs, so you
    don’t need to make user_id the key of the RDD. For example, you
    can use an empty Map as the zero value of the aggregation. The
    key of the Map is the user_id you extract from each tuple, and
    the value is the aggregated amount.

    keyByTimestamp.aggregate(Map.empty[String, Float].withDefaultValue(0.0f))({ (agg, rec) =>
      val (time, (user, amount)) = rec
      agg.updated(user, agg(user) + amount)
    }, { (lhs, rhs) =>
      lhs.keys.foldLeft(rhs) { (combined, user) =>
        combined.updated(user, lhs(user) + rhs(user))
      }
    })

    Of course, you may use a mutable Map for better performance. One
    thing to note: foldByKey is a transformation, while aggregate is
    an action. The final result of the code above is a single Map
    object rather than an RDD. If this map can be very large (say you
    have billions of users), then aggregate may OOM.
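
    For instance, here is a rough, untested sketch of the same aggregation
    with a mutable Map, using Long user ids as in your parsing code. Each
    task gets its own deserialized copy of the zero value, so updating the
    accumulator in place should be safe here:

    import scala.collection.mutable

    keyByTimestamp.aggregate(mutable.Map.empty[Long, Float])({ (agg, rec) =>
      val (time, (user, amount)) = rec
      agg(user) = agg.getOrElse(user, 0.0f) + amount  // update the accumulator in place
      agg
    }, { (lhs, rhs) =>
      rhs.foreach { case (user, amount) =>
        lhs(user) = lhs.getOrElse(user, 0.0f) + amount
      }
      lhs
    })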

    On 10/17/14 12:01 AM, Michael Misiewicz wrote:

    Thanks for the suggestion! That does look really helpful; I see
    what you mean about it being more general than fold. I think I
    will replace my fold with aggregate - it should give me more
    control over the process.

    I think the problem will still exist, though: I can't get the
    partitioning I need. When I change my key to user_id, I lose the
    timestamp partitioning. My problem is that I'm trying to retain a
    parent RDD's partitioning in an RDD that no longer has the same
    keys as its parent.

    On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote:

        Hi Michael,

        I'm not sure I fully understood your question, but I think
        RDD.aggregate can be helpful in your case. You can see it as
        a more general version of fold.
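
        To make the difference concrete (a tiny example, not from
        your code): fold's zero value and result must have the RDD's
        element type, while aggregate lets the result type differ:

        val nums = sc.parallelize(Seq(1, 2, 3, 4))
        val sum = nums.fold(0)(_ + _)  // result type must match the element type (Int)
        val (total, count) = nums.aggregate((0, 0))(  // result type can differ: here a (sum, count) pair
          (acc, n) => (acc._1 + n, acc._2 + 1),
          (a, b) => (a._1 + b._1, a._2 + b._2))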

        Cheng



        On 10/16/14 11:15 PM, Michael Misiewicz wrote:
        Hi,

        I'm working on a problem where I'd like to sum items in an
        RDD in order (approximately). I am currently trying to
        implement this using a fold, but I'm having some issues
        because the sorting key of my data is not the same as the
        folding key for my data. I have data that looks like this:

        user_id, transaction_timestamp, transaction_amount

        And I'm interested in doing a foldByKey on user_id to sum
        transaction amounts - taking care to note approximately when
        a user surpasses a total transaction threshold. I'm using
        RangePartitioner to make sure that data is ordered
        sequentially between partitions, and I'd also like to make
        sure that data is sorted within partitions, though I'm not
        sure exactly how to do this (I was going to look at the code
        for sortByKey to figure this out - I believe sorting in place in
        a mapPartitions should work). What do you think about the
        approach? Here's some sample code that demonstrates what I'm
        thinking:

        def myFold(v1: Float, v2: Float): Float = {
          val partialSum = v1 + v2
          if (partialSum >= 500) {
            // make a note of it, do things
          }
          partialSum
        }

        val rawData = sc.textFile("hdfs://path/to/data").map { x =>
          // load data: user_id:long, transaction_timestamp:long, transaction_amount:float
          val l = x.split(",")  // assuming comma-separated fields
          (l(0).toLong, l(1).toLong, l(2).toFloat)
        }
        // rearrange to make timestamp the key (for sorting), convert to PairRDD
        val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
        val sortedByTimestamp = keyByTimestamp.sortByKey()
        val partitionedByTimestamp = sortedByTimestamp.partitionBy(
          new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
        // By this point, the RDD should be sorted and partitioned according to the timestamp.
        // However, I now need to make user_id the key, because the output must be per user.
        // Since I change the keys of the PairRDD, I understand that I lose the partitioning;
        // the consequence is that I can no longer be sure in my fold function that the ordering is retained.

        val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
        val finalResult = keyByUser.foldByKey(zeroValue = 0)(myFold)
        finalResult.saveAsTextFile("hdfs://...")

        The problem, as you'd expect, takes place in the folding
        function, after I've rearranged my RDD so it is no longer
        keyed by timestamp (when I produce keyByUser, I lose the
        correct partitioning). As I've read in the documentation,
        partitioning is not preserved when keys are changed (which
        makes sense).
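
        A small illustration of that rule (not from my code):
        mapValues keeps the parent's partitioner because the keys are
        untouched, whereas map drops it since the keys could have
        changed:

        val pairs = sc.parallelize(Seq((1L, "a"), (2L, "b")))
          .partitionBy(new org.apache.spark.HashPartitioner(4))
        pairs.mapValues(_.toUpperCase).partitioner                    // Some(HashPartitioner) - keys unchanged
        pairs.map { case (k, v) => (k, v.toUpperCase) }.partitioner   // None - keys may have changed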

        Reading this thread:
        https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4
        it appears that one possible solution might be to subclass
        RDD (à la MappedValuesRDD) to define my own RDD that retains
        the partitions of its parent. This seems simple enough, but
        I've never done anything like that before and I'm not sure
        where to start. I'm also willing to write my own custom
        partitioner class, but it appears that the getPartition
        method only accepts a "key" argument - and since the value I
        need to partition on in the final step (the timestamp) would
        be in the value, my partitioner class doesn't have the data
        it needs to make the right decision. I cannot have the
        timestamp in my key.
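
        For reference, here is a rough, hypothetical sketch of the
        Partitioner contract I mean - getPartition only ever sees the
        key, so a timestamp stored in the value is invisible to it:

        import org.apache.spark.Partitioner

        class TimestampRangePartitioner(override val numPartitions: Int) extends Partitioner {
          // Only the key is available here; anything the partitioner needs must live in the key.
          def getPartition(key: Any): Int = key match {
            case ts: Long => (ts % numPartitions).toInt  // hypothetical placement by a timestamp key
            case _        => 0
          }
        }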

        Alternatively, has anyone else encountered a problem like
        this (i.e. an approximately ordered sum), and did they find a
        good solution? Does my approach of subclassing RDD make
        sense? Would there be some way to finagle a custom
        partitioner into making this work? Perhaps this might be a
        job for some other tool, like Spark Streaming?

        Thanks,
        Michael

