To add to the discussion about memory working space: 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we're also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren't there earlier is that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task.
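As a concrete illustration of the "launch more reduce tasks" approach, here is a minimal Scala sketch; the input path, key extraction, and the partition count of 400 are made-up examples, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object MoreReduceTasksExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("more-reduce-tasks"))
        // Hypothetical input: tab-separated lines keyed by their first field.
        val pairs = sc.textFile("hdfs:///path/to/events")
                      .map(line => (line.split("\t")(0), line))
        // Asking for more reduce tasks (400 here is arbitrary) keeps each task's
        // working set small, similar to an external sort across the cluster.
        val grouped = pairs.groupByKey(numPartitions = 400)
        grouped.mapValues(_.size).saveAsTextFile("hdfs:///path/to/group-sizes")
        sc.stop()
      }
    }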
Overall, expect to see more work both to explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and to make things require no configuration out of the box. We're doing a lot of this based on user feedback, so that's definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>
> The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is that if you group on a key that is highly skewed, with a few massively common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.
>
> My take on it -- Spark doesn't believe in sort-and-spill to enable super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases; I only need "group", which absolutely doesn't need the spill that makes things slow down to a crawl.
> (2) If it's an aggregate (such as a group count), use combineByKey(), not groupByKey() -- this will do tons of good for memory use.
> (3) If you really need groups that don't fit into memory, that is always because you want to do something other than aggregation with them, e.g. build an index of the grouped data. We actually had a case just like that. In this case your friend is really not groupBy but rather partitionBy. What happens there is that you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently "big" count -- and then you build a partitioner that redirects large groups to a dedicated map(). Assuming this map doesn't try to load things into memory but rather does something like a streaming B-tree build, that should be fine. In certain situations such processing may require splitting a super large group into even smaller sub-groups (e.g. a partitioned B-tree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract.
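To illustrate points (2) and (3), here is a minimal Scala sketch. The key type, the hotKeys set, the partition counts, and the buildIndexStreaming helper are all made up for the example; a real count sketch and streaming index build are left out:

    import org.apache.spark.{HashPartitioner, Partitioner}

    // (2) For aggregates, combine on the map side instead of materializing groups;
    //     reduceByKey is the simplest special case of combineByKey, e.g.:
    //       val counts = pairs.mapValues(_ => 1L).reduceByKey(_ + _, numPartitions = 400)

    // (3) A skew-aware partitioner: route known-hot keys (found e.g. via a count
    //     sketch on downsampled data) to dedicated partitions, and hash the rest.
    class SkewAwarePartitioner(hotKeys: Set[String], normalPartitions: Int)
        extends Partitioner {
      private val fallback = new HashPartitioner(normalPartitions)
      private val hotIndex = hotKeys.toSeq.sorted.zipWithIndex.toMap
      override def numPartitions: Int = normalPartitions + hotKeys.size
      override def getPartition(key: Any): Int = key match {
        case k: String if hotIndex.contains(k) => normalPartitions + hotIndex(k)
        case k                                 => fallback.getPartition(k)
      }
    }

    // Usage sketch (buildIndexStreaming is hypothetical):
    //   pairs.partitionBy(new SkewAwarePartitioner(hotKeys, 400))
    //        .mapPartitions(iter => buildIndexStreaming(iter))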
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed.
>
> Andrew
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>
> I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).
>
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <andras.nem...@lynxanalytics.com> wrote:
>
> I. Is it too much magic? Lots of things "just work right" in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?
>
> I think this goes back to understanding Spark's architecture, its design constraints, and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your "customizations" to go with (not against) the grain of the architecture.
>
> II. Is it mature enough? E.g. we've created a pull request which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug-free and reliable?
>
> There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.
>
> We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since.
>
> I would characterize our use of Spark as a "better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark didn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...
>
> 1. Memory management
>
> The general direction of these questions is whether it's possible to take RDD-caching-related memory management more into our own hands, as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.
>
> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.
>
> B. Memory "reflection": can you programmatically get the memory size of a cached RDD and the memory available in total/per executor? If we could do this, we could indirectly avoid automatic evictions of things we might really want to keep in memory.
>
> C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I started to see evictions while there was still plenty of space on the workers. This resulted in lengthy recomputations. Can this be avoided somehow?
>
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for LRU eviction to take care of it? Can you tell the size of a broadcast programmatically?
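On question B, a minimal sketch of what can already be inspected from the driver (e.g. in the spark-shell, where sc is the SparkContext); getRDDStorageInfo and getExecutorMemoryStatus are developer-level APIs, so their availability and exact fields may depend on the Spark version, and the RDD name is made up:

    import org.apache.spark.storage.StorageLevel

    // Cache an RDD explicitly, then ask the driver what is actually resident.
    val events = sc.textFile("hdfs:///path/to/events").persist(StorageLevel.MEMORY_ONLY)
    events.count()  // force materialization

    // Per-RDD view: cached partitions and bytes in memory / on disk.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"${info.name}: ${info.numCachedPartitions} cached partitions, " +
              s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
    }

    // Per-executor view: (total memory for caching, remaining memory).
    sc.getExecutorMemoryStatus.foreach { case (executor, (total, remaining)) =>
      println(s"$executor: $remaining of $total bytes free for caching")
    }

    // Manual eviction instead of waiting for LRU.
    events.unpersist()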
> 2. Akka lost connections
>
> We have quite often experienced lost executors due to Akka exceptions -- mostly connection lost or similar. It seems to happen when an executor gets extremely busy with some CPU-intensive work. Our hypothesis is that the Akka network threads get starved and the executor fails to respond within the timeout limits. Is this plausible? If yes, what can we do about it?
>
> We've seen these as well. In our case, increasing the Akka timeouts and frame size helped a lot.
>
> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>
> In general, these are scary errors in the sense that they come from the very core of the framework, and it's hard to link them to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where Spark magic doesn't work perfectly?
>
> For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course, because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>
> It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read the Spark source to formulate hypotheses about what went wrong, and then use trial and error to get to a configuration that works. Again, if Spark had better defaults and a more conservative execution model (relying less on in-memory caching of RDDs and associated metadata, on keeping large communication buffers on the heap, etc.), it would definitely simplify our lives.
>
> (Though I recognize that others might use Spark very differently and that these defaults and this conservative behavior might not please everybody.)
>
> Hopefully this is the kind of feedback you were looking for...
>
> 3. Recalculation of cached RDDs
>
> I see the following scenario happening. I load two RDDs A and B from disk, cache them, and then run some jobs on them, at the very least a count on each. After these jobs are done, I see on the storage panel that 100% of these RDDs are cached in memory.
>
> Then I create a third RDD C, built by multiple joins and maps from A and B, also cache it, and start a job on C. When I do this I still see A and B completely cached, and also see C slowly getting more and more cached. This is all fine and good, but in the meanwhile I see stages running on the UI that point to the code used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
>
> Is it true - I couldn't find it in the official docs, but did see it mentioned in various threads - that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a way to shuffle in memory, or are there intrinsic reasons for this to be impossible?
>
> Sorry again for the giant mail, and thanks for any insights!
>
> Andras
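To make the Akka-settings answer under point 2 concrete, a minimal sketch; the property names are the ones mentioned above, but the values are arbitrary examples to be tuned per job, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    // Example only: raise the Akka timeouts (seconds) and frame size (MB)
    // for jobs whose executors go quiet during long CPU-bound stages.
    val conf = new SparkConf()
      .setAppName("akka-timeouts-example")
      .set("spark.akka.timeout", "300")
      .set("spark.akka.askTimeout", "120")
      .set("spark.akka.lookupTimeout", "120")
      .set("spark.akka.frameSize", "64")

    val sc = new SparkContext(conf)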