Can anyone comment on their experience running Spark Streaming in production?
On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <and...@andrewash.com> wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is if you group on a key that is highly skewed, with a few massively common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.
>
> My take on it -- Spark doesn't believe in sort-and-spill to enable super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases, I only need "group", which absolutely doesn't need the spill that slows things down to a crawl.
>
> (2) If that's an aggregate (such as a group count), use combine(), not groupByKey -- this will do tons of good on memory use.
>
> (3) If you really need groups that don't fit into memory, that is always because you want to do something other than aggregation with them, e.g. build an index of that grouped data. We actually had a case just like that. In this case your friend is really not groupBy, but rather partitionBy. I.e. what happens there is that you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently "big" count, and then you build a partitioner that redirects the large groups to a dedicated map(). Assuming this map doesn't try to load things into memory but rather does something like a streaming BTree build, that should be fine. In certain situations such processing may require splitting a super large group into even smaller subgroups (e.g. a partitioned BTree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract. A rough sketch of (2) and (3) is below.
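> Very rough, untested sketch of what I mean in (2) and (3). Here `pairs` stands in for some RDD[(String, String)]; the sampling fraction, count threshold and partition counts are made-up numbers:
>
>   import org.apache.spark.{HashPartitioner, Partitioner}
>   import org.apache.spark.SparkContext._   // pair-RDD functions
>
>   // (2) aggregate without materializing groups: per-key counts via combine/reduce
>   val counts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
>
>   // (3) find the "hot" keys on a sample, then route each of them to its own partition
>   val hotKeys: Set[String] = pairs.sample(false, 0.01, 42)
>     .mapValues(_ => 1L)
>     .reduceByKey(_ + _)
>     .filter { case (_, c) => c > 10000L }
>     .keys.collect().toSet
>
>   class SkewAwarePartitioner(total: Int, hot: Set[String]) extends Partitioner {
>     private val fallback = new HashPartitioner(math.max(1, total - hot.size))
>     private val hotIndex = hot.zipWithIndex.toMap
>     def numPartitions: Int = fallback.numPartitions + hot.size
>     def getPartition(key: Any): Int = hotIndex.get(key.toString) match {
>       case Some(i) => fallback.numPartitions + i   // dedicated partition for a hot key
>       case None    => fallback.getPartition(key)   // everything else hashes normally
>     }
>   }
>
>   pairs.partitionBy(new SkewAwarePartitioner(64, hotKeys))
>     .mapPartitions { it => it /* stream through the group here, e.g. build an index */ }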
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>>
>> Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed.
>>
>> Andrew
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <alex.boisv...@gmail.com> wrote:
>>
>>> I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <andras.nem...@lynxanalytics.com> wrote:
>>>
>>>> I. Is it too much magic? Lots of things "just work right" in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?
>>>
>>> I think it goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your "customizations" to go with (not against) the grain of the architecture.
>>>
>>>> II. Is it mature enough? E.g. we've created a pull request <https://github.com/apache/spark/pull/181> which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug free and reliable?
>>>
>>> There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.
>>>
>>> We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since.
>>>
>>> I would characterize our use of Spark as a "better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark didn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).
>>>
>>>> So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...
>>>>
>>>> 1. Memory management
>>>> The general direction of these questions is whether it's possible to take RDD-caching-related memory management more into our own hands, as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.
>>>> A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.
>>>> B. Memory "reflection": can you programmatically get the memory size of a cached RDD and the memory sizes available in total/per executor? If we could do this we could indirectly avoid automatic evictions of things we might really want to keep in memory.
>>>> C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I started to see evictions while there was still plenty of space on workers. This resulted in lengthy recomputations. Can this be avoided somehow?
>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for LRU eviction to take care of it? Can you tell the size of a broadcast programmatically?
>>>>
>>>> 2. Akka lost connections
>>>> We have quite often experienced lost executors due to Akka exceptions, mostly lost connections or similar. It seems to happen when an executor gets extremely busy with some CPU-intensive work. Our hypothesis is that Akka network threads get starved and the executor fails to respond within timeout limits. Is this plausible? If yes, what can we do about it?
>>>
>>> We've seen these as well. In our case, increasing the Akka timeouts and framesize helped a lot.
>>>
>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
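>>> For example, something along these lines (the numbers are placeholders rather than the values we actually run with, so tune for your own cluster; frameSize is in MB, the timeouts in seconds):
>>>
>>>   import org.apache.spark.{SparkConf, SparkContext}
>>>
>>>   val conf = new SparkConf()
>>>     .setAppName("nightly-job")                  // made-up name
>>>     .set("spark.akka.frameSize", "64")          // MB
>>>     .set("spark.akka.timeout", "300")           // seconds
>>>     .set("spark.akka.askTimeout", "120")        // seconds
>>>     .set("spark.akka.lookupTimeout", "120")     // seconds
>>>   val sc = new SparkContext(conf)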
>>>> In general, these are scary errors in the sense that they come from the very core of the framework, and it's hard to link them to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where Spark magic doesn't work perfectly?
>>>
>>> For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>
>>> It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read the Spark source to formulate hypotheses about what went wrong, and then trial + error to get to a configuration that works. Again, if Spark had better defaults and a more conservative execution model (relying less on in-memory caching of RDDs and associated metadata, on keeping large communication buffers on the heap, etc.), it would definitely simplify our lives.
>>>
>>> (Though I recognize that others might use Spark very differently and that these defaults and conservative behavior might not please everybody.)
>>>
>>> Hopefully this is the kind of feedback you were looking for...
>>>
>>>> 3. Recalculation of cached RDDs
>>>> I see the following scenario happening. I load two RDDs A, B from disk, cache them, and then do some jobs on them, at the very least a count on each. After these jobs are done I see on the storage panel that 100% of these RDDs are cached in memory.
>>>>
>>>> Then I create a third RDD C, which is built by multiple joins and maps from A and B, also cache it, and start a job on C. When I do this I still see A and B completely cached and also see C slowly getting more and more cached. This is all fine and good, but in the meantime I see stages running on the UI that point to code which is used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave? (A stripped-down version of the code is in the P.S. below.)
>>>>
>>>> And again the general question: how can one debug such issues?
>>>>
>>>> 4. Shuffle on disk
>>>> Is it true (I couldn't find it in the official docs, but did see it mentioned in various threads) that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a function to do shuffle in memory, or are there some intrinsic reasons for this to be impossible?
>>>>
>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>
>>>> Andras
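>>>> P.S. A stripped-down version of the scenario in question 3, in case it helps (sc is the usual SparkContext; the paths and the parsing/transform logic are made-up stand-ins for our real code):
>>>>
>>>>   import org.apache.spark.SparkContext._   // pair-RDD functions for join
>>>>
>>>>   def parse(line: String) = { val f = line.split("\t"); (f(0), f(1)) }   // stand-in parser
>>>>
>>>>   val a = sc.textFile("hdfs://.../a").map(parse).cache()
>>>>   val b = sc.textFile("hdfs://.../b").map(parse).cache()
>>>>   a.count(); b.count()      // after this, the storage panel shows both as 100% cached
>>>>
>>>>   val c = a.join(b).mapValues { case (x, y) => x + "," + y }.cache()
>>>>   c.count()                 // while this runs, the UI still shows stages pointing at the
>>>>                             // textFile/parse code of A and B, even though they look fully cached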