Hey Ken,

1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap
storage option using Alluxio, formerly Tachyon, with which I have no
experience however.)
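
In code terms, the distinction looks roughly like this (a minimal PySpark
sketch; the input path is a placeholder, and since I haven't used the
off-heap path myself, take that line as a pointer rather than a recipe):

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="cache-demo")
    rdd = sc.textFile("hdfs:///some/input")  # placeholder path

    # cache()/MEMORY_ONLY keeps the cached partitions on the executor JVM heap.
    rdd.persist(StorageLevel.MEMORY_ONLY)

    # OFF_HEAP hands storage to Tachyon/Alluxio when that is configured.
    # rdd.persist(StorageLevel.OFF_HEAP)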

2. Unfortunately, the worker memory setting is not a hard maximum. During
aggregation the Python daemon checks its process size, and if it has grown
beyond this setting, it starts spilling to disk. I've seen many occasions
where my daemons grew larger. Also, you're relying on Python's memory
management to free up space again once objects are evicted. In practice,
leave this setting reasonably small, but make sure there's enough free
memory on the machine so you don't run into OOM conditions. If the lower
memory setting causes strain for your users, have them increase the
parallelism of their jobs (smaller partitions mean less data is processed
at a time).
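
A rough sketch of what I mean, with placeholder values for the app name,
input path, memory limit, and partition count:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("parallelism-demo")
            # keep the per-daemon limit modest; the machine still needs headroom
            .set("spark.python.worker.memory", "512m"))
    sc = SparkContext(conf=conf)

    rdd = sc.textFile("hdfs:///some/input")  # placeholder path
    # More, smaller partitions mean each Python daemon aggregates less data at
    # a time, so it is more likely to stay under the limit (or spill early).
    counts = (rdd.repartition(2000)
                 .map(lambda line: (line.split(",")[0], 1))
                 .reduceByKey(lambda a, b: a + b))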

3. I believe that is the behavior you can expect when setting
spark.executor.cores. I've not experimented much with it and haven't looked
at that part of the code, but what you describe also matches my
understanding. Please share your findings here; I'm sure they will be very
helpful to others, too.
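
For reference, this is roughly how I would set it for an experiment (the
master URL and core counts are just example values for your 16-core nodes):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setMaster("spark://your-master:7077")  # placeholder master URL
            # caps the JVM worker threads per executor, which should in turn
            # cap the number of pyspark daemons each executor spawns
            .set("spark.executor.cores", "12")
            .set("spark.cores.max", "12"))  # optional: cap total cores per app
    sc = SparkContext(conf=conf)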

One more suggestion for your users is to move to the PySpark DataFrame API.
Much of the processing will then happen in the JVM, and you will bump into
fewer Python resource contention issues.
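
For example, an aggregation written against DataFrames stays almost entirely
in the JVM (a sketch against the 1.x API; it assumes an existing SparkContext
sc, a JSON input with "key" and "value" columns, and placeholder paths):

    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F

    sqlContext = SQLContext(sc)

    # The RDD equivalent would pickle every record into the Python daemons:
    #   rdd.map(lambda r: (r[0], r[1])).reduceByKey(lambda a, b: a + b)

    # With DataFrames, the query plan executes in the JVM; Python only drives it.
    df = sqlContext.read.json("hdfs:///some/input.json")
    out = df.groupBy("key").agg(F.sum("value").alias("total"))
    out.write.parquet("hdfs:///some/output")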

Best,
-Sven


On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken <carli...@janelia.hhmi.org>
wrote:

> This is extremely helpful!
>
> I’ll have to talk to my users about how the python memory limit should be
> adjusted and what their expectations are. I’m fairly certain we bumped it
> up in the dark past when jobs were failing because of insufficient memory
> for the python processes.
>
> So just to make sure I’m understanding correctly:
>
>
>    - JVM memory (set by SPARK_EXECUTOR_MEMORY and/or
>    SPARK_WORKER_MEMORY?) is where the RDDs are stored. Currently both of those
>    values are set to 90GB
>    - spark.python.worker.memory controls how much RAM each python task
>    can use at maximum (roughly speaking). Currently set to 4GB
>    - spark.task.cpus controls how many java worker threads will exist and
>    thus indirectly how many pyspark daemon processes will exist
>
>
> I’m also looking into fixing my cron jobs so they don’t stack up by
> implementing flock in the jobs and changing how teardowns of the spark
> cluster work as far as failed workers.
>
> Thanks again,
> —Ken
>
> On Mar 26, 2016, at 4:08 PM, Sven Krasser <kras...@gmail.com> wrote:
>
> My understanding is that the spark.executor.cores setting controls the
> number of worker threads in the executor in the JVM. Each worker thread
> then communicates with a pyspark daemon process (these are not threads) to
> stream data into Python. There should be one daemon process per worker
> thread (but as I mentioned, I sometimes see a low multiple).
>
> Your 4GB limit for Python is fairly high; that means even for 12 workers
> you're looking at a max of 48GB (and it frequently goes beyond that). You
> will be better off using a lower number there and instead increasing the
> parallelism of your job (i.e. dividing the job into more and smaller
> partitions).
>
> On Sat, Mar 26, 2016 at 7:10 AM, Carlile, Ken <carli...@janelia.hhmi.org>
> wrote:
>
>> Thanks, Sven!
>>
>> I know that I’ve messed up the memory allocation, but I’m trying not to
>> think too much about that (because I’ve advertised it to my users as “90GB
>> for Spark works!”, and that’s how it displays in the Spark UI, totally
>> ignoring the python processes). So I’ll need to deal with that at some
>> point… especially since I’ve set the max python memory usage to 4GB to
>> work around other issues!
>>
>> The load issue comes in because we have a lot of background cron jobs
>> (mostly to clean up after spark…), and those will stack up behind the high
>> load and keep stacking until the whole thing comes crashing down. I will
>> look into how to avoid this stacking, as I think one of my predecessors had
>> a way, but that’s why the high load nukes the nodes. I don’t have the
>> spark.executor.cores set, but will setting that to, say, 12 limit the
>> pyspark threads, or will it just limit the JVM threads?
>>
>> Thanks!
>> Ken
>>
>> On Mar 25, 2016, at 9:10 PM, Sven Krasser <kras...@gmail.com> wrote:
>>
>> Hey Ken,
>>
>> I also frequently see more pyspark daemons than configured concurrency;
>> often it's a low multiple. (There was an issue pre-1.3.0 that caused this
>> to be quite a bit higher, so make sure you at least have a recent version;
>> see SPARK-5395.)
>>
>> Each pyspark daemon tries to stay below the configured memory limit
>> during aggregation (which is separate from the JVM heap as you note). Since
>> the number of daemons can be high and the memory limit is per daemon (each
>> daemon is actually a process and not a thread and therefore has its own
>> memory it tracks against the configured per-worker limit), I found memory
>> depletion to be the main source of pyspark problems on larger data sets.
>> Also, as Sea already noted, the memory limit is not firm and individual
>> daemons can grow larger.
>>
>> With that said, a run queue of 25 on a 16 core machine does not sound
>> great but also not awful enough to knock it offline. I suspect something
>> else may be going on. If you want to limit the amount of work running
>> concurrently, try reducing spark.executor.cores (under normal circumstances
>> this would leave parts of your resources underutilized).
>>
>> Hope this helps!
>> -Sven
>>
>>
>> On Fri, Mar 25, 2016 at 10:41 AM, Carlile, Ken <carli...@janelia.hhmi.org
>> > wrote:
>>
>>> Further data on this.
>>> I’m watching another job right now where there are 16 pyspark.daemon
>>> threads, all of which are trying to get a full core (remember, this is a 16
>>> core machine). Unfortunately, the java process actually running the spark
>>> worker is trying to take several cores of its own, driving the load up. I’m
>>> hoping someone has seen something like this.
>>>
>>> —Ken
>>>
>>> On Mar 21, 2016, at 3:07 PM, Carlile, Ken <carli...@janelia.hhmi.org>
>>> wrote:
>>>
>>> No further input on this? I discovered today that the pyspark.daemon
>>> thread count was actually 48, which makes a little more sense (at least it’s
>>> a multiple of 16), and it seems to be happening at reduce and collect
>>> portions of the code.
>>>
>>> —Ken
>>>
>>> On Mar 17, 2016, at 10:51 AM, Carlile, Ken <carli...@janelia.hhmi.org>
>>> wrote:
>>>
>>> Thanks! I found that part just after I sent the email… whoops. I’m
>>> guessing that’s not an issue for my users, since it’s been set that way for
>>> a couple of years now.
>>>
>>> The thread count is definitely an issue, though, since if enough nodes
>>> go down, they can’t schedule their spark clusters.
>>>
>>> —Ken
>>>
>>> On Mar 17, 2016, at 10:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> I took a look at docs/configuration.md
>>> Though I didn't find answer for your first question, I think the
>>> following pertains to your second question:
>>>
>>> spark.python.worker.memory (default: 512m)
>>>   Amount of memory to use per python worker process during aggregation,
>>>   in the same format as JVM memory strings (e.g. 512m, 2g). If the memory
>>>   used during aggregation goes above this amount, it will spill the data
>>>   into disks.
>>>
>>> On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken <carli...@janelia.hhmi.org
>>> > wrote:
>>>
>>>> Hello,
>>>>
>>>> We have an HPC cluster that we run Spark jobs on using standalone mode
>>>> and a number of scripts I’ve built up to dynamically schedule and start
>>>> spark clusters within the Grid Engine framework. Nodes in the cluster have
>>>> 16 cores and 128GB of RAM.
>>>>
>>>> My users use pyspark heavily. We’ve been having a number of problems
>>>> with nodes going offline with extraordinarily high load. I was able to look
>>>> at one of those nodes today before it went truly sideways, and I discovered
>>>> that the user was running 50 pyspark.daemon threads (remember, this is a 16
>>>> core box), and the load was somewhere around 25 or so, with all CPUs maxed
>>>> out at 100%.
>>>>
>>>> So while the spark worker is aware it’s only got 16 cores and behaves
>>>> accordingly, pyspark seems to be happy to overrun everything like crazy. Is
>>>> there a global parameter I can use to limit pyspark threads to a sane
>>>> number, say 15 or 16? It would also be interesting to set a memory limit,
>>>> which leads to another question.
>>>>
>>>> How is memory managed when pyspark is used? I have the spark worker
>>>> memory set to 90GB, and there is 8GB of system overhead (GPFS caching), so
>>>> if pyspark operates outside of the JVM memory pool, that leaves it at most
>>>> 30GB to play with, assuming there is no overhead outside the JVM’s 90GB
>>>> heap (ha ha.)
>>>>
>>>> Thanks,
>>>> Ken Carlile
>>>> Sr. Unix Engineer
>>>> HHMI/Janelia Research Campus
>>>> 571-209-4363
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> www.skrasser.com
>>
>>
>>
>
>
> --
> www.skrasser.com
>
>
>


-- 
www.skrasser.com
