Thanks for your replies. My answers:

> You can try to increase the number of partitions to get rid of the OOM
> errors. Also try to use reduceByKey instead of groupByKey.

If my operation were associative, I might be able to use fold, and if the
operation were associative + commutative, then I could use reduceByKey or
aggregate. But determining the time intervals of on/off states depends on
order (not commutative), and computing each interval in full requires seeing
the entire ordered sequence of states together (not associative => can't
split the work up).
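For concreteness, here is a minimal sketch of the kind of per-user computation
I mean (illustrative only -- the names are made up and my real
DFUtil.statesToIntervals handles more cases than this). Because it has to walk
one user's states in timestamp order, and an "on" interval can straddle any
partition boundary, partial per-partition results can't simply be combined:

// Illustrative sketch: turn one user's time-ordered (timestamp, state)
// pairs into (beginTs, endTs) intervals of "on" time.
def statesToIntervalsSketch(sortedStates: Seq[(Long, Boolean)]): Seq[(Long, Long)] = {
  val (intervals, _) =
    sortedStates.foldLeft((Vector.empty[(Long, Long)], Option.empty[Long])) {
      case ((acc, None), (ts, true))         => (acc, Some(ts))              // turned on
      case ((acc, Some(begin)), (ts, false)) => (acc :+ (begin -> ts), None) // turned off: close interval
      case ((acc, pending), _)               => (acc, pending)               // redundant on/on or off/off
    }
  intervals  // a trailing "on" with no later "off" is simply dropped in this sketch
}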
> You can try to reduce the number of containers in order to increase their
> memory.

Yeah, I initially ran this with 100 executors with 2g each. I then ran it
again with 50 executors with 4g each, but didn't get much further before
ending in the same error. (With 100 * 2g, the job only got through 1/6 of the
work before the OOM; with 50 * 4g, it completed 1/5 of the work before the
OOM.)

As a quick set of numbers: the record/case class I use has a 16-char string,
a long, and 5 booleans. I have 8 million elements of that record in the RDD,
and a key space size between 1000 and 1500. Even if we assume that the raw
data balloons up by a factor of 8 when represented as Java objects in the JVM
(roughly 16 + 8 + 5 ≈ 29 bytes per record, times 8 million records ≈ 230 MB
raw, times 8 ≈ 1800 MB), we get about 1800 MB of total data cluster-wide, or
only about 1.5 MB per key if we reasonably assume a uniform distribution
across keys. More on this below.

*

> Obviously one of your key-value pairs is too large. You can try to increase
> spark.shuffle.memoryFraction.

I currently have spark.storage.memoryFraction set to 0.1, and my
spark.shuffle.memoryFraction is taking the default of 0.2. If I increase
spark.shuffle.memoryFraction, would that affect my value of
spark.storage.memoryFraction?

> Are you sure you can't:
> partition your data by user/time-interval => process with a mapPartition
> => partition by user => process with a mapPartition
> Not efficient, but if your operation decreases the amount of data per user
> it may work.

I had already tried this, too: my dataset has a full year of data. Instead of
grouping just by user, I also tried creating windows of 3 consecutive days
(think: days.sliding(3,1)) and then grouping by (user, window); see the sketch
at the end of this message. Within each such grouping, I can compute the
intervals inside the 3-day window, with the reasonable caveat that I'm not
allowing/guaranteeing an interval to last longer than 48 hrs/24 hrs. But even
this approach ran out of memory.

*

As a point of comparison, I rewrote the logic of the Spark job mentioned above
as a Scalding job 2 days ago. I reused the same logic code from my Spark job
in the Scalding job, and decided to go ahead and pull the 'ValueStream'
(iterator) of grouped values per key that Scalding gives from a groupBy
entirely into memory as a Vector. The Scalding job finished relatively
quickly, and its runtime was in the same ballpark as that of similar Spark
jobs I've written for this same dataset where I could use .reduceByKey instead
of .groupByKey.
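To make the windowing idea above concrete, the re-keying looked roughly like
this (a simplified sketch: the names such as statesRDD are made up, it assumes
epoch-millisecond timestamps, and it glosses over how results from overlapping
windows get reconciled):

// Sketch: replicate each record into every 3-day window that contains its
// day, then group by (user, windowStart) instead of by user alone.
// statesRDD is assumed to be an RDD[(String, Long, Boolean)] of
// (user, timestamp, state) records.
val msPerDay = 24L * 60 * 60 * 1000

val windowed = statesRDD.flatMap { case (user, ts, state) =>
  val day = ts / msPerDay
  // the 3-day windows covering `day` start at day - 2, day - 1, and day
  (day - 2 to day).map(windowStart => ((user, windowStart), (ts, state)))
}

// Same group-then-sort-then-intervals as before, but now per (user, window),
// in the hope that each group is small enough to hold in one executor's heap.
val groupedByWindow = windowed.groupByKey()

Each record gets replicated into up to 3 groups, so the total shuffled data
grows by roughly 3x, but each group should only cover 3 days of one user's
states.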
Thanks,
Elango

On Mon, Sep 28, 2015 at 9:52 AM, Alexis Gillain
<alexis.gill...@googlemail.com> wrote:

> "Note: As currently implemented, groupByKey must be able to hold all the
> key-value pairs for any key in memory. If a key has too many values, it
> can result in an [[OutOfMemoryError]]."
>
> Obviously one of your key-value pairs is too large. You can try to increase
> spark.shuffle.memoryFraction.
>
> Are you sure you can't:
> partition your data by user/time-interval => process with a mapPartition
> => partition by user => process with a mapPartition
> Not efficient, but if your operation decreases the amount of data per user
> it may work.
>
> 2015-09-29 0:17 GMT+08:00 Fabien Martin <fabien.marti...@gmail.com>:
>
>> You can try to reduce the number of containers in order to increase their
>> memory.
>>
>> 2015-09-28 9:35 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:
>>
>>> You can try to increase the number of partitions to get rid of the OOM
>>> errors. Also try to use reduceByKey instead of groupByKey.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Sep 26, 2015 at 1:05 AM, Elango Cheran <elango.che...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>> I have an RDD of the format (user: String, timestamp: Long, state:
>>>> Boolean). My task involves converting the states, where on/off is
>>>> represented as true/false, into intervals of 'on' of the format (beginTs:
>>>> Long, endTs: Long). So this task requires me, per user, to line up all of
>>>> the on/off states so that I can compute when it is on, since the
>>>> calculation is neither associative nor commutative.
>>>>
>>>> So there are 2 main operations that I'm trying to accomplish together:
>>>> 1. group by each user
>>>> 2. sort by time -- keep all of the states in sorted order by time
>>>>
>>>> The main code inside the method that does the grouping by user and
>>>> sorting by time looks sort of like this:
>>>>
>>>> // RDD starts off in format (user, ts, state), i.e. of type
>>>> // RDD[(String, Long, Boolean)]
>>>> val grouped = keyedStatesRDD.groupByKey
>>>> // after .groupByKey, the RDD's format is (user, seq-of-(ts, state)),
>>>> // i.e. RDD[(String, Iterable[(Long, Boolean)])]
>>>> // take the sequence of (ts, state) per user, sort it, and get intervals
>>>> val groupedIntervals = grouped.mapValues(
>>>>   states => {
>>>>     val sortedStates = states.toSeq.sortBy(_._1)
>>>>     val intervals = DFUtil.statesToIntervals(sortedStates)
>>>>     val intervalsList =
>>>>       intervals.map{ case (k, v) => (k, v) }(collection.breakOut).sortBy(_._1)
>>>>     intervalsList
>>>>   }
>>>> )
>>>> // after .mapValues, the RDD's new format is (user, seq-of-(startTime,
>>>> // endTime)), i.e. RDD[(String, IndexedSeq[(Long, Long)])]
>>>>
>>>> When I run my Spark job with 1 day's worth of data, the job completes
>>>> successfully. When I run it with 1 month's or 1 year's worth of data, that
>>>> method is where my Spark job consistently crashes with OutOfMemoryErrors.
>>>> I need to run on the full year's worth of data.
>>>>
>>>> My suspicion is that the groupByKey is the problem (it's pulling all of
>>>> the matching data values into a single executor's heap as a plain Scala
>>>> Iterable). But alternatives of doing sortByKey on the RDD first before
>>>> grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey],
>>>> don't quite apply in my scenario because my operation is not associative
>>>> (can't combine per-partition results) and I still need to group by users
>>>> before doing a foldLeft.
>>>>
>>>> I've definitely thought about the issue before and come across users
>>>> with issues that are similar but not exactly the same:
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>>>
>>>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html
>>>>
>>>> And this Jira seems relevant too:
>>>> https://issues.apache.org/jira/browse/SPARK-3655
>>>>
>>>> The amount of memory that I'm using is 2g per executor, and I can't go
>>>> higher than that because each executor gets a YARN container from nodes
>>>> with 16 GB of RAM and 5 YARN containers allowed per node.
>>>>
>>>> So I'd like to know if there's an easy solution to executing my logic
>>>> on my full dataset in Spark.
>>>>
>>>> Thanks!
>>>>
>>>> -- Elango
>>>>
>>>
>>
>
> --
> Alexis GILLAIN