As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory,
and you can then share that RDD across different jobs. If you would like to
run Spark on Alluxio, this documentation can help:
http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html
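
For what it's worth, here is a rough sketch of one way to share data between jobs through Alluxio: one job writes the RDD out to an alluxio:// path and a later job reads it back. The helper names, the host, and the path are placeholders I made up for illustration, and 19998 is assumed to be the Alluxio master port, so adjust for your setup. The helpers rely only on saveAsTextFile and textFile, and `sc` is assumed to be an existing SparkContext.

```python
def alluxio_uri(path, host="localhost", port=19998):
    """Build an alluxio:// URI. host and port are placeholder defaults."""
    return "alluxio://{}:{}/{}".format(host, port, path.lstrip("/"))


def publish_rdd(rdd, uri):
    """Job A: write the RDD as text files under the Alluxio path."""
    rdd.saveAsTextFile(uri)
    return uri


def load_rdd(sc, uri):
    """Job B: read the same path back as a new RDD of lines."""
    return sc.textFile(uri)
```

In job A this would be called as publish_rdd(rdd, alluxio_uri("shared/my_rdd")), and in job B as load_rdd(sc, alluxio_uri("shared/my_rdd")). Note that text output comes back as strings, so records need to be parsed again on the reading side.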

Thanks,
Gene

On Tue, Jun 14, 2016 at 12:44 AM, agateaaa <agate...@gmail.com> wrote:

> Hi,
>
> I am seeing this issue too with pyspark (using Spark 1.6.1). I have set
> spark.executor.cores to 1, but whenever a streaming batch starts
> processing data, I see the python -m pyspark.daemon processes increase
> gradually to about 5 (raising CPU% on the box about 4-5 times; each
> pyspark.daemon takes up around 100% CPU).
>
> After the processing is done, 4 pyspark.daemon processes go away and we are
> left with one until the next batch run. Also, the CPU usage for the
> executor process sometimes spikes to about 800% even though
> spark.executor.cores is set to 1.
>
> e.g. top output:
>   PID USER  PR NI    VIRT    RES   SHR S  %CPU %MEM   TIME+ COMMAND
> 19634 spark 20  0 8871420 1.790g 32056 S 814.1  2.9 0:39.33 /usr/lib/j+ <--EXECUTOR
> 13897 spark 20  0   46576  17916  6720 S 100.0  0.0 0:00.17 python -m + <--pyspark.daemon
> 13991 spark 20  0   46524  15572  4124 S  98.0  0.0 0:08.18 python -m + <--pyspark.daemon
> 14488 spark 20  0   46524  15636  4188 S  98.0  0.0 0:07.25 python -m + <--pyspark.daemon
> 14514 spark 20  0   46524  15636  4188 S  94.0  0.0 0:06.72 python -m + <--pyspark.daemon
> 14526 spark 20  0   48200  17172  4092 S   0.0  0.0 0:00.38 python -m + <--pyspark.daemon
>
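To track how the daemon count changes between batches, a small script can help. This is only a sketch: it assumes input lines of the form "PID %CPU COMMAND", such as `ps -eo pid,pcpu,args --no-headers` would produce on Linux, because top truncates the command column. The function name is made up, and the sample reuses the PIDs and %CPU values from the top output above with the command written out in full.

```python
def summarize_daemons(ps_output):
    """Count pyspark.daemon processes and sum their %CPU.

    Expects lines of the form "PID %CPU COMMAND..." (assumed format).
    """
    count = 0
    total_cpu = 0.0
    for line in ps_output.splitlines():
        fields = line.split(None, 2)
        if len(fields) < 3:
            continue  # skip blank or malformed lines
        _pid, cpu, command = fields
        if "pyspark.daemon" in command:
            count += 1
            total_cpu += float(cpu)
    return count, total_cpu


# Sample adapted from the top output in this thread; the full command
# text is an assumption based on the "<--pyspark.daemon" annotations.
sample = """\
19634 814.1 /usr/lib/j+
13897 100.0 python -m pyspark.daemon
13991 98.0 python -m pyspark.daemon
14488 98.0 python -m pyspark.daemon
14514 94.0 python -m pyspark.daemon
14526 0.0 python -m pyspark.daemon
"""
print(summarize_daemons(sample))
```

Running this periodically during a batch would show whether the count settles at a fixed multiple of spark.executor.cores or keeps growing.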
>
>
> Is there any way to control the number of pyspark.daemon processes that
> get spawned ?
>
> Thank you
> Agateaaa
>
> On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser <kras...@gmail.com> wrote:
>
>> Hey Ken,
>>
>> 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap
>> storage option using Alluxio, formerly Tachyon, with which I have no
>> experience however.)
>>
>> 2. The worker memory setting is not a hard maximum unfortunately. What
>> happens is that during aggregation the Python daemon will check its process
>> size. If the size is larger than this setting, it will start spilling to
>> disk. I've seen many occasions where my daemons grew larger. Also, you're
>> relying on Python's memory management to free up space again once objects
>> are evicted. In practice, leave this setting reasonably small but make sure
>> there's enough free memory on the machine so you don't run into OOM
>> conditions. If the lower memory setting causes strains for your users, make
>> sure they increase the parallelism of their jobs (smaller partitions
>> meaning less data is processed at a time).
>>
>> 3. I believe that is the behavior you can expect when setting
>> spark.executor.cores. I've not experimented much with it and haven't looked
>> at that part of the code, but what you describe also reflects my
>> understanding. Please share your findings here, I'm sure those will be very
>> helpful to others, too.
>>
>> One more suggestion for your users is to move to the Pyspark DataFrame
>> API. Much of the processing will then happen in the JVM, and you will bump
>> into fewer Python resource contention issues.
>>
>> Best,
>> -Sven
>>
>>
>> On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken <carli...@janelia.hhmi.org>
>> wrote:
>>
>>> This is extremely helpful!
>>>
>>> I’ll have to talk to my users about how the python memory limit should
>>> be adjusted and what their expectations are. I’m fairly certain we bumped
>>> it up in the dark past when jobs were failing because of insufficient
>>> memory for the python processes.
>>>
>>> So just to make sure I’m understanding correctly:
>>>
>>>
>>>    - JVM memory (set by SPARK_EXECUTOR_MEMORY and/or
>>>    SPARK_WORKER_MEMORY?) is where the RDDs are stored. Currently both of
>>>    those values are set to 90GB.
>>>    - spark.python.worker.memory controls, roughly speaking, the maximum
>>>    RAM each python task can take. Currently set to 4GB.
>>>    - spark.task.cpus controls how many java worker threads will exist
>>>    and thus indirectly how many pyspark daemon processes will exist
>>>
>>>
>>> I’m also looking into fixing my cron jobs so they don’t stack up by
>>> implementing flock in the jobs and changing how teardowns of the spark
>>> cluster work as far as failed workers.
>>>
>>> Thanks again,
>>> —Ken
>>>
>>> On Mar 26, 2016, at 4:08 PM, Sven Krasser <kras...@gmail.com> wrote:
>>>
>>> My understanding is that the spark.executor.cores setting controls the
>>> number of worker threads in the executor in the JVM. Each worker thread
>>> communicates then with a pyspark daemon process (these are not threads) to
>>> stream data into Python. There should be one daemon process per worker
>>> thread (but as I mentioned I sometimes see a low multiple).
>>>
>>> Your 4GB limit for Python is fairly high, that means even for 12 workers
>>> you're looking at a max of 48GB (and it goes frequently beyond that). You
>>> will be better off using a lower number there and instead increasing the
>>> parallelism of your job (i.e. dividing the job into more and smaller
>>> partitions).
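
The arithmetic above can be written down as a quick check. This is a back-of-the-envelope sketch, not anything Spark provides: the function name is made up, and the node figures (128GB RAM, 90GB for Spark, 8GB system overhead) are the ones quoted elsewhere in this thread. Since the Python limit is a spill threshold and not a hard cap, the real footprint can be higher than the estimate.

```python
def python_memory_budget(node_ram_gb, jvm_heap_gb, system_overhead_gb,
                         daemons, worker_memory_gb):
    """Compare a rough Python footprint against what the node has left.

    worker_memory_gb is the spill threshold, not a hard cap, so the
    real footprint can exceed the estimate returned here.
    """
    available = node_ram_gb - jvm_heap_gb - system_overhead_gb
    estimated = daemons * worker_memory_gb
    return available, estimated, estimated <= available


# Numbers from this thread: 128GB nodes, 90GB for Spark, 8GB overhead,
# 12 workers at 4GB each.
print(python_memory_budget(128, 90, 8, 12, 4))
# One daemon per core at the documented 512m default.
print(python_memory_budget(128, 90, 8, 16, 0.5))
```

With the 4GB setting the estimate (48GB) already exceeds the roughly 30GB left on the node, while the 512m default leaves headroom even if some daemons grow past the threshold.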
>>>
>>> On Sat, Mar 26, 2016 at 7:10 AM, Carlile, Ken <carli...@janelia.hhmi.org
>>> > wrote:
>>>
>>>> Thanks, Sven!
>>>>
>>>> I know that I’ve messed up the memory allocation, but I’m trying not to
>>>> think too much about that, because I’ve advertised it to my users as “90GB
>>>> for Spark works!” and that’s how it displays in the Spark UI (totally
>>>> ignoring the python processes). So I’ll need to deal with that at some
>>>> point… esp since I’ve set the max python memory usage to 4GB to work around
>>>> other issues!
>>>>
>>>> The load issue comes in because we have a lot of background cron jobs
>>>> (mostly to clean up after spark…), and those will stack up behind the high
>>>> load and keep stacking until the whole thing comes crashing down. I will
>>>> look into how to avoid this stacking, as I think one of my predecessors had
>>>> a way, but that’s why the high load nukes the nodes. I don’t have
>>>> spark.executor.cores set, but will setting that to, say, 12 limit the
>>>> pyspark threads, or will it just limit the jvm threads?
>>>>
>>>> Thanks!
>>>> Ken
>>>>
>>>> On Mar 25, 2016, at 9:10 PM, Sven Krasser <kras...@gmail.com> wrote:
>>>>
>>>> Hey Ken,
>>>>
>>>> I also frequently see more pyspark daemons than the configured concurrency;
>>>> often it's a low multiple. (There was an issue pre-1.3.0 that caused this
>>>> to be quite a bit higher, so make sure you at least have a recent version;
>>>> see SPARK-5395.)
>>>>
>>>> Each pyspark daemon tries to stay below the configured memory limit
>>>> during aggregation (which is separate from the JVM heap as you note). Since
>>>> the number of daemons can be high and the memory limit is per daemon (each
>>>> daemon is actually a process and not a thread and therefore has its own
>>>> memory it tracks against the configured per-worker limit), I found memory
>>>> depletion to be the main source of pyspark problems on larger data sets.
>>>> Also, as Sea already noted the memory limit is not firm and individual
>>>> daemons can grow larger.
>>>>
>>>> With that said, a run queue of 25 on a 16 core machine does not sound
>>>> great but also not awful enough to knock it offline. I suspect something
>>>> else may be going on. If you want to limit the amount of work running
>>>> concurrently, try reducing spark.executor.cores (under normal circumstances
>>>> this would leave parts of your resources underutilized).
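
As a concrete illustration, the settings discussed in this thread can be passed on the spark-submit command line. The helper below is a hypothetical sketch that only assembles the arguments; the values (12 cores, 512m, 192 partitions) and the job.py script name are placeholders, and including spark.default.parallelism is my assumption about how one might raise parallelism. Whether lowering spark.executor.cores strictly caps the number of pyspark daemons is not established here.

```python
def spark_submit_conf(executor_cores, python_worker_memory, parallelism=None):
    """Return spark-submit --conf arguments for the settings discussed here."""
    settings = [
        ("spark.executor.cores", str(executor_cores)),
        ("spark.python.worker.memory", python_worker_memory),
    ]
    if parallelism is not None:
        # Optional: more, smaller partitions mean less data per task.
        settings.append(("spark.default.parallelism", str(parallelism)))
    args = []
    for key, value in settings:
        args.extend(["--conf", "{}={}".format(key, value)])
    return args


# Placeholder values and script name, for illustration only.
command = ["spark-submit"] + spark_submit_conf(12, "512m", 192) + ["job.py"]
print(" ".join(command))
```

The same keys can also go into spark-defaults.conf if they should apply to every job on the cluster.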
>>>>
>>>> Hope this helps!
>>>> -Sven
>>>>
>>>>
>>>> On Fri, Mar 25, 2016 at 10:41 AM, Carlile, Ken <
>>>> carli...@janelia.hhmi.org> wrote:
>>>>
>>>>> Further data on this.
>>>>> I’m watching another job right now where there are 16 pyspark.daemon
>>>>> threads, all of which are trying to get a full core (remember, this is a
>>>>> 16 core machine). Unfortunately, the java process actually running the
>>>>> spark worker is trying to take several cores of its own, driving the load
>>>>> up. I’m hoping someone has seen something like this.
>>>>>
>>>>> —Ken
>>>>>
>>>>> On Mar 21, 2016, at 3:07 PM, Carlile, Ken <carli...@janelia.hhmi.org>
>>>>> wrote:
>>>>>
>>>>> No further input on this? I discovered today that the pyspark.daemon
>>>>> thread count was actually 48, which makes a little more sense (at least
>>>>> it’s a multiple of 16), and it seems to be happening at reduce and collect
>>>>> portions of the code.
>>>>>
>>>>> —Ken
>>>>>
>>>>> On Mar 17, 2016, at 10:51 AM, Carlile, Ken <carli...@janelia.hhmi.org>
>>>>> wrote:
>>>>>
>>>>> Thanks! I found that part just after I sent the email… whoops. I’m
>>>>> guessing that’s not an issue for my users, since it’s been set that way
>>>>> for a couple of years now.
>>>>>
>>>>> The thread count is definitely an issue, though, since if enough nodes
>>>>> go down, they can’t schedule their spark clusters.
>>>>>
>>>>> —Ken
>>>>>
>>>>> On Mar 17, 2016, at 10:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>> I took a look at docs/configuration.md.
>>>>> Though I didn't find an answer to your first question, I think the
>>>>> following pertains to your second question:
>>>>>
>>>>> <tr>
>>>>>   <td><code>spark.python.worker.memory</code></td>
>>>>>   <td>512m</td>
>>>>>   <td>
>>>>>     Amount of memory to use per python worker process during aggregation,
>>>>>     in the same format as JVM memory strings (e.g. <code>512m</code>,
>>>>>     <code>2g</code>). If the memory used during aggregation goes above this
>>>>>     amount, it will spill the data into disks.
>>>>>   </td>
>>>>> </tr>
>>>>>
>>>>> On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken <
>>>>> carli...@janelia.hhmi.org> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We have an HPC cluster that we run Spark jobs on using standalone
>>>>>> mode and a number of scripts I’ve built up to dynamically schedule and
>>>>>> start spark clusters within the Grid Engine framework. Nodes in the
>>>>>> cluster have 16 cores and 128GB of RAM.
>>>>>>
>>>>>> My users use pyspark heavily. We’ve been having a number of problems
>>>>>> with nodes going offline with extraordinarily high load. I was able to
>>>>>> look at one of those nodes today before it went truly sideways, and I
>>>>>> discovered that the user was running 50 pyspark.daemon threads (remember,
>>>>>> this is a 16 core box), and the load was somewhere around 25 or so, with
>>>>>> all CPUs maxed out at 100%.
>>>>>>
>>>>>> So while the spark worker is aware it’s only got 16 cores and behaves
>>>>>> accordingly, pyspark seems to be happy to overrun everything like crazy.
>>>>>> Is there a global parameter I can use to limit pyspark threads to a sane
>>>>>> number, say 15 or 16? It would also be interesting to set a memory limit,
>>>>>> which leads to another question.
>>>>>>
>>>>>> How is memory managed when pyspark is used? I have the spark worker
>>>>>> memory set to 90GB, and there is 8GB of system overhead (GPFS caching),
>>>>>> so if pyspark operates outside of the JVM memory pool, that leaves it at
>>>>>> most 30GB to play with, assuming there is no overhead outside the JVM’s
>>>>>> 90GB heap (ha ha.)
>>>>>>
>>>>>> Thanks,
>>>>>> Ken Carlile
>>>>>> Sr. Unix Engineer
>>>>>> HHMI/Janelia Research Campus
>>>>>> 571-209-4363
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> www.skrasser.com <http://www.skrasser.com/?utm_source=sig>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
