Re: Spark on Mesos / Executor Memory
To be precise, the MesosExecutorBackend's Xms and Xmx both equal spark.executor.memory, so there is no question of expanding or contracting the memory held by the executor.

On Sat, Oct 17, 2015 at 5:38 PM, Bharath Ravi Kumar wrote:

> David, Tom,
>
> Thanks for the explanation. This confirms my suspicion that the executor
> was holding on to memory regardless of tasks in execution once it expands
> to occupy memory in keeping with spark.executor.memory. There certainly is
> scope for improvement here, though I realize there will be substantial
> overheads in implementing memory release without compromising RDD caching
> and similar aspects. I'll explore alternatives / workarounds meanwhile.
>
> Thanks,
> Bharath
>
> On Sat, Oct 17, 2015 at 3:33 PM, Tom Arnfeld wrote:
>
>> Hi Bharath,
>>
>> When running jobs in fine-grained mode, each Spark task is sent to Mesos
>> as a task, which allows the offers system to maintain fairness between
>> different Spark applications (as you've described). Having said that,
>> unless your memory per node is hugely undersubscribed when running these
>> jobs in parallel, this behaviour matches exactly what you've described.
>>
>> What you're seeing happens because even though there's a new Mesos task
>> for each Spark task (allowing CPU to be shared), the Spark executors don't
>> get killed even when they aren't doing any work, which means the memory
>> isn't released. The JVM doesn't allow for flexible memory re-allocation
>> (as far as I'm aware), which makes it impossible for Spark to dynamically
>> scale up the memory of the executor over time as tasks start and finish.
>>
>> As Dave pointed out, the simplest way to solve this is to use a
>> higher-level tool that can run your Spark jobs through one Mesos
>> framework, and then you can let Spark distribute the resources more
>> effectively.
>>
>> I hope that helps!
>>
>> Tom.
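To make the fixed-heap point concrete, here is a minimal sketch. It is not Spark's launch code: the helper name `jvm_heap_flags` and the accepted size format are assumptions for illustration. The only thing taken from the thread is that the executor JVM's -Xms and -Xmx are both set to spark.executor.memory, so the heap is sized once at startup and does not track the number of running tasks.

```python
from typing import List


def jvm_heap_flags(executor_memory: str) -> List[str]:
    """Return the JVM heap flags implied by a spark.executor.memory value.

    Hypothetical helper for illustration only: it mirrors the statement that
    -Xms and -Xmx are both set to spark.executor.memory.
    """
    value = executor_memory.strip().lower()
    # Accept simple sizes such as "512m" or "200g" (an assumed format).
    if len(value) < 2 or value[-1] not in "kmg" or not value[:-1].isdigit():
        raise ValueError(f"expected a size such as '200g', got {executor_memory!r}")
    # Initial heap equals maximum heap, so the heap size never varies.
    return [f"-Xms{value}", f"-Xmx{value}"]


flags = jvm_heap_flags("200g")
print(flags)  # ['-Xms200g', '-Xmx200g']
```

With both flags equal, nothing in the JVM's configuration gives the executor room to hand memory back while it is idle, which matches the behaviour reported in this thread.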
Re: Spark on Mesos / Executor Memory
David, Tom,

Thanks for the explanation. This confirms my suspicion that the executor was holding on to memory regardless of tasks in execution once it expands to occupy memory in keeping with spark.executor.memory. There certainly is scope for improvement here, though I realize there will be substantial overheads in implementing memory release without compromising RDD caching and similar aspects. I'll explore alternatives / workarounds meanwhile.

Thanks,
Bharath
Re: Spark on Mesos / Executor Memory
Can someone respond if you're aware of the reason for such a memory footprint? It seems unintuitive and hard to reason about.

Thanks,
Bharath
Re: Spark on Mesos / Executor Memory
Resending since user@mesos bounced earlier. My apologies.
Re: Spark on Mesos / Executor Memory
(Reviving this thread since I ran into similar issues...)

I'm running two Spark jobs (in Mesos fine-grained mode), each belonging to a different Mesos role, say low and high. The low:high Mesos weights are 1:10. As expected, the low priority job occupies cluster resources to the maximum extent when running alone. However, when the high priority job is submitted, it does not start and continues to await cluster resources (as seen in the logs). Since the jobs run in fine-grained mode and the low priority tasks begin to finish, the high priority job should ideally be able to start and gradually take over cluster resources as per the weights.

However, I noticed that while the "low" job gives up CPU cores with each completing task (e.g. a reduction from 72 -> 12 with default parallelism set to 72), the memory resources are held on to (~500G out of 768G). The spark.executor.memory setting appears to directly impact the amount of memory that the job holds on to. In this case, it was set to 200G in the low priority job and 100G in the high priority job. The nature of these jobs is such that setting the numbers to smaller values (say 32g) resulted in job failures with OutOfMemoryError.

It appears that the Spark framework is retaining memory (across tasks) proportional to spark.executor.memory for the duration of the job and not releasing memory as tasks complete. This defeats the purpose of fine-grained mode execution, as the memory occupancy is preventing the high priority job from accepting the prioritized CPU offers and beginning execution. Can this be explained / documented better please?

Thanks,
Bharath
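The behaviour discussed in this thread, where CPU is handed back task by task while memory stays pinned to the executor, can be sketched with a toy accounting model. This is an illustration under stated assumptions, not Spark's or Mesos's scheduler code: the class `ExecutorAccount` is hypothetical, it collapses the cluster into a single executor, and it ignores any CPU held by the executor itself. The figures 72, 12 and 200 are taken from the report in this thread.

```python
from dataclasses import dataclass


@dataclass
class ExecutorAccount:
    """Toy accounting for one executor in fine-grained mode (illustrative only)."""

    executor_memory_gb: int  # held for the executor's whole lifetime
    cpus_per_task: int = 1
    running_tasks: int = 0

    def start_task(self) -> None:
        self.running_tasks += 1

    def finish_task(self) -> None:
        if self.running_tasks == 0:
            raise RuntimeError("no running task to finish")
        self.running_tasks -= 1

    @property
    def cpus_held(self) -> int:
        # CPU follows the number of running tasks.
        return self.running_tasks * self.cpus_per_task

    @property
    def memory_held_gb(self) -> int:
        # Memory does not depend on the number of running tasks.
        return self.executor_memory_gb


low = ExecutorAccount(executor_memory_gb=200)
for _ in range(72):
    low.start_task()
for _ in range(60):
    low.finish_task()
print(low.cpus_held, low.memory_held_gb)  # 12 200
```

In this model the cores drop from 72 to 12 as tasks finish, but the memory figure never moves, so a second framework that needs a large block of memory to start its own executor has nothing to accept even though CPU offers keep arriving.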
Re: Spark on Mesos / Executor Memory
(Adding spark user list)

Hi Tom,

If I understand correctly, you're saying that you're running into memory problems because the scheduler is allocating too many CPUs and not enough memory to accommodate them, right?

In the case of fine-grained mode I don't think that's a problem, since we have a fixed amount of CPU and memory per task. However, in coarse-grained mode you can run into that problem if you're within the spark.cores.max limit and memory is a fixed number.

I have a patch out to configure the maximum number of CPUs a coarse-grained executor should use, and it also allows multiple executors in coarse-grained mode. So you could, say, try to launch multiple executors of at most 4 cores each, with spark.executor.memory (+ overhead, etc.) on a slave. (https://github.com/apache/spark/pull/4027)

It also might be interesting to include a cores-to-memory multiplier, so that with a larger number of cores we try to scale the memory by some factor, but I'm not entirely sure that's intuitive to use or that people would know what to set it to, as that can likely change with different workloads.

Tim

On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld wrote:

> We're running Spark 1.3.0 (with a couple of patches over the top for
> docker related bits).
>
> I don't think SPARK-4158 is related to what we're seeing; things do run
> fine on the cluster, given a ridiculously large executor memory
> configuration. As for SPARK-3535, although that looks useful I think we're
> seeing something else.
>
> Put a different way, the amount of memory required at any given time by
> the Spark JVM process is directly proportional to the amount of CPU it
> has, because more CPU means more tasks and more tasks means more memory.
> Even if we're using coarse mode, the amount of executor memory should be
> proportionate to the number of CPUs in the offer.
>
> On 11 April 2015 at 17:39, Brenden Matthews wrote:
>
>> I ran into some issues with it a while ago, and submitted a couple of PRs
>> to fix it:
>>
>> https://github.com/apache/spark/pull/2401
>> https://github.com/apache/spark/pull/3024
>>
>> Do these look relevant? What version of Spark are you running?
>>
>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld wrote:
>>
>>> Hey,
>>>
>>> Not sure whether it's best to ask this on the spark mailing list or the
>>> mesos one, so I'll try here first :-)
>>>
>>> I'm having a bit of trouble with out of memory errors in my Spark
>>> jobs... it seems fairly odd to me that memory resources can only be set
>>> at the executor level, and not also at the task level. For example, as
>>> far as I can tell there's only a *spark.executor.memory* config option.
>>>
>>> Surely the memory requirements of a single executor are quite
>>> dramatically influenced by the number of concurrent tasks running? Given
>>> a shared cluster, I have no idea what % of an individual slave my
>>> executor is going to get, so I basically have to set the executor memory
>>> to a value that's correct when the whole machine is in use...
>>>
>>> Has anyone else running Spark on Mesos come across this, or maybe
>>> someone could correct my understanding of the config options?
>>>
>>> Thanks!
>>>
>>> Tom.
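The cores-to-memory multiplier floated in this message could look roughly like the sketch below. This is purely hypothetical: no such Spark setting is described in the thread, and the function `executor_memory_mb`, its parameters, and the sample values (2048 MB per core, 512 MB base) are made up for illustration. It only shows the arithmetic of sizing executor memory in proportion to the CPUs in an offer.

```python
def executor_memory_mb(offered_cpus: int, mb_per_core: int, base_mb: int = 0) -> int:
    """Hypothetical sizing rule: scale executor memory with the offered cores."""
    if offered_cpus < 0 or mb_per_core < 0 or base_mb < 0:
        raise ValueError("inputs must be non-negative")
    # A fixed base plus a per-core share, so more cores means more memory.
    return base_mb + offered_cpus * mb_per_core


# Sample values are arbitrary; a real factor would depend on the workload.
for cpus in (4, 8, 16):
    print(cpus, executor_memory_mb(cpus, mb_per_core=2048, base_mb=512))
# 4 8704
# 8 16896
# 16 33280
```

As noted in the message, the hard part is not the formula but choosing the per-core factor, since the memory a task needs can differ widely between workloads.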