To add onto the discussion about memory working space, 0.9 introduced the 
ability to spill data within a task to disk, and in 1.0 we’re also changing the 
interface to allow spilling data within the same *group* to disk (e.g. when you 
do groupBy and get a key with lots of values). The main reason these weren’t 
there was that for a lot of workloads (everything except the same key having 
lots of values), simply launching more reduce tasks was also a good solution, 
because it results in an external sort across the cluster similar to what would 
happen within a task.
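
To make the "external sort within a task" idea concrete, here is a rough sketch in plain Python. It is not Spark code and not how Spark implements spilling; the function names and the max_in_memory parameter are made up for illustration. It buffers records, spills sorted runs to temp files when the buffer fills up, then merges the runs so each key's values can be streamed without holding the whole group in memory.

```python
import heapq
import pickle
import tempfile
from itertools import groupby
from operator import itemgetter


def _spill(sorted_run):
    """Write one sorted run of (key, value) pairs to a temp file."""
    f = tempfile.TemporaryFile()
    for record in sorted_run:
        pickle.dump(record, f)
    f.seek(0)
    return f


def _read_run(f):
    """Stream records back from a spilled run, one at a time."""
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            f.close()
            return


def external_group_by_key(records, max_in_memory=1000):
    """Yield (key, iterator_of_values), buffering at most
    max_in_memory records in memory before spilling a sorted run."""
    runs, buffer = [], []
    for record in records:
        buffer.append(record)
        if len(buffer) >= max_in_memory:
            buffer.sort(key=itemgetter(0))
            runs.append(_spill(buffer))
            buffer = []
    buffer.sort(key=itemgetter(0))
    # Merge the spilled runs with whatever is still in memory.
    streams = [_read_run(f) for f in runs]
    merged = heapq.merge(*streams, buffer, key=itemgetter(0))
    for key, group in groupby(merged, key=itemgetter(0)):
        yield key, (value for _, value in group)
```

Each group's values are produced lazily, so a caller has to consume one group before moving on to the next. Launching more reduce tasks gets a similar effect across the cluster, because each task then has a smaller slice to sort.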

Overall, expect to see more work to both explain how things execute 
(http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the 
monitoring UI is another) and try to make things require no configuration out 
of the box. We’re doing a lot of this based on user feedback, so that’s 
definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable 
> when under memory pressure.  Meaning that if you attempt to persist an RDD 
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
> OOMs.  I had to carefully modify some of the space tuning parameters and GC 
> settings to get some jobs to even finish.
> 
> The other issue I've observed is if you group on a key that is highly skewed, 
> with a few massively-common keys and a long tail of rare keys, the one 
> massive key can be too big for a single machine and again cause OOMs.
> 
> My take on it -- Spark doesn't believe in sort-and-spill things to enable 
> super long groups, and IMO for a good reason. Here are my thoughts:
> 
> (1) in my work i don't need "sort" in 99% of the cases, i only need "group" 
> which absolutely doesn't need the spill which makes things slow down to a 
> crawl. 
> (2) if that's an aggregate (such as group count), use combineByKey(), not 
> groupByKey -- this will do tons of good on memory use.
> (3) if you really need groups that don't fit into memory, that is always 
> because you want to do something other than aggregation with them, 
> e.g. build an index of that grouped data. We actually had a case just like 
> that. In this case your friend is really not groupBy, but rather partitionBy. 
> I.e. you build a quick count sketch, perhaps on downsampled data, to figure 
> out which keys have a sufficiently "big" count -- and then you build a 
> partitioner that redirects large groups to a dedicated map(). Assuming this 
> map doesn't try to load things in memory but rather does something like a 
> streaming BTree build, that should be fine. In certain situations such 
> processing may require splitting a super large group into even smaller 
> subgroups (e.g. a partitioned BTree structure), at which point you should be 
> fine even from a uniform load point of view. It takes a little jiu-jitsu to 
> do it all, but it is not Spark's fault here; it did not promise to do all 
> this for you in the groupBy contract.
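
Points (2) and (3) above can be sketched roughly as follows, in plain Python rather than against the Spark API. Everything here is hypothetical and only for illustration: the names count_by_key, find_heavy_keys and SkewAwarePartitioner, the threshold parameter, and the use of exact counts on a sample in place of a real count sketch.

```python
from collections import Counter


def count_by_key(records):
    """Point (2): aggregate as you go instead of materialising groups.
    Memory grows with the number of distinct keys, not with records."""
    counts = Counter()
    for key, _ in records:
        counts[key] += 1
    return counts


def find_heavy_keys(sample, threshold):
    """Keys whose share of the sample is at least `threshold` (0..1).
    Exact counts on a sample stand in for a real count sketch here."""
    counts = count_by_key(sample)
    total = sum(counts.values())
    return {k for k, c in counts.items() if c / total >= threshold}


class SkewAwarePartitioner:
    """Point (3): send known-heavy keys to dedicated partitions and
    hash every other key over the regular partitions."""

    def __init__(self, heavy_keys, num_regular_partitions):
        self.num_regular = num_regular_partitions
        # Heavy key i gets its own partition, numbered after the regular ones.
        self.dedicated = {
            key: num_regular_partitions + i
            for i, key in enumerate(sorted(heavy_keys))
        }
        self.num_partitions = num_regular_partitions + len(self.dedicated)

    def partition(self, key):
        if key in self.dedicated:
            return self.dedicated[key]
        return hash(key) % self.num_regular
```

The dedicated partitions are where a streaming consumer (the BTree build in the example above) would run. Splitting one huge key into several subgroups would need an extra salt on the key, which this sketch leaves out. Note that Python's built-in hash() for strings is only stable within a single process, so a real job would need a deterministic hash.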
> 
>  
> 
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
> 
> Just my personal experience, but I've observed significant improvements in 
> stability since even the 0.7.x days, so I'm confident that things will 
> continue to get better as long as people report what they're seeing so it can 
> get fixed.
> 
> Andrew
> 
> 
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> 
> wrote:
> I'll provide answers from our own experience at Bizo.  We've been using Spark 
> for 1+ year now and have found it generally better than previous approaches 
> (Hadoop + Hive mostly).
> 
> 
> 
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
> <andras.nem...@lynxanalytics.com> wrote:
> I. Is it too much magic? Lots of things "just work right" in Spark and it's 
> extremely convenient and efficient when it indeed works. But should we be 
> worried that customization is hard if the built in behavior is not quite 
> right for us? Are we to expect hard to track down issues originating from the 
> black box behind the magic?
> 
> I think it goes back to understanding Spark's architecture, its design 
> constraints and the problems it explicitly set out to address.   If the 
> solution to your problems can be easily formulated in terms of the map/reduce 
> model, then it's a good choice.  You'll want your "customizations" to go with 
> (not against) the grain of the architecture.
>  
> II. Is it mature enough? E.g. we've created a pull request which fixes a 
> problem that we were very surprised no one had ever stumbled upon before. So 
> that's why I'm asking: is Spark already being used in professional settings? 
> Can one already trust it to be reasonably bug-free and reliable?
> 
> There are lots of ways to use Spark; and not all of the features are 
> necessarily at the same level of maturity.   For instance, we put all the 
> jars on the main classpath so we've never run into the issue your pull 
> request addresses.
> 
> We definitely use and rely on Spark on a professional basis.  We have 5+ 
> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.   
> Once we got them working with the proper configuration settings, they have 
> been running reliably ever since.
> 
> I would characterize our use of Spark as a "better Hadoop", in the sense that 
> we use it for batch processing only, no streaming yet.   We're happy it 
> performs better than Hadoop but we don't require/rely on its memory caching 
> features.  In fact, for most of our jobs it would simplify our lives if Spark 
> didn't cache so many things in memory, since that would make 
> configuration/tuning a lot simpler and jobs would run successfully on the 
> first try instead of us having to tweak things (# of partitions and such).
> 
> So, to the concrete issues. Sorry for the long mail, and let me know if I 
> should break this out into more threads or if there is some other way to have 
> this discussion...
> 
> 1. Memory management
> The general direction of these questions is whether it's possible to take RDD 
> caching related memory management more into our own hands as LRU eviction is 
> nice most of the time but can be very suboptimal in some of our use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one really 
> wants to keep. I'm fine with going down in flames if I mark too much data 
> essential.
> B. Memory "reflection": can you programmatically get the memory size of a cached 
> rdd and memory sizes available in total/per executor? If we could do this we 
> could indirectly avoid automatic evictions of things we might really want to 
> keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with huge 
> worker memory and smallish memory on the driver JVM. To my surprise, the 
> system started to cache RDD partitions on the driver as well. As the driver 
> ran out of memory I started to see evictions while there was still plenty of 
> space on the workers. This resulted in lengthy recomputations. Can this be 
> avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without 
> waiting for LRU eviction to take care of it? Can you tell the size of a 
> broadcast programmatically?
> 
> 
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions - 
> mostly connection lost or similar. It seems to happen when an executor gets 
> extremely busy with some CPU intensive work. Our hypothesis is that akka 
> network threads get starved and the executor fails to respond within timeout 
> limits. Is this plausible? If yes, what can we do about it?
> 
> We've seen these as well.  In our case, increasing the akka timeouts and 
> framesize helped a lot.
> 
> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
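
Spelled out, that brace shorthand covers four separate properties. A small Python sketch that expands them and renders them as Java system properties (one way Spark properties could be passed at the time); the values are placeholders for illustration only, not recommendations, and as_java_options is a made-up helper name. Check the configuration docs for the units and defaults of each setting.

```python
# Placeholder values for illustration only; tune for your own cluster.
akka_settings = {
    "spark.akka.timeout": 300,
    "spark.akka.askTimeout": 60,
    "spark.akka.lookupTimeout": 60,
    "spark.akka.frameSize": 64,
}


def as_java_options(settings):
    """Render properties as -Dkey=value flags, sorted by key."""
    return " ".join("-D%s=%s" % (k, v) for k, v in sorted(settings.items()))


print(as_java_options(akka_settings))
```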
>  
> 
> In general, these are scary errors in the sense that they come from the very 
> core of the framework and it's hard to link them to something we do in our own 
> code, and thus hard to find a fix. So a question more for the community: how 
> often do you end up scratching your head about cases where Spark magic 
> doesn't work perfectly?
> 
> For us, this happens most often for jobs processing TBs of data (instead of 
> GBs)... which is frustrating of course because these jobs cost a lot more in 
> $$$ + time to run/debug/diagnose than smaller jobs.
> 
> It means we have to comb the logs to understand what happened, interpret 
> stack traces, dump memory / object allocations, read Spark source to 
> formulate hypotheses about what went wrong and then trial + error to get to a 
> configuration that works.   Again, if Spark had better defaults and a more 
> conservative execution model (relying less on in-memory caching of RDDs and 
> associated metadata, keeping large communication buffers on the heap, etc.), 
> it would definitely simplify our lives.  
> 
> (Though I recognize that others might use Spark very differently and that 
> these defaults and conservative behavior might not please everybody.)
> 
> Hopefully this is the kind of feedback you were looking for...
> 
> 
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk, cache 
> them and then do some jobs on them, at the very least a count on each. After 
> these jobs are done I see on the storage panel that 100% of these RDDs are 
> cached in memory.
> 
> Then I create a third RDD C which is created by multiple joins and maps from 
> A and B, also cache it and start a job on C. When I do this I still see A and 
> B completely cached and also see C slowly getting more and more cached. This 
> is all fine and good, but in the meanwhile I see stages running on the UI 
> that point to code which is used to load A and B. How is this possible? Am I 
> misunderstanding how cached RDDs should behave?
> 
> And again the general question - how can one debug such issues?
> 
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned 
> in various threads - that shuffle _always_ hits disk? (Disregarding OS 
> caches.) Why is this the case? Are you planning to add a function to do 
> shuffle in memory or are there some intrinsic reasons for this to be 
> impossible?
> 
> 
> Sorry again for the giant mail, and thanks for any insights!
> 
> Andras
> 
> 
> 
> 
> 
