Excellent, thank you.

On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> It's not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention "spill" in http://spark.incubator.apache.org/docs/latest/configuration.html.
>
> Matei
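For reference, a minimal sketch of setting the spill-related properties from that configuration page on a SparkConf; the app name, master, and values are illustrative only, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    // Spill-related knobs from the configuration page; spark.shuffle.spill is
    // already true by default, shown here only to make the setting explicit.
    val conf = new SparkConf()
      .setAppName("spill-example")
      .setMaster("local[2]")
      .set("spark.shuffle.spill", "true")
      .set("spark.shuffle.memoryFraction", "0.3")  // heap fraction for shuffle aggregation before spilling
    val sc = new SparkContext(conf)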
> On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman <suren.hira...@velos.io> wrote:

> Matei,
>
> Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious, but I don't see it in the API docs.
>
> -Suren

> On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

>> To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we're also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren't there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task.
>>
>> Overall, expect to see more work to both explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We're doing a lot of this based on user feedback, so that's definitely appreciated.
>>
>> Matei

>> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:

>>> The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.

>> My take on it -- Spark doesn't believe in sort-and-spill to enable super-long groups, and IMO for a good reason. Here are my thoughts:
>>
>> (1) in my work I don't need "sort" in 99% of the cases, I only need "group", which absolutely doesn't need the spill that makes things slow down to a crawl.
>> (2) if that's an aggregate (such as a group count), use combine(), not groupByKey -- this will do tons of good on memory use.
>> (3) if you really need groups that don't fit into memory, that is always because you want to do something other than aggregation with them, e.g. build an index of that grouped data. We actually had a case just like that. In this case your friend is really not groupBy, but rather partitionBy. I.e. what happens there is you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently "big" count, and then you build a partitioner that redirects large groups to a dedicated map(). Assuming this map doesn't try to load things into memory but rather does something like a streaming BTree build, that should be fine. In certain situations such processing may require splitting a super-large group even into smaller sub-groups (e.g. a partitioned BTree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract.
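For the archives, here is a rough sketch of what (2) and (3) can look like against the 0.9/1.0 RDD API. The data, thresholds, class name (SkewAwarePartitioner), and partition counts are all made up, and the heavy-hitter detection is just a sampled countByKey standing in for a real count sketch:

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD operations (0.9/1.0 style)

    // Sends each known-heavy key to its own partition; everything else is
    // hash-partitioned as usual.
    class SkewAwarePartitioner(normalParts: Int, heavy: Set[String]) extends Partitioner {
      private val heavyIndex = heavy.zipWithIndex.toMap
      def numPartitions: Int = normalParts + heavy.size
      def getPartition(key: Any): Int = heavyIndex.get(key.toString) match {
        case Some(i) => normalParts + i                  // dedicated partition for a heavy key
        case None =>
          val h = key.hashCode % normalParts
          if (h < 0) h + normalParts else h
      }
    }

    object SkewSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("skew-sketch").setMaster("local[2]"))

        // Synthetic skewed data: the key "hot" dominates, the rest are rare.
        val pairs = sc.parallelize(1 to 100000).map { i =>
          (if (i % 2 == 0) "hot" else "key-" + (i % 5000), 1L)
        }

        // (2) Aggregate with a combiner instead of materializing whole groups;
        //     reduceByKey is combineByKey underneath, so map-side combining happens.
        val counts = pairs.reduceByKey(_ + _)

        // (3) Find heavy hitters on a sample (a stand-in for a real count sketch),
        //     then partitionBy instead of groupBy; each partition can then be
        //     streamed (mapPartitions) into whatever structure you're building.
        val heavyKeys = pairs.sample(false, 0.01, 42).countByKey()
          .filter(_._2 > 100).keys.toSet
        val routed = pairs.partitionBy(new SkewAwarePartitioner(200, heavyKeys))

        println(counts.count() + " distinct keys, " + routed.partitions.length + " partitions")
        sc.stop()
      }
    }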
>>> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>>>
>>> Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed.
>>>
>>> Andrew

>>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:

>>>> I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).

>>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <andras.nem...@lynxanalytics.com> wrote:

>>>>> I. Is it too much magic? Lots of things "just work right" in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?

>>>> I think it goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your "customizations" to go with (not against) the grain of the architecture.

>>>>> II. Is it mature enough? E.g. we've created a pull request <https://github.com/apache/spark/pull/181> which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug-free and reliable?

>>>> There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.
>>>>
>>>> We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since.
>>>>
>>>> I would characterize our use of Spark as a "better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark wouldn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).
>>>>
>>>> So, to the concrete issues.

>>>>> Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...
>>>>>
>>>>> 1. Memory management
>>>>> The general direction of these questions is whether it's possible to take RDD-caching-related memory management more into our own hands, as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.
>>>>> A. Somehow prioritize cached RDDs, e.g. mark some "essential" that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.
>>>>> B. Memory "reflection": can you programmatically get the memory size of a cached RDD and the memory sizes available in total/per executor? If we could do this, we could indirectly avoid automatic evictions of things we might really want to keep in memory.
>>>>> C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I started to see evictions while there was still plenty of space on the workers. This resulted in lengthy recomputations. Can this be avoided somehow?
>>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for LRU eviction to take care of it? Can you tell the size of a broadcast programmatically?
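On B and D, a small sketch of the introspection the current (0.9/1.0) SparkContext does expose: per-RDD cached sizes via getRDDStorageInfo and per-executor memory via getExecutorMemoryStatus, plus dropping a broadcast explicitly (unpersist() appears to be new in 1.0, if I'm reading the API right). It doesn't address prioritization (A), and I don't know of a direct way to read a broadcast's size; the data here is synthetic:

    import org.apache.spark.{SparkConf, SparkContext}

    object MemoryReflection {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("mem-reflection").setMaster("local[2]"))
        val data = sc.parallelize(1 to 1000000).cache()
        data.count()  // force the cache to fill before inspecting it

        // (B) per-RDD cached sizes, as reported by the block manager
        for (info <- sc.getRDDStorageInfo) {
          println("RDD " + info.id + ": " + info.numCachedPartitions + "/" + info.numPartitions +
            " partitions cached, " + info.memSize + " bytes in memory, " + info.diskSize + " bytes on disk")
        }

        // (B) total vs. remaining memory per executor, in bytes
        sc.getExecutorMemoryStatus.foreach { case (executor, (max, remaining)) =>
          println(executor + ": " + remaining + " of " + max + " bytes free")
        }

        // (D) dropping a broadcast explicitly instead of waiting for eviction
        //     (unpersist() is, as far as I can tell, new in 1.0)
        val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
        println(lookup.value(1))
        lookup.unpersist()
        sc.stop()
      }
    }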
>>>>> 2. Akka lost connections
>>>>> We have quite often experienced lost executors due to Akka exceptions - mostly connection lost or similar. It seems to happen when an executor gets extremely busy with some CPU-intensive work. Our hypothesis is that the Akka network threads get starved and the executor fails to respond within the timeout limits. Is this plausible? If yes, what can we do about it?

>>>> We've seen these as well. In our case, increasing the Akka timeouts and frame size helped a lot.
>>>>
>>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
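Concretely, those properties go on the SparkConf; the values below are just examples of "more generous than the defaults", not recommendations for any particular workload:

    import org.apache.spark.SparkConf

    // Akka-related settings Alex mentions; timeouts are in seconds, frameSize in MB.
    val conf = new SparkConf()
      .setAppName("akka-tuning-example")
      .set("spark.akka.timeout", "300")
      .set("spark.akka.askTimeout", "120")
      .set("spark.akka.lookupTimeout", "120")
      .set("spark.akka.frameSize", "64")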
>>>>> In general, these are scary errors in the sense that they come from the very core of the framework, and it's hard to link them to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where Spark magic doesn't work perfectly?

>>>> For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>>
>>>> It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read the Spark source to formulate hypotheses about what went wrong, and then trial + error to get to a configuration that works. Again, if Spark had better defaults and a more conservative execution model (relying less on in-memory caching of RDDs and associated metadata, keeping large communication buffers on the heap, etc.), it would definitely simplify our lives.
>>>>
>>>> (Though I recognize that others might use Spark very differently and that these defaults and conservative behavior might not please everybody.)
>>>>
>>>> Hopefully this is the kind of feedback you were looking for...

>>>>> 3. Recalculation of cached RDDs
>>>>> I see the following scenario happening. I load two RDDs A and B from disk, cache them, and then do some jobs on them, at the very least a count on each. After these jobs are done, I see on the storage panel that 100% of these RDDs are cached in memory.
>>>>>
>>>>> Then I create a third RDD C, which is created by multiple joins and maps from A and B, also cache it, and start a job on C. When I do this I still see A and B completely cached, and also see C slowly getting more and more cached. This is all fine and good, but in the meanwhile I see stages running on the UI that point to code which is used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave?
>>>>>
>>>>> And again the general question - how can one debug such issues?
>>>>>
>>>>> 4. Shuffle on disk
>>>>> Is it true - I couldn't find it in the official docs, but did see it mentioned in various threads - that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a function to do shuffle in memory, or are there some intrinsic reasons for this to be impossible?
>>>>>
>>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>>
>>>>> Andras

> --
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@velos.io
> W: www.velos.io

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io