Hi,

I am seeing this issue too with pyspark (using Spark 1.6.1). I have set
spark.executor.cores to 1, but whenever a streaming batch starts processing
data, the number of python -m pyspark.daemon processes gradually increases
to about 5. Each pyspark.daemon takes around 100% CPU, so CPU usage on the
box goes up about 4-5 times.

After the processing is done, 4 of the pyspark.daemon processes go away and
we are left with one until the next batch run. Also, the CPU usage of the
executor process sometimes spikes to about 800% even though
spark.executor.cores is set to 1.

e.g. top output:

  PID USER   PR  NI    VIRT    RES    SHR S  %CPU %MEM   TIME+ COMMAND
19634 spark  20   0 8871420 1.790g  32056 S 814.1  2.9 0:39.33 /usr/lib/j+  <-- EXECUTOR
13897 spark  20   0   46576  17916   6720 S 100.0  0.0 0:00.17 python -m +  <-- pyspark.daemon
13991 spark  20   0   46524  15572   4124 S  98.0  0.0 0:08.18 python -m +  <-- pyspark.daemon
14488 spark  20   0   46524  15636   4188 S  98.0  0.0 0:07.25 python -m +  <-- pyspark.daemon
14514 spark  20   0   46524  15636   4188 S  94.0  0.0 0:06.72 python -m +  <-- pyspark.daemon
14526 spark  20   0   48200  17172   4092 S   0.0  0.0 0:00.38 python -m +  <-- pyspark.daemon
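
To watch how the daemon count changes across batches, the listing above can
be summarized programmatically. The following is only a sketch:
summarize_daemons is a hypothetical helper, not part of Spark, and it assumes
input shaped like the output of ps -eo pid,pcpu,args (PID, %CPU, command
line). The sample rows reuse the PIDs and %CPU values from the top output
above.

```python
def summarize_daemons(ps_output, marker="pyspark.daemon"):
    """Count processes whose command line contains `marker` and sum their %CPU.

    Expects lines shaped like `ps -eo pid,pcpu,args` output:
    PID, %CPU, then the full command line.
    """
    count = 0
    total_cpu = 0.0
    for line in ps_output.splitlines():
        fields = line.split(None, 2)  # pid, %cpu, rest of the command line
        if len(fields) < 3 or marker not in fields[2]:
            continue  # header row, blank line, or a non-daemon process
        try:
            cpu = float(fields[1])
        except ValueError:
            continue  # malformed %CPU column
        count += 1
        total_cpu += cpu
    return count, total_cpu


# Sample data: PIDs and %CPU copied from the top listing; the executor's
# command line is left truncated exactly as top showed it.
sample = """\
  PID %CPU COMMAND
19634 814.1 /usr/lib/j+
13897 100.0 python -m pyspark.daemon
13991 98.0 python -m pyspark.daemon
14488 98.0 python -m pyspark.daemon
14514 94.0 python -m pyspark.daemon
14526 0.0 python -m pyspark.daemon
"""

daemon_count, daemon_cpu = summarize_daemons(sample)
print(daemon_count, daemon_cpu)  # 5 390.0
```

On a live box the same function could be fed the real ps output. Note that
ps reports CPU averaged over the lifetime of the process, so the numbers will
not match the instantaneous values top shows.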



Is there any way to control the number of pyspark.daemon processes that get
spawned?

Thank you
Agateaaa

On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser <kras...@gmail.com> wrote:

> Hey Ken,
>
> 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap
> storage option using Alluxio, formerly Tachyon, with which I have no
> experience however.)
>
> 2. The worker memory setting is not a hard maximum unfortunately. What
> happens is that during aggregation the Python daemon will check its process
> size. If the size is larger than this setting, it will start spilling to
> disk. I've seen many occasions where my daemons grew larger. Also, you're
> relying on Python's memory management to free up space again once objects
> are evicted. In practice, leave this setting reasonably small but make sure
> there's enough free memory on the machine so you don't run into OOM
> conditions. If the lower memory setting causes strains for your users, make
> sure they increase the parallelism of their jobs (smaller partitions
> meaning less data is processed at a time).
>
> 3. I believe that is the behavior you can expect when setting
> spark.executor.cores. I've not experimented much with it and haven't looked
> at that part of the code, but what you describe also reflects my
> understanding. Please share your findings here, I'm sure those will be very
> helpful to others, too.
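
For anyone who wants to experiment with the settings discussed in this
thread, here is a minimal sketch of passing them to spark-submit as --conf
key=value pairs. conf_args and job.py are hypothetical names used only for
illustration. The value 12 is the one Ken asked about, and 512m is the
documented default quoted further down in the thread. As noted above, the
number of pyspark daemons may still end up a low multiple of the cores
setting.

```python
def conf_args(settings):
    """Turn a dict of Spark properties into spark-submit `--conf key=value` arguments."""
    args = []
    for key in sorted(settings):  # sorted only to make the output deterministic
        args += ["--conf", "{}={}".format(key, settings[key])]
    return args


settings = {
    "spark.executor.cores": 12,            # worker threads per executor JVM
    "spark.python.worker.memory": "512m",  # per-daemon spill threshold, not a hard cap
}

# job.py is a placeholder for the user's application script.
submit_command = ["spark-submit"] + conf_args(settings) + ["job.py"]
print(" ".join(submit_command))
```

The resulting list can be handed to subprocess.call, or the printed line can
be pasted into a job script.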
>
> One more suggestion for your users is to move to the Pyspark DataFrame
> API. Much of the processing will then happen in the JVM, and you will bump
> into fewer Python resource contention issues.
>
> Best,
> -Sven
>
>
> On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken <carli...@janelia.hhmi.org>
> wrote:
>
>> This is extremely helpful!
>>
>> I’ll have to talk to my users about how the python memory limit should be
>> adjusted and what their expectations are. I’m fairly certain we bumped it
>> up in the dark past when jobs were failing because of insufficient memory
>> for the python processes.
>>
>> So just to make sure I’m understanding correctly:
>>
>>
>>    - JVM memory (set by SPARK_EXECUTOR_MEMORY and/or
>>    SPARK_WORKER_MEMORY?) is where the RDDs are stored. Currently both of
>>    those values are set to 90GB.
>>    - spark.python.worker.memory controls how much RAM each python task
>>    can take at most (roughly speaking). Currently set to 4GB.
>>    - spark.task.cpus controls how many java worker threads will exist
>>    and thus indirectly how many pyspark daemon processes will exist
>>
>> I’m also looking into fixing my cron jobs so they don’t stack up by
>> implementing flock in the jobs and changing how teardowns of the spark
>> cluster work as far as failed workers.
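
One way the flock idea could look for a Python cleanup job is sketched below.
This is an illustration under assumptions, not Ken's actual setup:
try_exclusive_lock and the lock file name are made up, the code is Unix-only
(it uses the fcntl module), and locking behaviour on network filesystems can
differ, so a node-local path is the safer assumption.

```python
import fcntl
import os
import tempfile


def try_exclusive_lock(path):
    """Try to take an exclusive, non-blocking flock on `path`.

    Returns the open file object (keep it open while the job runs; closing
    it releases the lock), or None if another open handle holds the lock.
    """
    handle = open(path, "a")  # creates the lock file if it does not exist
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        handle.close()
        return None
    return handle


# Demo with a throwaway lock file; a real job would use a fixed, agreed path.
lock_path = os.path.join(tempfile.mkdtemp(), "cleanup.lock")
first = try_exclusive_lock(lock_path)    # this run gets the lock
second = try_exclusive_lock(lock_path)   # a stacked run is refused
first.close()                            # job finished, lock released
third = try_exclusive_lock(lock_path)    # the next scheduled run can proceed
```

A job that gets None back would simply exit instead of queueing behind the
one still running. For shell-based cron entries, the flock command from
util-linux with its -n option gives the same skip-if-busy behaviour.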
>>
>> Thanks again,
>> —Ken
>>
>> On Mar 26, 2016, at 4:08 PM, Sven Krasser <kras...@gmail.com> wrote:
>>
>> My understanding is that the spark.executor.cores setting controls the
>> number of worker threads in the executor in the JVM. Each worker thread
>> communicates then with a pyspark daemon process (these are not threads) to
>> stream data into Python. There should be one daemon process per worker
>> thread (but as I mentioned I sometimes see a low multiple).
>>
>> Your 4GB limit for Python is fairly high, that means even for 12 workers
>> you're looking at a max of 48GB (and it goes frequently beyond that). You
>> will be better off using a lower number there and instead increasing the
>> parallelism of your job (i.e. dividing the job into more and smaller
>> partitions).
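
The arithmetic behind this can be written out using the figures Ken gives
elsewhere in the thread (128GB nodes, 90GB for the JVM, 8GB of system
overhead). python_memory_budget is a hypothetical helper, and the 0.5GB case
simply plugs in the documented 512m default for comparison. Since the limit
is soft, the nominal total is not a ceiling.

```python
def python_memory_budget(node_ram_gb, jvm_heap_gb, system_overhead_gb,
                         daemons, per_daemon_limit_gb):
    """Compare the RAM left for Python with the nominal sum of per-daemon limits.

    Returns (available_gb, nominal_gb, fits). Daemons can exceed their limit,
    so `fits` is an optimistic check rather than a guarantee.
    """
    available = node_ram_gb - jvm_heap_gb - system_overhead_gb
    nominal = daemons * per_daemon_limit_gb
    return available, nominal, nominal <= available


# 12 workers with the current 4GB limit: 48GB nominal against 30GB available.
current = python_memory_budget(128, 90, 8, daemons=12, per_daemon_limit_gb=4)

# The same 12 workers with the documented 512m default.
lower = python_memory_budget(128, 90, 8, daemons=12, per_daemon_limit_gb=0.5)

print(current)  # (30, 48, False)
print(lower)    # (30, 6.0, True)
```

With more daemons than worker threads (the low multiple mentioned above), the
nominal figure grows accordingly, which is another argument for a smaller
limit combined with more partitions.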
>>
>> On Sat, Mar 26, 2016 at 7:10 AM, Carlile, Ken <carli...@janelia.hhmi.org>
>>  wrote:
>>
>>> Thanks, Sven!
>>>
>>> I know that I’ve messed up the memory allocation, but I’m trying not to
>>> think too much about that (because I’ve advertised it to my users as “90GB
>>> for Spark works!” and that’s how it displays in the Spark UI, totally
>>> ignoring the python processes). So I’ll need to deal with that at some
>>> point… esp since I’ve set the max python memory usage to 4GB to work around
>>> other issues!
>>>
>>> The load issue comes in because we have a lot of background cron jobs
>>> (mostly to clean up after spark…), and those will stack up behind the high
>>> load and keep stacking until the whole thing comes crashing down. I will
>>> look into how to avoid this stacking, as I think one of my predecessors had
>>> a way, but that’s why the high load nukes the nodes. I don’t have
>>> spark.executor.cores set, but will setting that to, say, 12 limit the
>>> pyspark threads, or will it just limit the jvm threads?
>>>
>>> Thanks!
>>> Ken
>>>
>>> On Mar 25, 2016, at 9:10 PM, Sven Krasser <kras...@gmail.com> wrote:
>>>
>>> Hey Ken,
>>>
>>> I also frequently see more pyspark daemons than configured concurrency,
>>> often it's a low multiple. (There was an issue pre-1.3.0 that caused this
>>> to be quite a bit higher, so make sure you at least have a recent version;
>>> see SPARK-5395.)
>>>
>>> Each pyspark daemon tries to stay below the configured memory limit
>>> during aggregation (which is separate from the JVM heap as you note). Since
>>> the number of daemons can be high and the memory limit is per daemon (each
>>> daemon is actually a process and not a thread and therefore has its own
>>> memory it tracks against the configured per-worker limit), I found memory
>>> depletion to be the main source of pyspark problems on larger data sets.
>>> Also, as Sea already noted the memory limit is not firm and individual
>>> daemons can grow larger.
>>>
>>> With that said, a run queue of 25 on a 16 core machine does not sound
>>> great but also not awful enough to knock it offline. I suspect something
>>> else may be going on. If you want to limit the amount of work running
>>> concurrently, try reducing spark.executor.cores (under normal circumstances
>>> this would leave parts of your resources underutilized).
>>>
>>> Hope this helps!
>>> -Sven
>>>
>>>
>>> On Fri, Mar 25, 2016 at 10:41 AM, Carlile, Ken <
>>> carli...@janelia.hhmi.org> wrote:
>>>
>>>> Further data on this.
>>>> I’m watching another job right now where there are 16 pyspark.daemon
>>>> threads, all of which are trying to get a full core (remember, this is a 16
>>>> core machine). Unfortunately, the java process actually running the spark
>>>> worker is trying to take several cores of its own, driving the load up. I’m
>>>> hoping someone has seen something like this.
>>>>
>>>> —Ken
>>>>
>>>> On Mar 21, 2016, at 3:07 PM, Carlile, Ken <carli...@janelia.hhmi.org>
>>>> wrote:
>>>>
>>>> No further input on this? I discovered today that the pyspark.daemon
>>>> threadcount was actually 48, which makes a little more sense (at least it’s
>>>> a multiple of 16), and it seems to be happening at reduce and collect
>>>> portions of the code.
>>>>
>>>> —Ken
>>>>
>>>> On Mar 17, 2016, at 10:51 AM, Carlile, Ken <carli...@janelia.hhmi.org>
>>>> wrote:
>>>>
>>>> Thanks! I found that part just after I sent the email… whoops. I’m
>>>> guessing that’s not an issue for my users, since it’s been set that way for
>>>> a couple of years now.
>>>>
>>>> The thread count is definitely an issue, though, since if enough nodes
>>>> go down, they can’t schedule their spark clusters.
>>>>
>>>> —Ken
>>>>
>>>> On Mar 17, 2016, at 10:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>> I took a look at docs/configuration.md
>>>> Though I didn't find answer for your first question, I think the
>>>> following pertains to your second question:
>>>>
>>>> <tr>
>>>>   <td><code>spark.python.worker.memory</code></td>
>>>>   <td>512m</td>
>>>>   <td>
>>>>     Amount of memory to use per python worker process during aggregation,
>>>>     in the same format as JVM memory strings (e.g. <code>512m</code>,
>>>>     <code>2g</code>). If the memory used during aggregation goes above
>>>>     this amount, it will spill the data into disks.
>>>>   </td>
>>>> </tr>
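
To compare values written in that format, a small converter can help. This is
a sketch and not Spark's own parser: memory_string_to_bytes is a hypothetical
helper that only accepts a whole number followed by a single k, m, g or t
suffix, and it assumes binary multiples as with JVM heap flags.

```python
import re

# Assumed binary multiples, as with JVM -Xmx style strings.
_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def memory_string_to_bytes(value):
    """Convert a JVM-style memory string such as '512m' or '2g' to bytes."""
    match = re.fullmatch(r"(\d+)([kmgt])", value.strip().lower())
    if match is None:
        raise ValueError("unrecognised memory string: {!r}".format(value))
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


default_limit = memory_string_to_bytes("512m")  # documented default
raised_limit = memory_string_to_bytes("4g")     # the 4GB setting discussed in this thread
print(raised_limit // default_limit)            # 8
```

So a 4g setting lets each Python worker hold eight times as much in memory
before spilling as the default would.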
>>>>
>>>> On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken <
>>>> carli...@janelia.hhmi.org> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have an HPC cluster that we run Spark jobs on using standalone mode
>>>>> and a number of scripts I’ve built up to dynamically schedule and start
>>>>> spark clusters within the Grid Engine framework. Nodes in the cluster have
>>>>> 16 cores and 128GB of RAM.
>>>>>
>>>>> My users use pyspark heavily. We’ve been having a number of problems
>>>>> with nodes going offline with extraordinarily high load. I was able to
>>>>> look at one of those nodes today before it went truly sideways, and I
>>>>> discovered that the user was running 50 pyspark.daemon threads (remember,
>>>>> this is a 16 core box), and the load was somewhere around 25 or so, with
>>>>> all CPUs maxed out at 100%.
>>>>>
>>>>> So while the spark worker is aware it’s only got 16 cores and behaves
>>>>> accordingly, pyspark seems to be happy to overrun everything like crazy.
>>>>> Is there a global parameter I can use to limit pyspark threads to a sane
>>>>> number, say 15 or 16? It would also be interesting to set a memory limit,
>>>>> which leads to another question.
>>>>>
>>>>> How is memory managed when pyspark is used? I have the spark worker
>>>>> memory set to 90GB, and there is 8GB of system overhead (GPFS caching), so
>>>>> if pyspark operates outside of the JVM memory pool, that leaves it at most
>>>>> 30GB to play with, assuming there is no overhead outside the JVM’s 90GB
>>>>> heap (ha ha.)
>>>>>
>>>>> Thanks,
>>>>> Ken Carlile
>>>>> Sr. Unix Engineer
>>>>> HHMI/Janelia Research Campus
>>>>> 571-209-4363
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> www.skrasser.com <http://www.skrasser.com/?utm_source=sig>
>>>
>>>
>>>
>>
>>
>>
>>
>>
>
>
>
