Re: Spark Memory Bounds

2014-05-28 Thread Keith Simmons
Thanks!  Sounds like my rough understanding was roughly right :)

Definitely understand cached RDDs can add to the memory requirements.
Luckily, like you mentioned, you can configure Spark to flush that to disk
and bound its total size in memory via spark.storage.memoryFraction, so I
have a pretty good handle on the overall RDD contribution.
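
Concretely, here's the kind of setup I have in mind (a minimal sketch
against the Spark 1.x API, with a hypothetical input path and an
illustrative fraction value):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Cap the fraction of executor heap reserved for cached RDD partitions
    // (illustrative value, not a recommendation).
    val conf = new SparkConf()
      .setAppName("bounded-cache-sketch")
      .set("spark.storage.memoryFraction", "0.4")
    val sc = new SparkContext(conf)

    // MEMORY_AND_DISK lets partitions that exceed the storage budget spill
    // to local disk instead of being dropped and recomputed.
    val lines  = sc.textFile("hdfs:///some/input") // hypothetical path
    val cached = lines.map(_.length).persist(StorageLevel.MEMORY_AND_DISK)
    cached.count()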

Thanks for all the help.

Keith


On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen <c...@adatao.com> wrote:

 Keith, please see inline.

 --
 Christopher T. Nguyen
 Co-founder & CEO, Adatao <http://adatao.com>
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 7:22 PM, Keith Simmons <ke...@pulse.io> wrote:

 A dash of both.  I want to know enough that I can reason about, rather
 than strictly control, the amount of memory Spark will use.  If I have a
 big data set, I want to understand how I can design it so that Spark's
 memory consumption falls below my available resources.  Or alternatively,
 whether it's even possible for Spark to process a data set over a certain size.
  And if I run into memory problems, I want to know which knobs to turn, and
 how turning those knobs will affect memory consumption.


 In practice, to avoid OOME, a key dial we use is the size (or inversely,
 number) of the partitions of your dataset. Clearly there is some blow-up
 factor F such that, e.g., if you start out with 128MB on-disk data
 partitions, you would consume 128F MB of memory, both by Spark and by your
 closure code. Knowing this, you would want to size the partitions such that
 AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at
 F, you could do some back-of-the-envelope modeling, and/or run the job and
 observe empirically.
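
 To make that arithmetic concrete, here is a back-of-the-envelope sketch
 with hypothetical numbers (16 GB of usable heap and 8 cores per worker, an
 observed blow-up factor of about 4, and a 500 GB dataset):

   val availableMemoryMBPerWorker = 16 * 1024 // hypothetical: 16 GB usable heap
   val coresPerWorker             = 8
   val blowUpFactorF              = 4.0       // hypothetical, measured empirically

   // Each core should have room for one fully inflated partition:
   //   onDiskPartitionMB * F < availableMemoryMBPerWorker / coresPerWorker
   val maxOnDiskPartitionMB =
     (availableMemoryMBPerWorker / coresPerWorker) / blowUpFactorF // 512.0 MB here

   // Given a total dataset size, derive a minimum partition count:
   val totalDataSetMB = 500 * 1024 // hypothetical 500 GB dataset
   val minPartitions  = math.ceil(totalDataSetMB / maxOnDiskPartitionMB).toInt // 1000
   // e.g. sc.textFile("hdfs:///some/input", minPartitions)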



 It's my understanding that between certain key stages in a Spark DAG
 (e.g. group-by stages), Spark will serialize all data structures necessary
 to continue the computation at the next stage, including closures.  So in
 theory, per machine, Spark only needs to hold the transient memory required
 to process the partitions assigned to the currently active tasks.  Is my
 understanding correct?  Specifically, once a key/value pair is serialized
 in the shuffle stage of a task, are the references to the raw Java objects
 released before the next task is started?


 Yes, that is correct in non-cached mode. At the same time, Spark also does
 something else optionally, which is to keep the data structures (RDDs)
 persistent in memory (*). As such, it is possible for partitions that are
 not being actively worked on to consume memory. Spark will spill all of
 these to local disk if they take up more memory than it is allowed to use.
 So the key thing to worry about is less what Spark does (apart from
 overhead and, yes, the possibility of bugs that need to be fixed), and more
 what your closure code does with JVM memory as a whole. If in doubt,
 refer back to the blow-up factor model described above.

 (*) This is a fundamentally differentiating feature of Spark over a range
 of other in-memory architectures that focus on raw-data or transient
 caches serving non-equivalent purposes when viewed from the application
 level. It allows for very fast access to ready-to-consume high-level data
 structures, as long as available RAM permits.
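
 A minimal sketch of that persistence behavior, assuming an existing
 SparkContext sc and a hypothetical input path:

   import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on Spark 1.0-1.2)
   import org.apache.spark.storage.StorageLevel

   // Persisted RDDs stay materialized across actions; partitions that exceed
   // the storage budget spill to local disk rather than cause an OOME.
   val words  = sc.textFile("hdfs:///some/input").flatMap(_.split("\\s+"))
   val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

   counts.persist(StorageLevel.MEMORY_AND_DISK)
   counts.count()     // first action materializes and caches the partitions
   counts.take(10)    // served from the cache, not recomputed
   counts.unpersist() // release the cached partitions when no longer needed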




 On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen <c...@adatao.com> wrote:

 Keith, do you mean bound as in (a) strictly control to some
 quantifiable limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management,
 which you should think of as an application-level (that is, above JVM)
 mechanism. In this scope, Spark voluntarily tracks and limits the amount
 of memory it uses for explicitly known data structures, such as RDDs. What
 Spark cannot do is, e.g., control or manage the amount of JVM memory that a
 given piece of user code might take up. For example, I might write some
 closure code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve toward is at least a mechanism for application-level hints about
 task memory requirements. We might work on this and submit a PR for it.

 --
 Christopher T. Nguyen
 Co-founder & CEO, Adatao <http://adatao.com>
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <ke...@pulse.io> wrote:

 I'm trying to determine how to bound my memory use in a job working
 with more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient
 memory used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm 

Spark Memory Bounds

2014-05-27 Thread Keith Simmons
I'm trying to determine how to bound my memory use in a job working with
more data than can simultaneously fit in RAM.  From reading the tuning
guide, my impression is that Spark's memory usage is roughly the following:

(A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
used by all currently running tasks

I can bound A with spark.storage.memoryFraction and I can bound B with
spark.shuffle.memoryFraction.
 I'm wondering how to bound C.
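
For reference, this is the kind of configuration I mean (illustrative
values, Spark 1.x-era property names):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only; the Spark 1.x defaults are 0.6 (storage)
    // and 0.2 (shuffle).
    val conf = new SparkConf()
      .setAppName("memory-bounds")
      .set("spark.storage.memoryFraction", "0.3") // bounds (A): cached RDDs
      .set("spark.shuffle.memoryFraction", "0.2") // bounds (B): shuffle aggregation buffers
    val sc = new SparkContext(conf)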

It's been hinted at a few times on this mailing list that you can reduce
memory use by increasing the number of partitions.  That leads me to
believe that the amount of transient memory is roughly:

total_data_set_size/number_of_partitions *
number_of_tasks_simultaneously_running_per_machine

Does this sound right?  In other words, as I increase the number of
partitions, the size of each partition will decrease, and since each task
is processing a single partition and there are a bounded number of tasks in
flight, my memory use has a rough upper limit.
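
To make that concrete with hypothetical numbers:

    // Hypothetical numbers, just to make the rough upper bound concrete.
    val totalDataSetGB     = 200.0
    val numberOfPartitions = 2000
    val tasksPerMachine    = 8 // typically one task per core

    val perTaskGB    = totalDataSetGB / numberOfPartitions // ~0.1 GB per partition
    val perMachineGB = perTaskGB * tasksPerMachine         // ~0.8 GB transient per machine
    // Doubling numberOfPartitions (e.g. rdd.repartition(4000)) roughly halves both.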

Keith


Re: Spark Memory Bounds

2014-05-27 Thread Christopher Nguyen
Keith, do you mean bound as in (a) strictly control to some quantifiable
limit, or (b) try to minimize the amount used by each task?

If a, then that is outside the scope of Spark's memory management, which
you should think of as an application-level (that is, above JVM) mechanism.
In this scope, Spark voluntarily tracks and limits the amount of memory
it uses for explicitly known data structures, such as RDDs. What Spark
cannot do is, e.g., control or manage the amount of JVM memory that a given
piece of user code might take up. For example, I might write some closure
code that allocates a large array of doubles unbeknownst to Spark.
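
A contrived sketch of what I mean, assuming an existing SparkContext sc and
a hypothetical input path:

    val lines = sc.textFile("hdfs:///some/input") // hypothetical path
    val result = lines.mapPartitions { iter =>
      // Roughly 400 MB of doubles allocated inside user code for each task,
      // which Spark's own memory accounting knows nothing about.
      val lookup = new Array[Double](50 * 1000 * 1000)
      iter.map(line => lookup(line.length % lookup.length))
    }
    result.count()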

If b, then your thinking is in the right direction, although quite
imperfect, because of things like the example above. We often experience
OOME if we're not careful with job partitioning. What I think Spark needs
to evolve toward is at least a mechanism for application-level hints about
task memory requirements. We might work on this and submit a PR for it.

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <ke...@pulse.io> wrote:

 I'm trying to determine how to bound my memory use in a job working with
 more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
 used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm wondering how to bound C.

 It's been hinted at a few times on this mailing list that you can reduce
 memory use by increasing the number of partitions.  That leads me to
 believe that the amount of transient memory is roughly:

 total_data_set_size/number_of_partitions *
 number_of_tasks_simultaneously_running_per_machine

 Does this sound right?  In other words, as I increase the number of
 partitions, the size of each partition will decrease, and since each task
 is processing a single partition and there are a bounded number of tasks in
 flight, my memory use has a rough upper limit.

 Keith



Re: Spark Memory Bounds

2014-05-27 Thread Keith Simmons
A dash of both.  I want to know enough that I can reason about, rather
than strictly control, the amount of memory Spark will use.  If I have a
big data set, I want to understand how I can design it so that Spark's
memory consumption falls below my available resources.  Or alternatively,
whether it's even possible for Spark to process a data set over a certain size.
 And if I run into memory problems, I want to know which knobs to turn, and
how turning those knobs will affect memory consumption.

It's my understanding that between certain key stages in a Spark DAG (e.g.
group-by stages), Spark will serialize all data structures necessary to
continue the computation at the next stage, including closures.  So in
theory, per machine, Spark only needs to hold the transient memory required
to process the partitions assigned to the currently active tasks.  Is my
understanding correct?  Specifically, once a key/value pair is serialized
in the shuffle stage of a task, are the references to the raw Java objects
released before the next task is started?
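
For instance, in a job shaped roughly like this (hypothetical path, Spark
1.x pair-RDD API), my question is whether the map-side objects from the
first stage are released once their shuffle output has been serialized:

    import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on Spark 1.0-1.2)

    val pairs = sc.textFile("hdfs:///some/input")      // hypothetical path
      .map(line => (line.take(1), line.length))        // stage 1: map side
    val grouped = pairs.groupByKey(2000)               // shuffle boundary, 2000 partitions
    val totals  = grouped.mapValues(_.sum)             // stage 2: reduce side
    totals.count()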



On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen <c...@adatao.com> wrote:

 Keith, do you mean bound as in (a) strictly control to some quantifiable
 limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management, which
 you should think of as an application-level (that is, above JVM) mechanism.
 In this scope, Spark voluntarily tracks and limits the amount of memory
 it uses for explicitly known data structures, such as RDDs. What Spark
 cannot do is, e.g., control or manage the amount of JVM memory that a given
 piece of user code might take up. For example, I might write some closure
 code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve toward is at least a mechanism for application-level hints about
 task memory requirements. We might work on this and submit a PR for it.

 --
 Christopher T. Nguyen
 Co-founder & CEO, Adatao <http://adatao.com>
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons <ke...@pulse.io> wrote:

 I'm trying to determine how to bound my memory use in a job working with
 more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
 used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm wondering how to bound C.

 It's been hinted at a few times on this mailing list that you can reduce
 memory use by increasing the number of partitions.  That leads me to
 believe that the amount of transient memory is roughly:

 total_data_set_size/number_of_partitions *
 number_of_tasks_simultaneously_running_per_machine

 Does this sound right?  In other words, as I increase the number of
 partitions, the size of each partition will decrease, and since each task
 is processing a single partition and there are a bounded number of tasks in
 flight, my memory use has a rough upper limit.

 Keith