Re: Spark - ready for prime time?

2014-04-13 Thread Andrew Ash
It's highly dependent on what the issue is with your particular job, but
the ones I modify most commonly are:

spark.storage.memoryFraction
spark.shuffle.memoryFraction
parallelism (a parameter on many RDD calls) -- increase from the default
level to get more, smaller tasks that are more likely to finish
Use Kryo

A while back I also modified:
spark.storage.blockManagerTimeoutIntervalMs -- when a stop-the-world GC on
a slave caused heartbeat timeouts but the slave would eventually recover, I
would bump up this parameter from its default (this was on 0.7.x).
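
For concreteness, a rough sketch of what those knobs can look like with the
0.9-era SparkConf/RDD API -- the values and the input path below are made-up
examples rather than recommendations, and blockManagerTimeoutIntervalMs is
the old 0.7.x-era name mentioned above:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)

    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.storage.memoryFraction", "0.4")   // shrink the cache share, leave more heap for tasks
      .set("spark.shuffle.memoryFraction", "0.3")   // bound shuffle aggregation buffers
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use Kryo
      // optionally also set "spark.kryo.registrator" for custom classes
      .set("spark.storage.blockManagerTimeoutIntervalMs", "120000") // tolerate long GC pauses
    val sc = new SparkContext(conf)

    // "parallelism" here is just the numPartitions argument on shuffle-producing calls:
    val counts = sc.textFile("hdfs:///path/to/input")   // hypothetical input
      .flatMap(_.split("\\s+"))
      .map(w => (w, 1L))
      .reduceByKey(_ + _, 2000)   // more, smaller reduce tasks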

It's also been a while since I messed with GC tuning for Spark, but I'd
generally recommend capping your JVM size at about 31.5 GB so you can keep
compressed pointers.  Better to run multiple JVMs at that size than a
single one that's 128GB, for example.  I think the general practice for GC
tuning is to have your interactive-time JVMs (like the driver or the
master) run with the concurrent mark and sweep collector, and the bulk
computation JVMs (like a worker or executor) run with parallel GC.  Not
sure how the newer G1 collector fits into those.  And most of the time, if
you're messing with GC parameters, your issues are actually at the Spark
level and you should be spending your time figuring out why that's causing
problems instead.
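
A rough sketch of that sizing/collector split, assuming the old spark-env.sh
mechanism (SPARK_MEM and SPARK_JAVA_OPTS existed in the 0.7-0.9 era, but
treat the exact combination below as an assumption, not a recipe):

    # spark-env.sh -- illustrative only
    SPARK_MEM=31g                           # stay under ~32 GB so compressed oops remain enabled
    SPARK_JAVA_OPTS="-XX:+UseParallelGC -verbose:gc -XX:+PrintGCDetails"   # bulk-compute JVMs
    # an interactive driver/master JVM would instead typically get -XX:+UseConcMarkSweepGC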

Another thing I did: if a job wouldn't finish but consisted of several
steps, I would manually save each step along the way to disk (HDFS) and
load from there.  A lot of my jobs only need to finish once, so as long as
I get it done (even if it's a more manual process than it should be),
that's OK.
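
A bare-bones sketch of that save-and-reload pattern (the paths and the toy
transformation are made up; assumes an existing SparkContext sc):

    import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)

    // materialize an intermediate step to HDFS
    val step1 = sc.textFile("hdfs:///data/input")          // hypothetical input
      .map(line => (line.take(8), line))                   // toy "step 1"
    step1.saveAsObjectFile("hdfs:///tmp/myjob/step1")

    // later (possibly in a fresh driver), resume from the saved copy instead of recomputing
    val resumed = sc.objectFile[(String, String)]("hdfs:///tmp/myjob/step1")
    val step2 = resumed.groupByKey()                        // continue the pipeline from here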

Hope that helps!
Andrew



On Sun, Apr 13, 2014 at 4:33 PM, Jim Blomo  wrote:

> On Thu, Apr 10, 2014 at 12:24 PM, Andrew Ash  wrote:
> > The biggest issue I've come across is that the cluster is somewhat
> unstable
> > when under memory pressure.  Meaning that if you attempt to persist an
> RDD
> > that's too big for memory, even with MEMORY_AND_DISK, you'll often still
> get
> > OOMs.  I had to carefully modify some of the space tuning parameters and
> GC
> > settings to get some jobs to even finish.
>
> Would you mind sharing some of these settings?  Even just a GitHub
> gist would be helpful.  These are the main issues I've run into as
> well, and memory pressure also seems to be correlated with akka
> timeouts, possibly because of GC pauses.
>


Re: Spark - ready for prime time?

2014-04-13 Thread Jim Blomo
On Thu, Apr 10, 2014 at 12:24 PM, Andrew Ash  wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable
> when under memory pressure.  Meaning that if you attempt to persist an RDD
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
> OOMs.  I had to carefully modify some of the space tuning parameters and GC
> settings to get some jobs to even finish.

Would you mind sharing some of these settings?  Even just a GitHub
gist would be helpful.  These are the main issues I've run into as
well, and memory pressure also seems to be correlated with akka
timeouts, possibly because of GC pauses.


Re: Spark - ready for prime time?

2014-04-11 Thread Surendranauth Hiraman
Excellent, thank you.



On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia wrote:

> It's not a new API, it just happens underneath the current one if you have
> spark.shuffle.spill set to true (which it is by default). Take a look at
> the config settings that mention "spill" in
> http://spark.incubator.apache.org/docs/latest/configuration.html.
>
> Matei
>
> On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman 
> wrote:
>
> Matei,
>
> Where is the functionality in 0.9 to spill data within a task (separately
> from persist)? My apologies if this is something obvious but I don't see it
> in the api docs.
>
> -Suren
>
>
>
> On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia wrote:
>
>> To add onto the discussion about memory working space, 0.9 introduced the
>> ability to spill data within a task to disk, and in 1.0 we're also changing
>> the interface to allow spilling data within the same *group* to disk (e.g.
>> when you do groupBy and get a key with lots of values). The main reason
>> these weren't there was that for a lot of workloads (everything except the
>> same key having lots of values), simply launching more reduce tasks was
>> also a good solution, because it results in an external sort across the
>> cluster similar to what would happen within a task.
>>
>> Overall, expect to see more work to both explain how things execute (
>> http://spark.incubator.apache.org/docs/latest/tuning.html is one
>> example, the monitoring UI is another) and try to make things require no
>> configuration out of the box. We're doing a lot of this based on user
>> feedback, so that's definitely appreciated.
>>
>> Matei
>>
>> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov  wrote:
>>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>>
>>> The biggest issue I've come across is that the cluster is somewhat
>>> unstable when under memory pressure.  Meaning that if you attempt to
>>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>>> often still get OOMs.  I had to carefully modify some of the space tuning
>>> parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if you group on a key that is highly
>>> skewed, with a few massively-common keys and a long tail of rare keys, the
>>> one massive key can be too big for a single machine and again cause OOMs.
>>>
>>
>> My take on it -- Spark doesn't believe in sort-and-spill things to enable
>> super long groups, and IMO for a good reason. Here are my thoughts:
>>
>> (1) In my work I don't need "sort" in 99% of the cases; I only need
>> "group", which absolutely doesn't need the spill that makes things slow
>> down to a crawl.
>> (2) If that's an aggregate (such as a group count), use combine(), not
>> groupByKey -- this will do tons of good for memory use.
>> (3) If you really need groups that don't fit into memory, that is always
>> because you want to do something other than aggregation with them, e.g.
>> build an index of that grouped data. We actually had a case just like
>> that. In this case your friend is really not groupBy, but rather
>> partitionBy. I.e. what happens there is you build a quick count sketch,
>> perhaps on downsampled data, to figure out which keys have a sufficiently
>> "big" count -- and then you build a partitioner that redirects large
>> groups to a dedicated map(). Assuming this map doesn't try to load things
>> into memory but rather does something like a streaming BTree build, that
>> should be fine. In certain situations such processing may require
>> splitting a super large group even into smaller sub-groups (e.g. a
>> partitioned BTree structure), at which point you should be fine even from
>> a uniform-load point of view. It takes a little jiu-jitsu to do it all,
>> but it is not Spark's fault here; it did not promise to do all this for
>> you in the groupBy contract.
>>
>>
>>
>>>
>>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>>> issues.
>>>
>>> Just my personal experience, but I've observed significant improvements
>>> in stability since even the 0.7.x days, so I'm confident that things will
>>> continue to get better as long as people report what they're seeing so it
>>> can get fixed.
>>>
>>> Andrew
>>>
>>>
>>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>>> wrote:
>>>
 I'll provide answers from our own experience at Bizo.  We've been using
 Spark for 1+ year now and have found it generally better than previous
 approaches (Hadoop + Hive mostly).



 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
 andras.nem...@lynxanalytics.com> wrote:

> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should 
> we
> be worried that customization is hard if the built in behavior is not 
> quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
>

 

Re: Spark - ready for prime time?

2014-04-11 Thread Matei Zaharia
It’s not a new API, it just happens underneath the current one if you have 
spark.shuffle.spill set to true (which it is by default). Take a look at the 
config settings that mention “spill” in 
http://spark.incubator.apache.org/docs/latest/configuration.html.

Matei

On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman  
wrote:

> Matei,
> 
> Where is the functionality in 0.9 to spill data within a task (separately 
> from persist)? My apologies if this is something obvious but I don't see it 
> in the api docs.
> 
> -Suren
> 
> 
> 
> On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia  
> wrote:
> To add onto the discussion about memory working space, 0.9 introduced the 
> ability to spill data within a task to disk, and in 1.0 we’re also changing 
> the interface to allow spilling data within the same *group* to disk (e.g. 
> when you do groupBy and get a key with lots of values). The main reason these 
> weren’t there was that for a lot of workloads (everything except the same key 
> having lots of values), simply launching more reduce tasks was also a good 
> solution, because it results in an external sort across the cluster similar 
> to what would happen within a task.
> 
> Overall, expect to see more work to both explain how things execute 
> (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, 
> the monitoring UI is another) and try to make things require no configuration 
> out of the box. We’re doing a lot of this based on user feedback, so that’s 
> definitely appreciated.
> 
> Matei
> 
> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov  wrote:
> 
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>> The biggest issue I've come across is that the cluster is somewhat unstable 
>> when under memory pressure.  Meaning that if you attempt to persist an RDD 
>> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
>> OOMs.  I had to carefully modify some of the space tuning parameters and GC 
>> settings to get some jobs to even finish.
>> 
>> The other issue I've observed is if you group on a key that is highly 
>> skewed, with a few massively-common keys and a long tail of rare keys, the 
>> one massive key can be too big for a single machine and again cause OOMs.
>> 
>> My take on it -- Spark doesn't believe in sort-and-spill things to enable 
>> super long groups, and IMO for a good reason. Here are my thoughts:
>> 
>> (1) In my work I don't need "sort" in 99% of the cases; I only need
>> "group", which absolutely doesn't need the spill that makes things slow
>> down to a crawl.
>> (2) If that's an aggregate (such as a group count), use combine(), not
>> groupByKey -- this will do tons of good for memory use.
>> (3) If you really need groups that don't fit into memory, that is always
>> because you want to do something other than aggregation with them, e.g.
>> build an index of that grouped data. We actually had a case just like
>> that. In this case your friend is really not groupBy, but rather
>> partitionBy. I.e. what happens there is you build a quick count sketch,
>> perhaps on downsampled data, to figure out which keys have a sufficiently
>> "big" count -- and then you build a partitioner that redirects large
>> groups to a dedicated map(). Assuming this map doesn't try to load things
>> into memory but rather does something like a streaming BTree build, that
>> should be fine. In certain situations such processing may require
>> splitting a super large group even into smaller sub-groups (e.g. a
>> partitioned BTree structure), at which point you should be fine even from
>> a uniform-load point of view. It takes a little jiu-jitsu to do it all,
>> but it is not Spark's fault here; it did not promise to do all this for
>> you in the groupBy contract.
>> 
>>  
>> 
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>> 
>> Just my personal experience, but I've observed significant improvements in 
>> stability since even the 0.7.x days, so I'm confident that things will 
>> continue to get better as long as people report what they're seeing so it 
>> can get fixed.
>> 
>> Andrew
>> 
>> 
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert  
>> wrote:
>> I'll provide answers from our own experience at Bizo.  We've been using 
>> Spark for 1+ year now and have found it generally better than previous 
>> approaches (Hadoop + Hive mostly).
>> 
>> 
>> 
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
>>  wrote:
>> I. Is it too much magic? Lots of things "just work right" in Spark and it's 
>> extremely convenient and efficient when it indeed works. But should we be 
>> worried that customization is hard if the built in behavior is not quite 
>> right for us? Are we to expect hard to track down issues originating from 
>> the black box behind the magic?
>> 
>> I think this goes back to understanding Spark's architecture, its design 
>> constraints and the problems it explicitly set out to address.   If the 
>> solution to your problem

Re: Spark - ready for prime time?

2014-04-11 Thread Surendranauth Hiraman
Matei,

Where is the functionality in 0.9 to spill data within a task (separately
from persist)? My apologies if this is something obvious but I don't see it
in the api docs.

-Suren



On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia wrote:

> To add onto the discussion about memory working space, 0.9 introduced the
> ability to spill data within a task to disk, and in 1.0 we're also changing
> the interface to allow spilling data within the same *group* to disk (e.g.
> when you do groupBy and get a key with lots of values). The main reason
> these weren't there was that for a lot of workloads (everything except the
> same key having lots of values), simply launching more reduce tasks was
> also a good solution, because it results in an external sort across the
> cluster similar to what would happen within a task.
>
> Overall, expect to see more work to both explain how things execute (
> http://spark.incubator.apache.org/docs/latest/tuning.html is one example,
> the monitoring UI is another) and try to make things require no
> configuration out of the box. We're doing a lot of this based on user
> feedback, so that's definitely appreciated.
>
> Matei
>
> On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov  wrote:
>
> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat
>> unstable when under memory pressure.  Meaning that if you attempt to
>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>> often still get OOMs.  I had to carefully modify some of the space tuning
>> parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is if you group on a key that is highly
>> skewed, with a few massively-common keys and a long tail of rare keys, the
>> one massive key can be too big for a single machine and again cause OOMs.
>>
>
> My take on it -- Spark doesn't believe in sort-and-spill things to enable
> super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases; I only need
> "group", which absolutely doesn't need the spill that makes things slow
> down to a crawl.
> (2) If that's an aggregate (such as a group count), use combine(), not
> groupByKey -- this will do tons of good for memory use.
> (3) If you really need groups that don't fit into memory, that is always
> because you want to do something other than aggregation with them, e.g.
> build an index of that grouped data. We actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> partitionBy. I.e. what happens there is you build a quick count sketch,
> perhaps on downsampled data, to figure out which keys have a sufficiently
> "big" count -- and then you build a partitioner that redirects large
> groups to a dedicated map(). Assuming this map doesn't try to load things
> into memory but rather does something like a streaming BTree build, that
> should be fine. In certain situations such processing may require
> splitting a super large group even into smaller sub-groups (e.g. a
> partitioned BTree structure), at which point you should be fine even from
> a uniform-load point of view. It takes a little jiu-jitsu to do it all,
> but it is not Spark's fault here; it did not promise to do all this for
> you in the groupBy contract.
>
>
>
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>> issues.
>>
>> Just my personal experience, but I've observed significant improvements
>> in stability since even the 0.7.x days, so I'm confident that things will
>> continue to get better as long as people report what they're seeing so it
>> can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>> wrote:
>>
>>> I'll provide answers from our own experience at Bizo.  We've been using
>>> Spark for 1+ year now and have found it generally better than previous
>>> approaches (Hadoop + Hive mostly).
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>> andras.nem...@lynxanalytics.com> wrote:
>>>
 I. Is it too much magic? Lots of things "just work right" in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?

>>>
>>> I think this goes back to understanding Spark's architecture, its design
>>> constraints and the problems it explicitly set out to address.   If the
>>> solution to your problems can be easily formulated in terms of the
>>> map/reduce model, then it's a good choice.  You'll want your
>>> "customizations" to go with (not against) the grain of the architecture.
>>>
>>>
 II. Is it mature enough? E.g. we've created a pull request which fixes a
 problem that we were very surprised no one ever

Re: Spark - ready for prime time?

2014-04-10 Thread Matei Zaharia
To add onto the discussion about memory working space, 0.9 introduced the 
ability to spill data within a task to disk, and in 1.0 we’re also changing the 
interface to allow spilling data within the same *group* to disk (e.g. when you 
do groupBy and get a key with lots of values). The main reason these weren’t 
there was that for a lot of workloads (everything except the same key having 
lots of values), simply launching more reduce tasks was also a good solution, 
because it results in an external sort across the cluster similar to what would 
happen within a task.
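
(As an aside on "launching more reduce tasks": in the RDD API that is just the
optional numPartitions argument on the shuffle operations -- a sketch with an
arbitrary count, assuming `pairs` is an RDD of (key, count) pairs and the
usual SparkContext._ implicits are imported:)

    val grouped     = pairs.groupByKey()             // partition count inherited from the parent
    val groupedWide = pairs.groupByKey(2048)         // same shuffle spread over many smaller reduce tasks
    val summedWide  = pairs.reduceByKey(_ + _, 2048) // same idea for aggregations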

Overall, expect to see more work to both explain how things execute 
(http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the 
monitoring UI is another) and try to make things require no configuration out 
of the box. We’re doing a lot of this based on user feedback, so that’s 
definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov  wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable 
> when under memory pressure.  Meaning that if you attempt to persist an RDD 
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get 
> OOMs.  I had to carefully modify some of the space tuning parameters and GC 
> settings to get some jobs to even finish.
> 
> The other issue I've observed is if you group on a key that is highly skewed, 
> with a few massively-common keys and a long tail of rare keys, the one 
> massive key can be too big for a single machine and again cause OOMs.
> 
> My take on it -- Spark doesn't believe in sort-and-spill things to enable 
> super long groups, and IMO for a good reason. Here are my thoughts:
> 
> (1) In my work I don't need "sort" in 99% of the cases; I only need
> "group", which absolutely doesn't need the spill that makes things slow
> down to a crawl.
> (2) If that's an aggregate (such as a group count), use combine(), not
> groupByKey -- this will do tons of good for memory use.
> (3) If you really need groups that don't fit into memory, that is always
> because you want to do something other than aggregation with them, e.g.
> build an index of that grouped data. We actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> partitionBy. I.e. what happens there is you build a quick count sketch,
> perhaps on downsampled data, to figure out which keys have a sufficiently
> "big" count -- and then you build a partitioner that redirects large
> groups to a dedicated map(). Assuming this map doesn't try to load things
> into memory but rather does something like a streaming BTree build, that
> should be fine. In certain situations such processing may require
> splitting a super large group even into smaller sub-groups (e.g. a
> partitioned BTree structure), at which point you should be fine even from
> a uniform-load point of view. It takes a little jiu-jitsu to do it all,
> but it is not Spark's fault here; it did not promise to do all this for
> you in the groupBy contract.
> 
>  
> 
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
> 
> Just my personal experience, but I've observed significant improvements in 
> stability since even the 0.7.x days, so I'm confident that things will 
> continue to get better as long as people report what they're seeing so it can 
> get fixed.
> 
> Andrew
> 
> 
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert  
> wrote:
> I'll provide answers from our own experience at Bizo.  We've been using Spark 
> for 1+ year now and have found it generally better than previous approaches 
> (Hadoop + Hive mostly).
> 
> 
> 
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
>  wrote:
> I. Is it too much magic? Lots of things "just work right" in Spark and it's 
> extremely convenient and efficient when it indeed works. But should we be 
> worried that customization is hard if the built in behavior is not quite 
> right for us? Are we to expect hard to track down issues originating from the 
> black box behind the magic?
> 
> I think this goes back to understanding Spark's architecture, its design 
> constraints and the problems it explicitly set out to address.   If the 
> solution to your problems can be easily formulated in terms of the map/reduce 
> model, then it's a good choice.  You'll want your "customizations" to go with 
> (not against) the grain of the architecture.
>  
> II. Is it mature enough? E.g. we've created a pull request which fixes a 
> problem that we were very surprised no one ever stumbled upon before. So 
> that's why I'm asking: is Spark being already used in professional settings? 
> Can one already trust it being reasonably bug free and reliable?
> 
> There are lots of ways to use Spark; and not all of the features are 
> necessarily at the same level of maturity.   For instance, we put all the 
> jars on the main classpath so we've never run into the issue 

Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned
> in various threads - that shuffle _always_ hits disk? (Disregarding OS
> caches.) Why is this the case? Are you planning to add a function to do
> shuffle in memory or are there some intrinsic reasons for this to be
> impossible?
>
> I don't think it's true... as far as I'm concerned Spark doesn't peek into
> the OS and force it to disregard buffer caches. In general, for large
> shuffles, all shuffle files do not fit into memory, so we kind of have to
> write them out to disk. There is an undocumented option to sync writing
> shuffle files to disk every time we write a block, but that is by default
> false and not many people use it (for obvious reasons).

I believe I recently had the experience that for the map portion of
the shuffle, all shuffle files seemed to be written to the file
system (albeit potentially sitting in buffer caches).  The size of the
shuffle files on the hosts matched the size of the "shuffle write" metric
shown in the UI (PySpark, branch-0.9 as of Monday), so there didn't seem
to be any effort to keep the shuffle files in memory.

On Thu, Apr 10, 2014 at 12:43 PM, Andrew Or  wrote:
> Here are answers to a subset of your questions:
>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data
>
> As far as I am aware, there are currently no eviction policies for RDD
> blocks other than LRU. Your suggestion of prioritizing RDDs is an
> interesting one and I'm sure other users would like that as well.
>
>> B. Memory "reflection": can you programmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might really
>> want to keep in memory.
>
> All this information should be displayed on the UI under the Storage tab.
>
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there were still >
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>
> The amount of space used for RDD storage is only a fraction of the total
> amount of memory available to the JVM. More specifically, it is governed by
> `spark.storage.memoryFraction`, which is by default 60%. This may explain
> why evictions seem to occur prematurely sometimes. In the future, we should
> probably add a table that contains information about evicted RDDs on the UI,
> so it's easier to track them. Right now evicted RDDs disappear from the
> face of the planet completely, sometimes leaving the user somewhat
> confounded. Though with off-heap storage (Tachyon) this may become less
> relevant.
>
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>
> In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
> explicitly added! Under the storage tab of the UI, we could probably also
> have a Broadcast table in the future, seeing that there are users interested
> in this feature.
>
>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on each.
>> After these jobs are done I see on the storage panel that 100% of these RDDs
>> are cached in memory.
>> Then I create a third RDD C which is created by multiple joins and maps
>> from A and B, also cache it and start a job on C. When I do this I still see
>> A and B completely cached and also see C slowly getting more and more
>> cached. This is all fine and good, but in the meanwhile I see stages running
>> on the UI that point to code which is used to load A and B. How is this
>> possible? Am I misunderstanding how cached RDDs should behave?
>> And again the general question - how can one debug such issues?
>
> From the fractions of RDDs cached in memory, it seems to me that your
> application is running as expected. If you also cache C, then it will slowly
> add more blocks to storage, possibly evicting A and B if there is memory
> pressure. It's entirely possible that there is a bug on finding the call
> site on the stages page (there were a few PRs that made changes to this
> recently).
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, bu

Re: Spark - ready for prime time?

2014-04-10 Thread Andrew Or
Here are answers to a subset of your questions:

> 1. Memory management
> The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data

As far as I am aware, there are currently no eviction policies for RDD
blocks other than LRU. Your suggestion of prioritizing RDDs is an
interesting one and I'm sure other users would like that as well.

> B. Memory "reflection": can you programmatically get the memory size of a
cached rdd and memory sizes available in total/per executor? If we could do
this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.

All this information should be displayed on the UI under the Storage tab.
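
(On the programmatic side of B, the driver does expose some of the same
numbers the Storage tab shows -- a sketch, with the caveat that the exact
methods, fields, and the release they appeared in are assumptions to
double-check:)

    // per-executor (maxMemory, remainingMemory) for storage, in bytes, keyed by "host:port"
    sc.getExecutorMemoryStatus.foreach { case (exec, (max, remaining)) =>
      println(s"$exec: $remaining of $max bytes free for caching")
    }
    // per-RDD cache usage
    sc.getRDDStorageInfo.foreach(info => println(s"${info.name}: ${info.memSize} bytes in memory"))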

> C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
this be avoided somehow?

The amount of space used for RDD storage is only a fraction of the total
amount of memory available to the JVM. More specifically, it is governed by
`spark.storage.memoryFraction`, which is by default 60%. This may explain
why evictions seem to occur prematurely sometimes. In the future, we
should probably add a table that contains information about evicted RDDs on
the UI, so it's easier to track them. Right now evicted RDDs disappear
from the face of the planet completely, sometimes leaving the user somewhat
confounded. Though with off-heap storage (Tachyon) this may become less
relevant.

> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?

In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
explicitly added! Under the storage tab of the UI, we could probably also
have a Broadcast table in the future, seeing that there are users
interested in this feature.
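
(A sketch of what that can look like against the 1.0 API, with a toy
broadcast value:)

    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val hits = sc.parallelize(Seq("a", "b", "c")).flatMap(k => lookup.value.get(k))
    hits.count()
    lookup.unpersist()   // 1.0+: proactively drop the broadcast's blocks on the executors
    // lookup.unpersist(blocking = true) waits until the blocks are actually removed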

> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.
> Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?
> And again the general question - how can one debug such issues?

From the fractions of RDDs cached in memory, it seems to me that your
application is running as expected. If you also cache C, then it will
slowly add more blocks to storage, possibly evicting A and B if there is
memory pressure. It's entirely possible that there is a bug on finding the
call site on the stages page (there were a few PRs that made changes to
this recently).

4. Shuffle on disk
Is it true - I couldn't find it in official docs, but did see this
mentioned in various threads - that shuffle _always_ hits disk?
(Disregarding OS caches.) Why is this the case? Are you planning to add a
function to do shuffle in memory or are there some intrinsic reasons for
this to be impossible?

I don't think it's true... as far as I'm concerned Spark doesn't peek into
the OS and force it to disregard buffer caches. In general, for large
shuffles, all shuffle files do not fit into memory, so we kind of have to
write them out to disk. There is an undocumented option to sync writing
shuffle files to disk every time we write a block, but that is by default
false and not many people use it (for obvious reasons).



On Thu, Apr 10, 2014 at 12:05 PM, Roger Hoover wrote:

> Can anyone comment on their experience running Spark Streaming in
> production?
>
>
> On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov wrote:
>
>>
>>
>>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>>
>>> The biggest issue I've come across is that the cluster is somewhat
>>> unstable when under memory pressure.  Meaning that if you attempt to
>>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>>> often still get OOMs.  I had to carefully modify some of the space tuning
>>> parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if yo

Re: Spark - ready for prime time?

2014-04-10 Thread Roger Hoover
Can anyone comment on their experience running Spark Streaming in
production?


On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov wrote:

>
>
>
> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat
>> unstable when under memory pressure.  Meaning that if you attempt to
>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>> often still get OOMs.  I had to carefully modify some of the space tuning
>> parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is if you group on a key that is highly
>> skewed, with a few massively-common keys and a long tail of rare keys, the
>> one massive key can be too big for a single machine and again cause OOMs.
>>
>
> My take on it -- Spark doesn't believe in sort-and-spill things to enable
> super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases; I only need
> "group", which absolutely doesn't need the spill that makes things slow
> down to a crawl.
> (2) If that's an aggregate (such as a group count), use combine(), not
> groupByKey -- this will do tons of good for memory use.
> (3) If you really need groups that don't fit into memory, that is always
> because you want to do something other than aggregation with them, e.g.
> build an index of that grouped data. We actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> partitionBy. I.e. what happens there is you build a quick count sketch,
> perhaps on downsampled data, to figure out which keys have a sufficiently
> "big" count -- and then you build a partitioner that redirects large
> groups to a dedicated map(). Assuming this map doesn't try to load things
> into memory but rather does something like a streaming BTree build, that
> should be fine. In certain situations such processing may require
> splitting a super large group even into smaller sub-groups (e.g. a
> partitioned BTree structure), at which point you should be fine even from
> a uniform-load point of view. It takes a little jiu-jitsu to do it all,
> but it is not Spark's fault here; it did not promise to do all this for
> you in the groupBy contract.
>
>
>
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>> issues.
>>
>> Just my personal experience, but I've observed significant improvements
>> in stability since even the 0.7.x days, so I'm confident that things will
>> continue to get better as long as people report what they're seeing so it
>> can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
>> wrote:
>>
>>> I'll provide answers from our own experience at Bizo.  We've been using
>>> Spark for 1+ year now and have found it generally better than previous
>>> approaches (Hadoop + Hive mostly).
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>> andras.nem...@lynxanalytics.com> wrote:
>>>
 I. Is it too much magic? Lots of things "just work right" in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?

>>>
>>> I think this goes back to understanding Spark's architecture, its design
>>> constraints and the problems it explicitly set out to address.   If the
>>> solution to your problems can be easily formulated in terms of the
>>> map/reduce model, then it's a good choice.  You'll want your
>>> "customizations" to go with (not against) the grain of the architecture.
>>>
>>>
 II. Is it mature enough? E.g. we've created a pull request which fixes a
 problem that we were very surprised no one ever stumbled upon
 before. So that's why I'm asking: is Spark being already used in
 professional settings? Can one already trust it being reasonably bug free
 and reliable?

>>>
>>> There are lots of ways to use Spark; and not all of the features are
>>> necessarily at the same level of maturity.   For instance, we put all the
>>> jars on the main classpath so we've never run into the issue your pull
>>> request addresses.
>>>
>>> We definitely use and rely on Spark on a professional basis.  We have 5+
>>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>>> Once we got them working with the proper configuration settings, they have
>>> been running reliably since.
>>>
>>> I would characterize our use of Spark as a "better Hadoop", in the sense
>>> that we use it for batch processing only, no streaming yet.   We're happy
>>> it performs better than Hadoop but we don't require/rely on its memory
>>> caching features.  In fact, for most of our jobs it would simplify our
>>> lives if Spark wouldn't cache so many things in memory sin

Re: Spark - ready for prime time?

2014-04-10 Thread Dmitriy Lyubimov
On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>

My take on it -- Spark doesn't believe in sort-and-spill things to enable
super long groups, and IMO for a good reason. Here are my thoughts:

(1) In my work I don't need "sort" in 99% of the cases; I only need "group",
which absolutely doesn't need the spill that makes things slow down to a
crawl.
(2) If that's an aggregate (such as a group count), use combine(), not
groupByKey -- this will do tons of good for memory use.
(3) If you really need groups that don't fit into memory, that is always
because you want to do something other than aggregation with them, e.g.
build an index of that grouped data. We actually had a case just like that.
In this case your friend is really not groupBy, but rather partitionBy.
I.e. what happens there is you build a quick count sketch, perhaps on
downsampled data, to figure out which keys have a sufficiently "big" count
-- and then you build a partitioner that redirects large groups to a
dedicated map(). Assuming this map doesn't try to load things into memory
but rather does something like a streaming BTree build, that should be
fine. In certain situations such processing may require splitting a super
large group even into smaller sub-groups (e.g. a partitioned BTree
structure), at which point you should be fine even from a uniform-load
point of view. It takes a little jiu-jitsu to do it all, but it is not
Spark's fault here; it did not promise to do all this for you in the
groupBy contract.
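
(To make (2) and (3) concrete -- a rough sketch only, with made-up key names,
thresholds, and partition counts; assumes `pairs` is an RDD of (String, Long)
pairs:)

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._   // pair-RDD functions (pre-1.3)

    // (2) for a group count, aggregate instead of materializing the groups
    val counts = pairs.reduceByKey(_ + _)    // or combineByKey(...) for richer aggregates

    // (3) route known-heavy keys to dedicated partitions so a single reducer isn't crushed
    val heavyKeys = Seq("the", "a")          // e.g. discovered via a count sketch on a sample
    class SkewAwarePartitioner(normalParts: Int, heavy: Seq[String]) extends Partitioner {
      private val heavyIndex = heavy.zipWithIndex.toMap
      override def numPartitions: Int = normalParts + heavy.size
      override def getPartition(key: Any): Int = heavyIndex.get(key.asInstanceOf[String]) match {
        case Some(i) => normalParts + i                     // one partition per heavy key
        case None =>
          val h = key.asInstanceOf[String].hashCode % normalParts
          if (h < 0) h + normalParts else h                 // everyone else hashes normally
      }
    }
    val routed = pairs.partitionBy(new SkewAwarePartitioner(512, heavyKeys))
    // mapPartitions over `routed` can then stream each (possibly huge) group,
    // e.g. into an external/streaming BTree build, without holding it in memory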



>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nem...@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think this goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this 

Re: Spark - ready for prime time?

2014-04-10 Thread Debasish Das
I agree with Andrew... Every time I underestimate the RAM requirement, my
hand calculations are always way less than what the JVM actually allocates...

But I guess I will understand the Scala/JVM optimizations as I get more
pain.


On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nem...@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think this goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to
>>> take RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you programmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might
>>> really want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>> the system started to cache RDD partitions on the driver as well. As the
>>> driver ran out of memory I started to see evictions while there were still
>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>> this be avoided 

Re: Spark - ready for prime time?

2014-04-10 Thread Brad Miller
I would echo much of what Andrew has said.

I manage a small/medium sized cluster (48 cores, 512G ram, 512G disk
space dedicated to spark, data storage in separate HDFS shares).  I've
been using spark since 0.7, and as with Andrew I've observed
significant and consistent improvements in stability (and in the
PySpark API) since then.  I have run into some trouble with mesos, and
I have run into some trouble when working with data which is large
relative to the size of my cluster (e.g. 100G), but overall it's
worked well and our group is continuing to build on top of spark.

On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash  wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable
> when under memory pressure.  Meaning that if you attempt to persist an RDD
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
> OOMs.  I had to carefully modify some of the space tuning parameters and GC
> settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
> wrote:
>>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth
>>  wrote:
>>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>
>>
>> I think this goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon before. So
>>> that's why I'm asking: is Spark being already used in professional settings?
>>> Can one already trust it being reasonably bug free and reliable?
>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy it
>> performs better than Hadoop but we don't require/rely on its memory caching
>> features.  In fact, for most of our jobs it would simplify our lives if
>> Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to take
>>> RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you programmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might really
>>> want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my

Re: Spark - ready for prime time?

2014-04-10 Thread Andrew Ash
The biggest issue I've come across is that the cluster is somewhat unstable
when under memory pressure.  Meaning that if you attempt to persist an RDD
that's too big for memory, even with MEMORY_AND_DISK, you'll often still
get OOMs.  I had to carefully modify some of the space tuning parameters
and GC settings to get some jobs to even finish.

The other issue I've observed is if you group on a key that is highly
skewed, with a few massively-common keys and a long tail of rare keys, the
one massive key can be too big for a single machine and again cause OOMs.

I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.

Just my personal experience, but I've observed significant improvements in
stability since even the 0.7.x days, so I'm confident that things will
continue to get better as long as people report what they're seeing so it
can get fixed.

Andrew


On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert wrote:

> I'll provide answers from our own experience at Bizo.  We've been using
> Spark for 1+ year now and have found it generally better than previous
> approaches (Hadoop + Hive mostly).
>
>
>
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
> andras.nem...@lynxanalytics.com> wrote:
>
>> I. Is it too much magic? Lots of things "just work right" in Spark and
>> it's extremely convenient and efficient when it indeed works. But should we
>> be worried that customization is hard if the built in behavior is not quite
>> right for us? Are we to expect hard to track down issues originating from
>> the black box behind the magic?
>>
>
> I think this goes back to understanding Spark's architecture, its design
> constraints and the problems it explicitly set out to address.   If the
> solution to your problems can be easily formulated in terms of the
> map/reduce model, then it's a good choice.  You'll want your
> "customizations" to go with (not against) the grain of the architecture.
>
>
>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>> problem that we were very surprised no one ever stumbled upon
>> before. So that's why I'm asking: is Spark being already used in
>> professional settings? Can one already trust it being reasonably bug free
>> and reliable?
>>
>
> There are lots of ways to use Spark; and not all of the features are
> necessarily at the same level of maturity.   For instance, we put all the
> jars on the main classpath so we've never run into the issue your pull
> request addresses.
>
> We definitely use and rely on Spark on a professional basis.  We have 5+
> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
> Once we got them working with the proper configuration settings, they have
> been running reliably since.
>
> I would characterize our use of Spark as a "better Hadoop", in the sense
> that we use it for batch processing only, no streaming yet.   We're happy
> it performs better than Hadoop but we don't require/rely on its memory
> caching features.  In fact, for most of our jobs it would simplify our
> lives if Spark wouldn't cache so many things in memory since it would make
> configuration/tuning a lot simpler and jobs would run successfully on the
> first try instead of having to tweak things (# of partitions and such).
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
>> should break this out into more threads or if there is some other way to
>> have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>> B. Memory "reflection": can you programmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might
>> really want to keep in memory.
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there was still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>>
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -
>> mostly connection lost or similar. It seems to happen when an executor gets
>> extremely 

Re: Spark - ready for prime time?

2014-04-10 Thread Alex Boisvert
I'll provide answers from our own experience at Bizo.  We've been using
Spark for 1+ year now and have found it generally better than previous
approaches (Hadoop + Hive mostly).


On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
>

I think it goes back to understanding Spark's architecture, its design
constraints and the problems it explicitly set out to address.   If the
solution to your problems can be easily formulated in terms of the
map/reduce model, then it's a good choice.  You'll want your
"customizations" to go with (not against) the grain of the architecture.


> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon before. So
> that's why I'm asking: is Spark already being used in professional settings?
> Can one already trust it to be reasonably bug-free and reliable?
>

There are lots of ways to use Spark; and not all of the features are
necessarily at the same level of maturity.   For instance, we put all the
jars on the main classpath so we've never run into the issue your pull
request addresses.

We definitely use and rely on Spark on a professional basis.  We have 5+
spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
Once we got them working with the proper configuration settings, they have
been running reliably since.

I would characterize our use of Spark as a "better Hadoop", in the sense
that we use it for batch processing only, no streaming yet.   We're happy
it performs better than Hadoop but we don't require/rely on its memory
caching features.  In fact, for most of our jobs it would simplify our
lives if Spark wouldn't cache so many things in memory since it would make
configuration/tuning a lot simpler and jobs would run successfully on the
first try instead of having to tweak things (# of partitions and such).
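To give a flavour of the tweaking involved, here is a sketch in Scala - the
partition count and the input path are invented, not our real settings:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    def runNightlyJob(sc: SparkContext): Unit = {
      val raw = sc.textFile("s3n://example-bucket/events/2014-04-10/*.gz")  // hypothetical input

      // more, smaller partitions -> smaller tasks that are less likely to OOM
      val events = raw.repartition(512).map(_.split("\t"))

      events.persist(StorageLevel.MEMORY_AND_DISK)
      events.count()

      // release the cache explicitly instead of waiting for LRU eviction
      events.unpersist()
    }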

> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you programmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there was still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>

We've seen these as well.  In our case, increasing the akka timeouts and
framesize helped a lot.

e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
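For example (illustrative values only - the right numbers depend on cluster
size, GC behaviour and how big your task results are):

    import org.apache.spark.SparkConf

    // Pass the resulting conf to your SparkContext as usual.
    def moreForgivingAkkaConf(): SparkConf =
      new SparkConf()
        .set("spark.akka.timeout", "300")        // seconds
        .set("spark.akka.askTimeout", "300")     // seconds
        .set("spark.akka.lookupTimeout", "300")  // seconds
        .set("spark.akka.frameSize", "64")       // MB; raise it if large task results hit the frame limit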


>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>

For us, this happens most often for jobs processing TBs of data (instead of
GBs)... which is frustrating of course because these jobs cost a lot more
in $$$ + time to run/debug/diagnose than smaller jobs.

It means we have to comb the logs to understand what happened, interpret
stack traces, dump memory / object allocations, read Spark 

Re: Spark - ready for prime time?

2014-04-10 Thread Sean Owen
Mike Olson's comment:

http://vision.cloudera.com/mapreduce-spark/

Here's the partnership announcement:

http://databricks.com/blog/2013/10/28/databricks-and-cloudera-partner-to-support-spark.html

> On Thu, Apr 10, 2014 at 10:42 AM, Ian Ferreira 
> wrote:
>>
>> Do you have the link to the Cloudera comment?


Re: Spark - ready for prime time?

2014-04-10 Thread Dean Wampler
Here are several good ones:

https://www.google.com/search?q=cloudera+spark&oq=cloudera+spark&aqs=chrome..69i57j69i65l3j69i60l2.4439j0j7&sourceid=chrome&espv=2&es_sm=119&ie=UTF-8



On Thu, Apr 10, 2014 at 10:42 AM, Ian Ferreira wrote:

>  Do you have the link to the Cloudera comment?
>
> Sent from Windows Mail
>
> *From:* Dean Wampler 
> *Sent:* Thursday, April 10, 2014 7:39 AM
> *To:* Spark Users 
> *Cc:* Daniel Darabos , Andras 
> Barjak
>
> Spark has been endorsed by Cloudera as the successor to MapReduce. That
> says a lot...
>
>
> On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth <
> andras.nem...@lynxanalytics.com> wrote:
>
>> Hello Spark Users,
>>
>> With the recent graduation of Spark to a top level project (grats, btw!),
>> maybe a well timed question. :)
>>
>> We are at the very beginning of a large scale big data project and after
>> two months of exploration work we'd like to settle on the technologies to
>> use, roll up our sleeves and start to build the system.
>>
>> Spark is one of the forerunners for our technology choice.
>>
>> My question in essence is whether it's a good idea or is Spark too
>> 'experimental' just yet to bet our lives (well, the project's life) on it.
>>
>> The benefits of choosing Spark are numerous and I guess all too obvious
>> for this audience - e.g. we love its powerful abstraction, ease of
>> development and the potential for using a single system for serving and
>> manipulating huge amount of data.
>>
>> This email aims to ask about the risks. I list concrete issues we've
>> encountered below, but basically my concern boils down to two philosophical
>> points:
>> I. Is it too much magic? Lots of things "just work right" in Spark and
>> it's extremely convenient and efficient when it indeed works. But should we
>> be worried that customization is hard if the built in behavior is not quite
>> right for us? Are we to expect hard to track down issues originating from
>> the black box behind the magic?
>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>> problem that we were very surprised no one ever stumbled upon before. So
>> that's why I'm asking: is Spark already being used in professional settings?
>> Can one already trust it to be reasonably bug-free and reliable?
>>
>> I know I'm asking a biased audience, but that's fine, as I want to be
>> convinced. :)
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>> should break this out into more threads or if there is some other way to
>> have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>> B. Memory "reflection": can you programmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might
>> really want to keep in memory.
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there was still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>>
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -
>> mostly connection lost or similar. It seems to happen when an executor gets
>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>> network threads get starved and the executor fails to respond within
>> timeout limits. Is this plausible? If yes, what can we do with it?
>>
>> In general, these are scary errors in the sense that they come from the
>> very core of the framework and it's hard to link it to something we do in
>> our own code, and thus hard to find a fix. So a question more for the
>> community: how often do you end up scratching your head about cases where
>> spark magic doesn't work perfectly?
>>
>>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on
>> each. After these jobs are done I see on the storage panel that 100% of
>> these RDDs are cached in memory.
>>
>> Then I create a third RDD C which is created 

Re: Spark - ready for prime time?

2014-04-10 Thread Ian Ferreira
Do you have the link to the Cloudera comment?






Sent from Windows Mail





From: Dean Wampler
Sent: Thursday, April 10, 2014 7:39 AM
To: Spark Users
Cc: Daniel Darabos, Andras Barjak






Spark has been endorsed by Cloudera as the successor to MapReduce. That says a 
lot...




On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth 
 wrote:





Hello Spark Users,




With the recent graduation of Spark to a top level project (grats, btw!), maybe 
a well timed question. :)




We are at the very beginning of a large scale big data project and after two 
months of exploration work we'd like to settle on the technologies to use, roll 
up our sleeves and start to build the system.




Spark is one of the forerunners for our technology choice.




My question in essence is whether it's a good idea or is Spark too 
'experimental' just yet to bet our lives (well, the project's life) on it.




The benefits of choosing Spark are numerous and I guess all too obvious for 
this audience - e.g. we love its powerful abstraction, ease of development and 
the potential for using a single system for serving and manipulating huge 
amount of data.




This email aims to ask about the risks. I list concrete issues we've 
encountered below, but basically my concern boils down to two philosophical 
points:

I. Is it too much magic? Lots of things "just work right" in Spark and it's 
extremely convenient and efficient when it indeed works. But should we be 
worried that customization is hard if the built in behavior is not quite right 
for us? Are we to expect hard to track down issues originating from the black 
box behind the magic?

II. Is it mature enough? E.g. we've created a pull request which fixes a 
problem that we were very surprised no one ever stumbled upon before. So that's 
why I'm asking: is Spark already being used in professional settings? Can one 
already trust it to be reasonably bug-free and reliable?




I know I'm asking a biased audience, but that's fine, as I want to be 
convinced. :)




So, to the concrete issues. Sorry for the long mail, and let me know if I 
should break this out into more threads or if there is some other way to have 
this discussion...




1. Memory management


The general direction of these questions is whether it's possible to take RDD 
caching related memory management more into our own hands as LRU eviction is 
nice most of the time but can be very suboptimal in some of our use cases.

A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one really 
wants to keep. I'm fine with going down in flames if I mark too much data 
essential.

B. Memory "reflection": can you programmatically get the memory size of a cached 
rdd and memory sizes available in total/per executor? If we could do this we 
could indirectly avoid automatic evictions of things we might really want to 
keep in memory.

C. Evictions caused by RDD partitions on the driver. I had a setup with huge 
worker memory and smallish memory on the driver JVM. To my surprise, the system 
started to cache RDD partitions on the driver as well. As the driver ran out of 
memory I started to see evictions while there was still plenty of space on 
workers. This resulted in lengthy recomputations. Can this be avoided somehow?

D. Broadcasts. Is it possible to get rid of a broadcast manually, without 
waiting for the LRU eviction taking care of it? Can you tell the size of a 
broadcast programmatically?







2. Akka lost connections


We have quite often experienced lost executors due to akka exceptions - mostly 
connection lost or similar. It seems to happen when an executor gets extremely 
busy with some CPU intensive work. Our hypothesis is that akka network threads 
get starved and the executor fails to respond within timeout limits. Is this 
plausible? If yes, what can we do with it?




In general, these are scary errors in the sense that they come from the very 
core of the framework and it's hard to link it to something we do in our own 
code, and thus hard to find a fix. So a question more for the community: how 
often do you end up scratching your head about cases where spark magic doesn't 
work perfectly?







3. Recalculation of cached rdds


I see the following scenario happening. I load two RDDs A,B from disk, cache 
them and then do some jobs on them, at the very least a count on each. After 
these jobs are done I see on the storage panel that 100% of these RDDs are 
cached in memory.




Then I create a third RDD C which is created by multiple joins and maps from A 
and B, also cache it and start a job on C. When I do this I still see A and B 
completely cached and also see C slowly getting more and more cached. This is 
all fine and good, but in the meanwhile I see stages running on the UI that 
point to code which is used to load A and B. How is this possible? Am I 
misunderstanding how cached RDDs should behave?




And again the general question - how can one debug such issues?





Re: Spark - ready for prime time?

2014-04-10 Thread Dean Wampler
Spark has been endorsed by Cloudera as the successor to MapReduce. That
says a lot...


On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I list concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon before. So
> that's why I'm asking: is Spark already being used in professional settings?
> Can one already trust it to be reasonably bug-free and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you programmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there was still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
>
> Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but 

Re: Spark - ready for prime time?

2014-04-10 Thread Debasish Das
When you say "Spark is one of the forerunners for our technology choice",
what are the other options you are looking into?

I started cross-validation runs on a 40-core, 160 GB Spark job using a
script... I woke up in the morning and none of the jobs had crashed! And the
project just came out of incubation.

I hope Spark keeps evolving as a standalone Akka cluster (an MPI cluster, if you
remember C++ mpiexec :-) where you can plug and play any distributed file
system (HDFS, ...) or distributed caching system (HBase, Cassandra, ...).

I am also confident that Spark as a standalone Akka cluster can serve
analytics-driven, scalable frontend apps... and by analytics I don't mean
SQL analytics, but predictive analytics...



On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nem...@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I list concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request which fixes a
> problem that we were very surprised no one ever stumbled upon before. So
> that's why I'm asking: is Spark already being used in professional settings?
> Can one already trust it to be reasonably bug-free and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you programmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there was still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> t

Fwd: Spark - ready for prime time?

2014-04-10 Thread Andras Nemeth
Hello Spark Users,

With the recent graduation of Spark to a top level project (grats, btw!),
maybe a well timed question. :)

We are at the very beginning of a large scale big data project and after
two months of exploration work we'd like to settle on the technologies to
use, roll up our sleeves and start to build the system.

Spark is one of the forerunners for our technology choice.

My question in essence is whether it's a good idea or is Spark too
'experimental' just yet to bet our lives (well, the project's life) on it.

The benefits of choosing Spark are numerous and I guess all too obvious for
this audience - e.g. we love its powerful abstraction, ease of development
and the potential for using a single system for serving and manipulating
huge amount of data.

This email aims to ask about the risks. I list concrete issues we've
encountered below, but basically my concern boils down to two philosophical
points:
I. Is it too much magic? Lots of things "just work right" in Spark and it's
extremely convenient and efficient when it indeed works. But should we be
worried that customization is hard if the built in behavior is not quite
right for us? Are we to expect hard to track down issues originating from
the black box behind the magic?
II. Is it mature enough? E.g. we've created a pull request which fixes a
problem that we were very surprised no one ever stumbled upon before. So
that's why I'm asking: is Spark already being used in professional settings?
Can one already trust it to be reasonably bug-free and reliable?

I know I'm asking a biased audience, but that's fine, as I want to be
convinced. :)

So, to the concrete issues. Sorry for the long mail, and let me know if I
should break this out into more threads or if there is some other way to
have this discussion...

1. Memory management
The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data essential.
B. Memory "reflection": can you programmatically get the memory size of a
cached rdd and memory sizes available in total/per executor? If we could do
this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.
C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there was still
plenty of space on workers. This resulted in lengthy recomputations. Can
this be avoided somehow?
D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?


2. Akka lost connections
We have quite often experienced lost executors due to akka exceptions -
mostly connection lost or similar. It seems to happen when an executor gets
extremely busy with some CPU intensive work. Our hypothesis is that akka
network threads get starved and the executor fails to respond within
timeout limits. Is this plausible? If yes, what can we do with it?

In general, these are scary errors in the sense that they come from the
very core of the framework and it's hard to link it to something we do in
our own code, and thus hard to find a fix. So a question more for the
community: how often do you end up scratching your head about cases where
spark magic doesn't work perfectly?


3. Recalculation of cached rdds
I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.

Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?

And again the general question - how can one debug such issues?

4. Shuffle on disk
Is it true - I couldn't find it in official docs, but did see this
mentioned in various threads - that shuffle _always_ hits disk?
(Disregarding OS caches.) Why is this the case? Are you planning to add a
function to do shuffle in memory or are there some intrinsic reasons for
this to be impossible?


Sorry again for the giant mail, and thanks for any insights!

Andras