Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Mich Talebzadeh
Since your Hbase is supported by the external vendor, I would ask them to
justify their choice of storage for Hbase and ask for any suggestions they have
vis-a-vis S3 etc.

Spark has an efficient API to Hbase, including remote Hbase. I have used it in
the past for reading from Hbase.
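
As an illustration only (not necessarily how the job in this thread is wired up), a minimal PySpark sketch of reading an HBase table through the Apache hbase-spark connector; the connector jar on the classpath, the table name and the column mapping are all assumptions here.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hbase-read-sketch").getOrCreate()

# Assumes the Apache hbase-spark connector jar is on the classpath and
# hbase-site.xml is visible to the executors; table and columns are made up.
df = (spark.read
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.columns.mapping",
              "rowkey STRING :key, name STRING cf1:name, value DOUBLE cf1:value")
      .option("hbase.table", "my_namespace:my_table")
      .load())
df.show(5)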


HTH




   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 7 Apr 2022 at 13:38, Joris Billen 
wrote:

> Thanks for pointing this out.
>
> So currently data is stored in hbase on adls. Question (sorry I might be
> ignorant): is it clear that parquet on s3 would be faster as storage to
> read from than hbase on adls?
> In general, I ve found it hard after my processing is done, if I have an
> application that needs to read all data from hbase (full large tables) to
> get this as fast as possible.
> This read speed is important to me, but it is limited (I think) by the
> time it will take to read the data from the cloud storage (adls).
> You can change some parameters (like regionserver heap , block.cache size,
>  memstore global size) depending on if you are processing/write a lot OR
> reading from hbase. What I would find really useful if one of these
> autoscaling systems could also optimize these parameters depending if youre
> reading or writing.
>
>
>
> Wrt architecture: indeed separate spark from hbase would be best , but I
> never got it to write from an outside spark cluster.  For autoscaling, I
> know there are hbase cloud offerings that have elastic scaling so indeed
> that could be an improvement too.
>
>
>
> ANyhow, fruitful discussion.
>
>
>
>
>
> On 7 Apr 2022, at 13:46, Bjørn Jørgensen  wrote:
>
> "4. S3: I am not using it, but people in the thread started suggesting
> potential solutions involving s3. It is an azure system, so hbase is stored
> on adls. In fact the nature of my application (geospatial stuff) requires
> me to use geomesa libs, which only allows directly writing from spark to
> hbase. So I can not write to some other format (the geomesa API is not
> designed for that-it only writes directly to hbase using the predetermined
> key/values)."
>
> In the docs for geomesa it looks like it can write to files. They say to
> AWS which S3 is a part of and " The quick start comes pre-configured to
> use Apache’s Parquet encoding."
>
> http://www.geomesa.org/documentation/current/tutorials/geomesa-quickstart-fsds.html
> 
>
>
>
> tor. 7. apr. 2022 kl. 13:30 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>> Ok. Your architect has decided to emulate anything on prem to the
>> cloud.You are not really taking any advantages of cloud offerings or
>> scalability. For example, how does your Hadoop clustercater for the
>> increased capacity. Likewise your spark nodes are pigeonholed with your
>> Hadoop nodes.  Old wine in a new bottle :)
>>
>> HTH
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> 
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 7 Apr 2

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Joris Billen
Thanks for pointing this out.

So currently data is stored in hbase on adls. Question (sorry I might be 
ignorant): is it clear that parquet on s3 would be faster as storage to read 
from than hbase on adls?
In general, I've found it hard, once my processing is done, to get an
application that needs to read all data from hbase (full large tables) to do
this as fast as possible.
This read speed is important to me, but it is limited (I think) by the time it
takes to read the data from the cloud storage (adls).
You can change some parameters (like regionserver heap, block.cache size,
memstore global size) depending on whether you are processing/writing a lot OR
reading from hbase. What I would find really useful is if one of these
autoscaling systems could also optimize these parameters depending on whether
you're reading or writing.
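
For reference, the knobs described above map to HBase properties along these lines (the regionserver heap itself is set in hbase-env.sh via HBASE_HEAPSIZE, not hbase-site.xml). The values below are only an illustration of the read-vs-write trade-off, not recommendations:

# Illustrative values only; block cache + memstore fractions must stay below
# HBase's combined limit (0.8 of heap by default).
write_heavy = {
    "hfile.block.cache.size": 0.2,                    # smaller block cache
    "hbase.regionserver.global.memstore.size": 0.45,  # larger memstore
}
read_heavy = {
    "hfile.block.cache.size": 0.5,                    # larger block cache
    "hbase.regionserver.global.memstore.size": 0.25,  # smaller memstore
}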



Wrt architecture: indeed, separating spark from hbase would be best, but I
never got it to write from an outside spark cluster. For autoscaling, I know
there are hbase cloud offerings that have elastic scaling, so indeed that could
be an improvement too.



Anyhow, fruitful discussion.





On 7 Apr 2022, at 13:46, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

"4. S3: I am not using it, but people in the thread started suggesting 
potential solutions involving s3. It is an azure system, so hbase is stored on 
adls. In fact the nature of my application (geospatial stuff) requires me to 
use geomesa libs, which only allows directly writing from spark to hbase. So I 
can not write to some other format (the geomesa API is not designed for that-it 
only writes directly to hbase using the predetermined key/values)."

In the docs for geomesa it looks like it can write to files. They say to AWS 
which S3 is a part of and " The quick start comes pre-configured to use 
Apache’s Parquet encoding."
http://www.geomesa.org/documentation/current/tutorials/geomesa-quickstart-fsds.html



On Thu, 7 Apr 2022 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
Ok. Your architect has decided to emulate anything on prem to the cloud.You are 
not really taking any advantages of cloud offerings or scalability. For 
example, how does your Hadoop clustercater for the increased capacity. Likewise 
your spark nodes are pigeonholed with your Hadoop nodes.  Old wine in a new 
bottle :)

HTH

 
   view my Linkedin profile

 
https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 7 Apr 2022 at 09:20, Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for active discussion and sharing your knowledge :-)


1.Cluster is a managed hadoop cluster on Azure in the cloud. It has hbase, and 
spark, and hdfs shared .
2.Hbase is on the cluster, so not standalone. It comes from an enterprise-level 
template from a commercial vendor, so assuming this is correctly installed.
3.I know that woudl be best to have a spark cluster to do the processing and 
then write to a separate hbase cluster.. but alas :-( somehow we found this to 
be buggy so we have it all on one cluster.
4. S3: I am not using it, but people in the thread started suggesting potential 
solutions involving s3. It is an azure system, so hbase is stored on adl

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
"4. S3: I am not using it, but people in the thread started suggesting
potential solutions involving s3. It is an azure system, so hbase is stored
on adls. In fact the nature of my application (geospatial stuff) requires
me to use geomesa libs, which only allows directly writing from spark to
hbase. So I can not write to some other format (the geomesa API is not
designed for that-it only writes directly to hbase using the predetermined
key/values)."

In the docs for geomesa it looks like it can write to files. They refer to
AWS, of which S3 is a part, and say: "The quick start comes pre-configured to
use Apache’s Parquet encoding."
http://www.geomesa.org/documentation/current/tutorials/geomesa-quickstart-fsds.html
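
Not GeoMesa-specific, but as a rough sketch of the alternative being suggested: writing the day's DataFrame out as Parquet to object storage instead of straight to HBase. The path and schema are made up; an abfs:// (ADLS Gen2) or s3a:// (S3/MinIO) URI works the same way from Spark, provided the matching Hadoop filesystem connector is configured.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-dump-sketch").getOrCreate()

# Stand-in for the real per-day DataFrame (made-up schema and path).
df_day = spark.createDataFrame([("id1", 1.0), ("id2", 2.0)], ["feature_id", "value"])
(df_day.write
       .mode("overwrite")
       .parquet("abfs://container@account.dfs.core.windows.net/curated/day=2022-04-07"))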



On Thu, 7 Apr 2022 at 13:30, Mich Talebzadeh wrote:

> Ok. Your architect has decided to emulate anything on prem to the
> cloud.You are not really taking any advantages of cloud offerings or
> scalability. For example, how does your Hadoop clustercater for the
> increased capacity. Likewise your spark nodes are pigeonholed with your
> Hadoop nodes.  Old wine in a new bottle :)
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 7 Apr 2022 at 09:20, Joris Billen 
> wrote:
>
>> Thanks for active discussion and sharing your knowledge :-)
>>
>>
>> 1.Cluster is a managed hadoop cluster on Azure in the cloud. It has
>> hbase, and spark, and hdfs shared .
>> 2.Hbase is on the cluster, so not standalone. It comes from an
>> enterprise-level template from a commercial vendor, so assuming this is
>> correctly installed.
>> 3.I know that woudl be best to have a spark cluster to do the processing
>> and then write to a separate hbase cluster.. but alas :-( somehow we found
>> this to be buggy so we have it all on one cluster.
>> 4. S3: I am not using it, but people in the thread started suggesting
>> potential solutions involving s3. It is an azure system, so hbase is stored
>> on adls. In fact the nature of my application (geospatial stuff) requires
>> me to use geomesa libs, which only allows directly writing from spark to
>> hbase. So I can not write to some other format (the geomesa API is not
>> designed for that-it only writes directly to hbase using the predetermined
>> key/values).
>>
>> Forgot to mention: I do unpersist my df that was cached.
>>
>> Nevertheless I think I understand the problem now, this discussion is
>> still interesting!
>> So the root cause is : the hbase region server has memory assigned to it
>> (like 20GB). I see when I start writing from spark to hbase, not much of
>> this is used. I have loops of processing 1 day in spark. For each loop, the
>> regionserver heap is filled a bit more. Since I also overcommitted memory
>> in my cluster (have used in the setup more than really is available), tfter
>> several loops it starts to use more and more of the 20GB and eventually the
>> overall cluster starts to  hit the memory that is available on the workers.
>> The solution is to lower the hbase regionserver heap memory, so Im not
>> overcommitted anymore. In fact, high regionserver memory is more important
>> when I read my data, since then it helps a lot to cache data and to have
>> faster reads. For writing it is not important to have such a high value.
>>
>>
>> Thanks,
>> Joris
>>
>>
>> On 7 Apr 2022, at 09:26, Mich Talebzadeh 
>> wrote:
>>
>> Ok so that is your assumption. The whole thing is based on-premise on
>> JBOD (including hadoop cluster which has Spark binaries on each node as I
>> understand) as I understand. But it will be faster to use S3 (or GCS)
>> through some network and it will be faster than writing to the local SSD. I
>> don't understand the point here.
>>
>> Also it appears the thread owner is talking about having HBase on Hadoop
>> cluster on some node eating memory.  This can be easily sorted by moving
>> HBase to its own cluster, which will ease up Hadoop, Spark and HBase
>> competing for resources. It is possible that the issue is with HBase setup
>> as well.
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>> 

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Mich Talebzadeh
Ok. Your architect has decided to emulate everything on prem in the cloud. You
are not really taking any advantage of cloud offerings or scalability. For
example, how does your Hadoop cluster cater for the increased capacity?
Likewise your spark nodes are pigeonholed with your Hadoop nodes. Old wine
in a new bottle :)


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 7 Apr 2022 at 09:20, Joris Billen 
wrote:

> Thanks for active discussion and sharing your knowledge :-)
>
>
> 1.Cluster is a managed hadoop cluster on Azure in the cloud. It has hbase,
> and spark, and hdfs shared .
> 2.Hbase is on the cluster, so not standalone. It comes from an
> enterprise-level template from a commercial vendor, so assuming this is
> correctly installed.
> 3.I know that woudl be best to have a spark cluster to do the processing
> and then write to a separate hbase cluster.. but alas :-( somehow we found
> this to be buggy so we have it all on one cluster.
> 4. S3: I am not using it, but people in the thread started suggesting
> potential solutions involving s3. It is an azure system, so hbase is stored
> on adls. In fact the nature of my application (geospatial stuff) requires
> me to use geomesa libs, which only allows directly writing from spark to
> hbase. So I can not write to some other format (the geomesa API is not
> designed for that-it only writes directly to hbase using the predetermined
> key/values).
>
> Forgot to mention: I do unpersist my df that was cached.
>
> Nevertheless I think I understand the problem now, this discussion is
> still interesting!
> So the root cause is : the hbase region server has memory assigned to it
> (like 20GB). I see when I start writing from spark to hbase, not much of
> this is used. I have loops of processing 1 day in spark. For each loop, the
> regionserver heap is filled a bit more. Since I also overcommitted memory
> in my cluster (have used in the setup more than really is available), tfter
> several loops it starts to use more and more of the 20GB and eventually the
> overall cluster starts to  hit the memory that is available on the workers.
> The solution is to lower the hbase regionserver heap memory, so Im not
> overcommitted anymore. In fact, high regionserver memory is more important
> when I read my data, since then it helps a lot to cache data and to have
> faster reads. For writing it is not important to have such a high value.
>
>
> Thanks,
> Joris
>
>
> On 7 Apr 2022, at 09:26, Mich Talebzadeh 
> wrote:
>
> Ok so that is your assumption. The whole thing is based on-premise on JBOD
> (including hadoop cluster which has Spark binaries on each node as I
> understand) as I understand. But it will be faster to use S3 (or GCS)
> through some network and it will be faster than writing to the local SSD. I
> don't understand the point here.
>
> Also it appears the thread owner is talking about having HBase on Hadoop
> cluster on some node eating memory.  This can be easily sorted by moving
> HBase to its own cluster, which will ease up Hadoop, Spark and HBase
> competing for resources. It is possible that the issue is with HBase setup
> as well.
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
> 
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen 
> wrote:
>
>>
>>1. Where does S3 come into this
>>
>> He is processing d

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
"But it will be faster to use S3 (or GCS) through some network and it will
be faster than writing to the local SSD. I don't understand the point
here."
Minio is an S3 mock, so you run minio locally.
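
For context, a minimal sketch of what pointing Spark's s3a filesystem at a local MinIO endpoint can look like; the endpoint, credentials and bucket are placeholders, and the hadoop-aws / AWS SDK jars are assumed to be on the classpath.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("minio-s3a-sketch")
         .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
         .config("spark.hadoop.fs.s3a.access.key", "minio-access-key")
         .config("spark.hadoop.fs.s3a.secret.key", "minio-secret-key")
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
         .getOrCreate())

# Placeholder bucket; write a tiny DataFrame just to exercise the s3a path.
spark.range(10).write.mode("overwrite").parquet("s3a://test-bucket/sketch/")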

On Thu, 7 Apr 2022 at 09:27, Mich Talebzadeh wrote:

> Ok so that is your assumption. The whole thing is based on-premise on JBOD
> (including hadoop cluster which has Spark binaries on each node as I
> understand) as I understand. But it will be faster to use S3 (or GCS)
> through some network and it will be faster than writing to the local SSD. I
> don't understand the point here.
>
> Also it appears the thread owner is talking about having HBase on Hadoop
> cluster on some node eating memory.  This can be easily sorted by moving
> HBase to its own cluster, which will ease up Hadoop, Spark and HBase
> competing for resources. It is possible that the issue is with HBase setup
> as well.
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen 
> wrote:
>
>>
>>1. Where does S3 come into this
>>
>> He is processing data for each day at a time. So to dump each day to a
>> fast storage he can use parquet files and write it to S3.
>>
>> ons. 6. apr. 2022 kl. 22:27 skrev Mich Talebzadeh <
>> mich.talebza...@gmail.com>:
>>
>>>
>>> Your statement below:
>>>
>>>
>>> I believe I have found the issue: the job writes data to hbase which is
>>> on the same cluster.
>>> When I keep on processing data and writing with spark to hbase ,
>>> eventually the garbage collection can not keep up anymore for hbase, and
>>> the hbase memory consumption increases. As the clusters hosts both hbase
>>> and spark, this leads to an overall increase and at some point you hit the
>>> limit of the available memory on each worker.
>>> I dont think the spark memory is increasing over time.
>>>
>>>
>>>1. Where is your cluster on Prem? Do you Have a Hadoop cluster
>>>with spark using the same nodes as HDFS?
>>>2. Is your Hbase clustered or standalone and has been created on
>>>HDFS nodes
>>>3. Are you writing to Hbase through phoenix or straight to HBase
>>>4. Where does S3 come into this
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 6 Apr 2022 at 16:41, Joris Billen 
>>> wrote:
>>>
 HI,
 thanks for your reply.


 I believe I have found the issue: the job writes data to hbase which is
 on the same cluster.
 When I keep on processing data and writing with spark to hbase ,
 eventually the garbage collection can not keep up anymore for hbase, and
 the hbase memory consumption increases. As the clusters hosts both hbase
 and spark, this leads to an overall increase and at some point you hit the
 limit of the available memory on each worker.
 I dont think the spark memory is increasing over time.



 Here more details:

 **Spark: 2.4
 **operation: many spark sql statements followed by writing data to a
 nosql db from spark
 like this:
 df=read(fromhdfs)
 df2=spark.sql(using df 1)
 ..df10=spark.sql(using df9)
 spark.sql(CACHE TABLE df10)
 df11 =spark.sql(using df10)
 df11.write
 Df12 =spark.sql(using df10)
 df12.write
 df13 =spark.sql(using df10)
 df13.write
 **caching: yes one df that I will use to eventually write 3 x to a db
 (those 3 are different)
 **Loops: since I need to process several years, and processing 1 day is
 already a complex process (40 minutes on 9 node cluster running quite a bit
 of executors). So in the end it will do all at one go and there is a limit
 of how much data I can process in one go with the available resources.
 Some people here pointed out they believe this looping should not be
 necessary. But what is the alternative?
 —> Maybe I can write to disk somewhere in the middle, and read again
 from there so that in the end not all must happen in one go in memory.







 On 5 Apr 2022, at 14:58, Gourav 

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Joris Billen
Thanks for active discussion and sharing your knowledge :-)


1. Cluster is a managed hadoop cluster on Azure in the cloud. It has hbase,
spark, and hdfs shared.
2. Hbase is on the cluster, so not standalone. It comes from an enterprise-level
template from a commercial vendor, so I assume it is correctly installed.
3. I know it would be best to have a spark cluster do the processing and then
write to a separate hbase cluster.. but alas :-( somehow we found this to be
buggy, so we have it all on one cluster.
4. S3: I am not using it, but people in the thread started suggesting potential
solutions involving s3. It is an azure system, so hbase is stored on adls. In
fact the nature of my application (geospatial stuff) requires me to use geomesa
libs, which only allow writing directly from spark to hbase. So I can not
write to some other format (the geomesa API is not designed for that; it only
writes directly to hbase using the predetermined key/values).

Forgot to mention: I do unpersist my df that was cached.

Nevertheless I think I understand the problem now; this discussion is still
interesting!
So the root cause is: the hbase region server has memory assigned to it (like
20GB). I see that when I start writing from spark to hbase, not much of this is
used. I have loops of processing 1 day in spark. For each loop, the
regionserver heap fills a bit more. Since I also overcommitted memory in my
cluster (the setup uses more than is really available), after several
loops it starts to use more and more of the 20GB and eventually the overall
cluster starts to hit the memory that is available on the workers. The
solution is to lower the hbase regionserver heap memory, so I'm not
overcommitted anymore. In fact, high regionserver memory is more important when
I read my data, since then it helps a lot to cache data and to have faster
reads. For writing it is not important to have such a high value.
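
As a back-of-the-envelope illustration of that overcommit (all numbers made up, not the actual settings in this thread):

worker_ram_gb        = 32     # physical RAM per worker
regionserver_heap_gb = 20     # HBase regionserver heap
executors_per_node   = 2
executor_mem_gb      = 6
executor_overhead_gb = 0.6    # roughly 10% spark memoryOverhead per executor
os_and_daemons_gb    = 2      # OS, datanode, nodemanager, ...

spark_gb = executors_per_node * (executor_mem_gb + executor_overhead_gb)
total_gb = regionserver_heap_gb + spark_gb + os_and_daemons_gb
print(f"committed {total_gb:.1f} GB of {worker_ram_gb} GB",
      "(overcommitted)" if total_gb > worker_ram_gb else "(ok)")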


Thanks,
Joris


On 7 Apr 2022, at 09:26, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Ok so that is your assumption. The whole thing is based on-premise on JBOD 
(including hadoop cluster which has Spark binaries on each node as I 
understand) as I understand. But it will be faster to use S3 (or GCS) through 
some network and it will be faster than writing to the local SSD. I don't 
understand the point here.

Also it appears the thread owner is talking about having HBase on Hadoop 
cluster on some node eating memory.  This can be easily sorted by moving HBase 
to its own cluster, which will ease up Hadoop, Spark and HBase competing for 
resources. It is possible that the issue is with HBase setup as well.

HTH


 
   view my Linkedin profile

 
https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

  1.  Where does S3 come into this

He is processing data for each day at a time. So to dump each day to a fast 
storage he can use parquet files and write it to S3.

On Wed, 6 Apr 2022 at 22:27, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Your statement below:

I believe I have found the issue: the job writes data to hbase which is on the 
same cluster.
When I keep on processing data and writing with spark to hbase , eventually the 
garbage collection can not keep up anymore for hbase, and the hbase memory 
consumption increases. As the clusters hosts both hbase and spark, this leads 
to an overall increase and at some point you hit the limit of the available 
memory on each worker.
I dont think the spark memory is increasing over time.


  1.  Where is your cluster on Prem? Do you

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Mich Talebzadeh
Ok, so that is your assumption. The whole thing is based on-premise, on JBOD
(including a hadoop cluster which has Spark binaries on each node, as I
understand it). But will it be faster to use S3 (or GCS) through some network
than writing to the local SSD? I don't understand the point here.

Also it appears the thread owner is talking about having HBase on the Hadoop
cluster, with some nodes eating memory. This can easily be sorted by moving
HBase to its own cluster, which will stop Hadoop, Spark and HBase
competing for resources. It is possible that the issue is with the HBase setup
as well.

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 7 Apr 2022 at 08:11, Bjørn Jørgensen 
wrote:

>
>1. Where does S3 come into this
>
> He is processing data for each day at a time. So to dump each day to a
> fast storage he can use parquet files and write it to S3.
>
> ons. 6. apr. 2022 kl. 22:27 skrev Mich Talebzadeh <
> mich.talebza...@gmail.com>:
>
>>
>> Your statement below:
>>
>>
>> I believe I have found the issue: the job writes data to hbase which is
>> on the same cluster.
>> When I keep on processing data and writing with spark to hbase ,
>> eventually the garbage collection can not keep up anymore for hbase, and
>> the hbase memory consumption increases. As the clusters hosts both hbase
>> and spark, this leads to an overall increase and at some point you hit the
>> limit of the available memory on each worker.
>> I dont think the spark memory is increasing over time.
>>
>>
>>1. Where is your cluster on Prem? Do you Have a Hadoop cluster
>>with spark using the same nodes as HDFS?
>>2. Is your Hbase clustered or standalone and has been created on HDFS
>>nodes
>>3. Are you writing to Hbase through phoenix or straight to HBase
>>4. Where does S3 come into this
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 6 Apr 2022 at 16:41, Joris Billen 
>> wrote:
>>
>>> HI,
>>> thanks for your reply.
>>>
>>>
>>> I believe I have found the issue: the job writes data to hbase which is
>>> on the same cluster.
>>> When I keep on processing data and writing with spark to hbase ,
>>> eventually the garbage collection can not keep up anymore for hbase, and
>>> the hbase memory consumption increases. As the clusters hosts both hbase
>>> and spark, this leads to an overall increase and at some point you hit the
>>> limit of the available memory on each worker.
>>> I dont think the spark memory is increasing over time.
>>>
>>>
>>>
>>> Here more details:
>>>
>>> **Spark: 2.4
>>> **operation: many spark sql statements followed by writing data to a
>>> nosql db from spark
>>> like this:
>>> df=read(fromhdfs)
>>> df2=spark.sql(using df 1)
>>> ..df10=spark.sql(using df9)
>>> spark.sql(CACHE TABLE df10)
>>> df11 =spark.sql(using df10)
>>> df11.write
>>> Df12 =spark.sql(using df10)
>>> df12.write
>>> df13 =spark.sql(using df10)
>>> df13.write
>>> **caching: yes one df that I will use to eventually write 3 x to a db
>>> (those 3 are different)
>>> **Loops: since I need to process several years, and processing 1 day is
>>> already a complex process (40 minutes on 9 node cluster running quite a bit
>>> of executors). So in the end it will do all at one go and there is a limit
>>> of how much data I can process in one go with the available resources.
>>> Some people here pointed out they believe this looping should not be
>>> necessary. But what is the alternative?
>>> —> Maybe I can write to disk somewhere in the middle, and read again
>>> from there so that in the end not all must happen in one go in memory.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 5 Apr 2022, at 14:58, Gourav Sengupta 
>>> wrote:
>>>
>>> Hi,
>>>
>>> can you please give details around:
>>> spark version, what is the operation that you are running, why in loops,
>>> and whether you are caching in any data or not, and whether you are
>>> referencing the variables to create them like in the following expression
>>> we are referencing x to create x, x = x + 1
>>>
>>> Thanks and Regards,
>>> Gourav Sengupta
>>>
>>> On

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-07 Thread Bjørn Jørgensen
   1. Where does S3 come into this

He is processing data one day at a time. So to dump each day to fast
storage he can use parquet files and write them to S3.
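
A rough sketch of that idea (the paths, dates and filter column are made up; the real transformation chain would replace the comment):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("per-day-dump-sketch").getOrCreate()

for day in ["2022-01-01", "2022-01-02", "2022-01-03"]:
    df_day = (spark.read.parquet("hdfs:///raw/events/")      # placeholder source
                   .where(f"event_date = '{day}'"))
    # ... the existing chain of spark.sql transformations for this day ...
    df_day.write.mode("overwrite").parquet(f"s3a://my-bucket/daily/day={day}")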

On Wed, 6 Apr 2022 at 22:27, Mich Talebzadeh wrote:

>
> Your statement below:
>
>
> I believe I have found the issue: the job writes data to hbase which is on
> the same cluster.
> When I keep on processing data and writing with spark to hbase ,
> eventually the garbage collection can not keep up anymore for hbase, and
> the hbase memory consumption increases. As the clusters hosts both hbase
> and spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I dont think the spark memory is increasing over time.
>
>
>1. Where is your cluster on Prem? Do you Have a Hadoop cluster
>with spark using the same nodes as HDFS?
>2. Is your Hbase clustered or standalone and has been created on HDFS
>nodes
>3. Are you writing to Hbase through phoenix or straight to HBase
>4. Where does S3 come into this
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 6 Apr 2022 at 16:41, Joris Billen 
> wrote:
>
>> HI,
>> thanks for your reply.
>>
>>
>> I believe I have found the issue: the job writes data to hbase which is
>> on the same cluster.
>> When I keep on processing data and writing with spark to hbase ,
>> eventually the garbage collection can not keep up anymore for hbase, and
>> the hbase memory consumption increases. As the clusters hosts both hbase
>> and spark, this leads to an overall increase and at some point you hit the
>> limit of the available memory on each worker.
>> I dont think the spark memory is increasing over time.
>>
>>
>>
>> Here more details:
>>
>> **Spark: 2.4
>> **operation: many spark sql statements followed by writing data to a
>> nosql db from spark
>> like this:
>> df=read(fromhdfs)
>> df2=spark.sql(using df 1)
>> ..df10=spark.sql(using df9)
>> spark.sql(CACHE TABLE df10)
>> df11 =spark.sql(using df10)
>> df11.write
>> Df12 =spark.sql(using df10)
>> df12.write
>> df13 =spark.sql(using df10)
>> df13.write
>> **caching: yes one df that I will use to eventually write 3 x to a db
>> (those 3 are different)
>> **Loops: since I need to process several years, and processing 1 day is
>> already a complex process (40 minutes on 9 node cluster running quite a bit
>> of executors). So in the end it will do all at one go and there is a limit
>> of how much data I can process in one go with the available resources.
>> Some people here pointed out they believe this looping should not be
>> necessary. But what is the alternative?
>> —> Maybe I can write to disk somewhere in the middle, and read again from
>> there so that in the end not all must happen in one go in memory.
>>
>>
>>
>>
>>
>>
>>
>> On 5 Apr 2022, at 14:58, Gourav Sengupta 
>> wrote:
>>
>> Hi,
>>
>> can you please give details around:
>> spark version, what is the operation that you are running, why in loops,
>> and whether you are caching in any data or not, and whether you are
>> referencing the variables to create them like in the following expression
>> we are referencing x to create x, x = x + 1
>>
>> Thanks and Regards,
>> Gourav Sengupta
>>
>> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Clear-probably not a good idea.
>>>
>>> But a previous comment said “you are doing everything in the end in one
>>> go”.
>>> So this made me wonder: in case your only action is a write in the end
>>> after lots of complex transformations, then what is the alternative for
>>> writing in the end which means doing everything all at once in the end? My
>>> understanding is that if there is no need for an action earlier, you will
>>> do all at the end, which means there is a limitation to how many days you
>>> can process at once. And hence the solution is to loop over a couple days,
>>> and submit always the same spark job just for other input.
>>>
>>>
>>> Thanks!
>>>
>>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>>
>>> This feels like premature optimization, and not clear it's optimizing,
>>> but maybe.
>>> Caching things that are used once is worse than not caching. It looks
>>> like a straight-line through to the write, so I doubt caching helps
>>> anything here.
>>>
>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
 Hi,
 as said thanks for little discussion over mail.
 I understand that the action is triggered in the end at the write and

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Mich Talebzadeh
Your statement below:


I believe I have found the issue: the job writes data to hbase which is on
the same cluster.
When I keep on processing data and writing with spark to hbase , eventually
the garbage collection can not keep up anymore for hbase, and the hbase
memory consumption increases. As the clusters hosts both hbase and spark,
this leads to an overall increase and at some point you hit the limit of
the available memory on each worker.
I dont think the spark memory is increasing over time.


   1. Where is your cluster? On prem? Do you have a Hadoop cluster
   with spark using the same nodes as HDFS?
   2. Is your Hbase clustered or standalone, and has it been created on the
   HDFS nodes?
   3. Are you writing to Hbase through phoenix or straight to HBase?
   4. Where does S3 come into this?


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 6 Apr 2022 at 16:41, Joris Billen 
wrote:

> HI,
> thanks for your reply.
>
>
> I believe I have found the issue: the job writes data to hbase which is on
> the same cluster.
> When I keep on processing data and writing with spark to hbase ,
> eventually the garbage collection can not keep up anymore for hbase, and
> the hbase memory consumption increases. As the clusters hosts both hbase
> and spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I dont think the spark memory is increasing over time.
>
>
>
> Here more details:
>
> **Spark: 2.4
> **operation: many spark sql statements followed by writing data to a nosql
> db from spark
> like this:
> df=read(fromhdfs)
> df2=spark.sql(using df 1)
> ..df10=spark.sql(using df9)
> spark.sql(CACHE TABLE df10)
> df11 =spark.sql(using df10)
> df11.write
> Df12 =spark.sql(using df10)
> df12.write
> df13 =spark.sql(using df10)
> df13.write
> **caching: yes one df that I will use to eventually write 3 x to a db
> (those 3 are different)
> **Loops: since I need to process several years, and processing 1 day is
> already a complex process (40 minutes on 9 node cluster running quite a bit
> of executors). So in the end it will do all at one go and there is a limit
> of how much data I can process in one go with the available resources.
> Some people here pointed out they believe this looping should not be
> necessary. But what is the alternative?
> —> Maybe I can write to disk somewhere in the middle, and read again from
> there so that in the end not all must happen in one go in memory.
>
>
>
>
>
>
>
> On 5 Apr 2022, at 14:58, Gourav Sengupta 
> wrote:
>
> Hi,
>
> can you please give details around:
> spark version, what is the operation that you are running, why in loops,
> and whether you are caching in any data or not, and whether you are
> referencing the variables to create them like in the following expression
> we are referencing x to create x, x = x + 1
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Clear-probably not a good idea.
>>
>> But a previous comment said “you are doing everything in the end in one
>> go”.
>> So this made me wonder: in case your only action is a write in the end
>> after lots of complex transformations, then what is the alternative for
>> writing in the end which means doing everything all at once in the end? My
>> understanding is that if there is no need for an action earlier, you will
>> do all at the end, which means there is a limitation to how many days you
>> can process at once. And hence the solution is to loop over a couple days,
>> and submit always the same spark job just for other input.
>>
>>
>> Thanks!
>>
>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>
>> This feels like premature optimization, and not clear it's optimizing,
>> but maybe.
>> Caching things that are used once is worse than not caching. It looks
>> like a straight-line through to the write, so I doubt caching helps
>> anything here.
>>
>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> as said thanks for little discussion over mail.
>>> I understand that the action is triggered in the end at the write and
>>> then all of a sudden everything is executed at once. But I dont really need
>>> to trigger an action before. I am caching somewherew a df that will be
>>> reused several times (slightly updated pseudocode below).
>>>
>>> Question: is it then better practice to already trigger some actions on
>>>  intermediate data frame (like df4 and df8), and cache them? So that these
>>> acti

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Bjørn Jørgensen
Great, upgrade from 2.4 to 3.X.X
It seems like you can use unpersist after
df=read(fromhdfs)
df2=spark.sql(using df 1)
..df10=spark.sql(using df9)
?
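
A self-contained sketch of that pattern with toy data (the real df10 and sinks are placeholders): cache df10 while the dependent writes reuse it, then release it before the next day's loop.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("unpersist-sketch").getOrCreate()

df10 = spark.range(100).withColumnRenamed("id", "key")    # toy stand-in for df10
df10.createOrReplaceTempView("df10")
spark.sql("CACHE TABLE df10")                             # as in the pseudocode

df11 = spark.sql("SELECT key FROM df10 WHERE key % 2 = 0")
df11.write.mode("overwrite").parquet("/tmp/df11")         # placeholder sink
df12 = spark.sql("SELECT key * 2 AS k2 FROM df10")
df12.write.mode("overwrite").parquet("/tmp/df12")         # placeholder sink

spark.sql("UNCACHE TABLE df10")   # release the cached table once the writes are done
# (a DataFrame cached via df.cache() would be released with df.unpersist())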

I did use kubernetes and spark with the S3 API (min.io) and it works :)
With kubernetes you deploy everything with k8s resource limits. If
you have done it this way, then you won't have this problem.


On Wed, 6 Apr 2022 at 19:21, Gourav Sengupta wrote:

> Hi,
> super duper.
>
> Please try to see if you can write out the data to S3, and then write a
> load script to load that data from S3 to HBase.
>
>
> Regards,
> Gourav Sengupta
>
>
> On Wed, Apr 6, 2022 at 4:39 PM Joris Billen 
> wrote:
>
>> HI,
>> thanks for your reply.
>>
>>
>> I believe I have found the issue: the job writes data to hbase which is
>> on the same cluster.
>> When I keep on processing data and writing with spark to hbase ,
>> eventually the garbage collection can not keep up anymore for hbase, and
>> the hbase memory consumption increases. As the clusters hosts both hbase
>> and spark, this leads to an overall increase and at some point you hit the
>> limit of the available memory on each worker.
>> I dont think the spark memory is increasing over time.
>>
>>
>>
>> Here more details:
>>
>> **Spark: 2.4
>> **operation: many spark sql statements followed by writing data to a
>> nosql db from spark
>> like this:
>> df=read(fromhdfs)
>> df2=spark.sql(using df 1)
>> ..df10=spark.sql(using df9)
>> spark.sql(CACHE TABLE df10)
>> df11 =spark.sql(using df10)
>> df11.write
>> Df12 =spark.sql(using df10)
>> df12.write
>> df13 =spark.sql(using df10)
>> df13.write
>> **caching: yes one df that I will use to eventually write 3 x to a db
>> (those 3 are different)
>> **Loops: since I need to process several years, and processing 1 day is
>> already a complex process (40 minutes on 9 node cluster running quite a bit
>> of executors). So in the end it will do all at one go and there is a limit
>> of how much data I can process in one go with the available resources.
>> Some people here pointed out they believe this looping should not be
>> necessary. But what is the alternative?
>> —> Maybe I can write to disk somewhere in the middle, and read again from
>> there so that in the end not all must happen in one go in memory.
>>
>>
>>
>>
>>
>>
>>
>> On 5 Apr 2022, at 14:58, Gourav Sengupta 
>> wrote:
>>
>> Hi,
>>
>> can you please give details around:
>> spark version, what is the operation that you are running, why in loops,
>> and whether you are caching in any data or not, and whether you are
>> referencing the variables to create them like in the following expression
>> we are referencing x to create x, x = x + 1
>>
>> Thanks and Regards,
>> Gourav Sengupta
>>
>> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Clear-probably not a good idea.
>>>
>>> But a previous comment said “you are doing everything in the end in one
>>> go”.
>>> So this made me wonder: in case your only action is a write in the end
>>> after lots of complex transformations, then what is the alternative for
>>> writing in the end which means doing everything all at once in the end? My
>>> understanding is that if there is no need for an action earlier, you will
>>> do all at the end, which means there is a limitation to how many days you
>>> can process at once. And hence the solution is to loop over a couple days,
>>> and submit always the same spark job just for other input.
>>>
>>>
>>> Thanks!
>>>
>>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>>
>>> This feels like premature optimization, and not clear it's optimizing,
>>> but maybe.
>>> Caching things that are used once is worse than not caching. It looks
>>> like a straight-line through to the write, so I doubt caching helps
>>> anything here.
>>>
>>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>>> joris.bil...@bigindustries.be> wrote:
>>>
 Hi,
 as said thanks for little discussion over mail.
 I understand that the action is triggered in the end at the write and
 then all of a sudden everything is executed at once. But I dont really need
 to trigger an action before. I am caching somewherew a df that will be
 reused several times (slightly updated pseudocode below).

 Question: is it then better practice to already trigger some actions on
  intermediate data frame (like df4 and df8), and cache them? So that these
 actions will not be that expensive yet, and the actions to write at the end
 will require less resources, which would allow to process more days in one
 go? LIke what is added in red in improvement section in the pseudo
 code below?



 *pseudocode:*


 *loop over all days:*
 *spark submit 1 day*



 with

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Gourav Sengupta
Hi,
super duper.

Please try to see if you can write out the data to S3, and then write a
load script to load that data from S3 to HBase.
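
A very rough two-step sketch of that suggestion. Step 1 is plain Spark; step 2 assumes the Apache hbase-spark connector (or whatever loader you prefer) is available, and the paths, table and column mapping are all made up.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-then-hbase-sketch").getOrCreate()

# Step 1 (processing job): stage the day's result on S3 as Parquet.
result = spark.createDataFrame([("row1", 1.0), ("row2", 2.0)], ["rowkey", "value"])
result.write.mode("overwrite").parquet("s3a://my-bucket/staging/day=2022-04-06")

# Step 2 (separate load job): read the staged Parquet back and push it to HBase.
staged = spark.read.parquet("s3a://my-bucket/staging/day=2022-04-06")
(staged.write
       .format("org.apache.hadoop.hbase.spark")
       .option("hbase.columns.mapping", "rowkey STRING :key, value DOUBLE cf1:value")
       .option("hbase.table", "my_table")
       .save())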


Regards,
Gourav Sengupta


On Wed, Apr 6, 2022 at 4:39 PM Joris Billen 
wrote:

> HI,
> thanks for your reply.
>
>
> I believe I have found the issue: the job writes data to hbase which is on
> the same cluster.
> When I keep on processing data and writing with spark to hbase ,
> eventually the garbage collection can not keep up anymore for hbase, and
> the hbase memory consumption increases. As the clusters hosts both hbase
> and spark, this leads to an overall increase and at some point you hit the
> limit of the available memory on each worker.
> I dont think the spark memory is increasing over time.
>
>
>
> Here more details:
>
> **Spark: 2.4
> **operation: many spark sql statements followed by writing data to a nosql
> db from spark
> like this:
> df=read(fromhdfs)
> df2=spark.sql(using df 1)
> ..df10=spark.sql(using df9)
> spark.sql(CACHE TABLE df10)
> df11 =spark.sql(using df10)
> df11.write
> Df12 =spark.sql(using df10)
> df12.write
> df13 =spark.sql(using df10)
> df13.write
> **caching: yes one df that I will use to eventually write 3 x to a db
> (those 3 are different)
> **Loops: since I need to process several years, and processing 1 day is
> already a complex process (40 minutes on 9 node cluster running quite a bit
> of executors). So in the end it will do all at one go and there is a limit
> of how much data I can process in one go with the available resources.
> Some people here pointed out they believe this looping should not be
> necessary. But what is the alternative?
> —> Maybe I can write to disk somewhere in the middle, and read again from
> there so that in the end not all must happen in one go in memory.
>
>
>
>
>
>
>
> On 5 Apr 2022, at 14:58, Gourav Sengupta 
> wrote:
>
> Hi,
>
> can you please give details around:
> spark version, what is the operation that you are running, why in loops,
> and whether you are caching in any data or not, and whether you are
> referencing the variables to create them like in the following expression
> we are referencing x to create x, x = x + 1
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Clear-probably not a good idea.
>>
>> But a previous comment said “you are doing everything in the end in one
>> go”.
>> So this made me wonder: in case your only action is a write in the end
>> after lots of complex transformations, then what is the alternative for
>> writing in the end which means doing everything all at once in the end? My
>> understanding is that if there is no need for an action earlier, you will
>> do all at the end, which means there is a limitation to how many days you
>> can process at once. And hence the solution is to loop over a couple days,
>> and submit always the same spark job just for other input.
>>
>>
>> Thanks!
>>
>> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>>
>> This feels like premature optimization, and not clear it's optimizing,
>> but maybe.
>> Caching things that are used once is worse than not caching. It looks
>> like a straight-line through to the write, so I doubt caching helps
>> anything here.
>>
>> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> as said thanks for little discussion over mail.
>>> I understand that the action is triggered in the end at the write and
>>> then all of a sudden everything is executed at once. But I dont really need
>>> to trigger an action before. I am caching somewherew a df that will be
>>> reused several times (slightly updated pseudocode below).
>>>
>>> Question: is it then better practice to already trigger some actions on
>>>  intermediate data frame (like df4 and df8), and cache them? So that these
>>> actions will not be that expensive yet, and the actions to write at the end
>>> will require less resources, which would allow to process more days in one
>>> go? LIke what is added in red in improvement section in the pseudo code
>>> below?
>>>
>>>
>>>
>>> *pseudocode:*
>>>
>>>
>>> *loop over all days:*
>>> *spark submit 1 day*
>>>
>>>
>>>
>>> with spark submit (overly simplified)=
>>>
>>>
>>> *  df=spark.read(hfs://somepath)*
>>> *  …*
>>> *   ##IMPROVEMENT START*
>>> *   df4=spark.sql(some stuff with df3)*
>>> *   spark.sql(CACHE TABLE df4)*
>>> *   …*
>>> *   df8=spark.sql(some stuff with df7)*
>>> *   spark.sql(CACHE TABLE df8)*
>>> *  ##IMPROVEMENT END*
>>> *   ...*
>>> *   df12=df11.spark.sql(complex stufff)*
>>> *  spark.sql(CACHE TABLE df10)*
>>> *   ...*
>>> *  df13=spark.sql( complex stuff with df12)*
>>> *  df13.write *
>>> *  df14=spark.sql( some other complex stuff with df12)*
>>> *  df14.write *
>>> *  df15=spark.sql( some completely other complex stuff with df12)*
>>> *  df15.write *
>>>
>>>
>>>
>>>
>>>
>>>
>>> THanks!
>>>
>>>
>>>
>>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-06 Thread Joris Billen
Hi,
thanks for your reply.


I believe I have found the issue: the job writes data to hbase, which is on the
same cluster.
When I keep on processing data and writing with spark to hbase, eventually the
garbage collection can not keep up anymore for hbase, and the hbase memory
consumption increases. As the cluster hosts both hbase and spark, this leads
to an overall increase, and at some point you hit the limit of the available
memory on each worker.
I don't think the spark memory is increasing over time.



Here more details:

**Spark: 2.4
**operation: many spark sql statements followed by writing data to a nosql db 
from spark
like this:
df=read(fromhdfs)
df2=spark.sql(using df 1)
..df10=spark.sql(using df9)
spark.sql(CACHE TABLE df10)
df11 =spark.sql(using df10)
df11.write
Df12 =spark.sql(using df10)
df12.write
df13 =spark.sql(using df10)
df13.write
**caching: yes one df that I will use to eventually write 3 x to a db (those 3 
are different)
**Loops: since I need to process several years, and processing 1 day is already
a complex process (40 minutes on a 9-node cluster running quite a few
executors), in the end it will do everything in one go, and there is a limit to
how much data I can process in one go with the available resources.
Some people here pointed out they believe this looping should not be necessary.
But what is the alternative?
—> Maybe I can write to disk somewhere in the middle, and read again from there,
so that in the end not everything must happen in one go in memory (see the
sketch below).
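
A minimal sketch of that "write to disk in the middle" idea, with toy data (the path and queries are placeholders): materialise an intermediate result to Parquet and read it back, so the final writes no longer have to recompute or hold the whole earlier lineage. df.checkpoint() with a configured checkpoint directory is the built-in alternative that truncates lineage the same way.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mid-pipeline-checkpoint-sketch").getOrCreate()

df10 = spark.range(1000).withColumnRenamed("id", "key")   # toy stand-in for df10

path = "hdfs:///tmp/pipeline/df10"            # made-up intermediate location
df10.write.mode("overwrite").parquet(path)    # cut the plan here
df10 = spark.read.parquet(path)               # continue from disk

df10.createOrReplaceTempView("df10")
df13 = spark.sql("SELECT key FROM df10 WHERE key > 500")
df13.write.mode("overwrite").parquet("hdfs:///tmp/pipeline/df13")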







On 5 Apr 2022, at 14:58, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi,

can you please give details around:
spark version, what is the operation that you are running, why in loops, and 
whether you are caching in any data or not, and whether you are referencing the 
variables to create them like in the following expression we are referencing x 
to create x, x = x + 1

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Clear-probably not a good idea.

But a previous comment said “you are doing everything in the end in one go”.
So this made me wonder: in case your only action is a write in the end after 
lots of complex transformations, then what is the alternative for writing in 
the end which means doing everything all at once in the end? My understanding 
is that if there is no need for an action earlier, you will do all at the end, 
which means there is a limitation to how many days you can process at once. And 
hence the solution is to loop over a couple days, and submit always the same 
spark job just for other input.


Thanks!

On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:

This feels like premature optimization, and not clear it's optimizing, but 
maybe.
Caching things that are used once is worse than not caching. It looks like a 
straight-line through to the write, so I doubt caching helps anything here.

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Hi,
as said thanks for little discussion over mail.
I understand that the action is triggered in the end at the write and then all 
of a sudden everything is executed at once. But I dont really need to trigger 
an action before. I am caching somewherew a df that will be reused several 
times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on  
intermediate data frame (like df4 and df8), and cache them? So that these 
actions will not be that expensive yet, and the actions to write at the end 
will require less resources, which would allow to process more days in one go? 
LIke what is added in red in improvement section in the pseudo code below?



pseudocode:


loop over all days:
spark submit 1 day



with spark submit (overly simplified)=


  df=spark.read(hfs://somepath)
  …
   ##IMPROVEMENT START
   df4=spark.sql(some stuff with df3)
   spark.sql(CACHE TABLE df4)
   …
   df8=spark.sql(some stuff with df7)
   spark.sql(CACHE TABLE df8)
  ##IMPROVEMENT END
   ...
   df12=df11.spark.sql(complex stufff)
  spark.sql(CACHE TABLE df10)
   ...
  df13=spark.sql( complex stuff with df12)
  df13.write
  df14=spark.sql( some other complex stuff with df12)
  df14.write
  df15=spark.sql( some completely other complex stuff with df12)
  df15.write






THanks!



On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of work at a time. 
That will execute all operations in one go when the write finally happens. 
That's OK, but may be part of the problem. For example if you are filtering for 
a subset, processing, and unioning, then that is just a harder and slower way 
of applying the transformation to all data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for reply :-)

I am using pyspark. Basicially my code (simpli

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-05 Thread Gourav Sengupta
Hi,

can you please give details around: the spark version, what operation you are
running, why in loops, whether you are caching any data or not, and whether you
are referencing the variables to create themselves, like in the following
expression where we reference x to create x: x = x + 1

Thanks and Regards,
Gourav Sengupta

On Mon, Apr 4, 2022 at 10:51 AM Joris Billen 
wrote:

> Clear-probably not a good idea.
>
> But a previous comment said “you are doing everything in the end in one
> go”.
> So this made me wonder: in case your only action is a write in the end
> after lots of complex transformations, then what is the alternative for
> writing in the end which means doing everything all at once in the end? My
> understanding is that if there is no need for an action earlier, you will
> do all at the end, which means there is a limitation to how many days you
> can process at once. And hence the solution is to loop over a couple days,
> and submit always the same spark job just for other input.
>
>
> Thanks!
>
> On 1 Apr 2022, at 15:26, Sean Owen  wrote:
>
> This feels like premature optimization, and not clear it's optimizing, but
> maybe.
> Caching things that are used once is worse than not caching. It looks like
> a straight-line through to the write, so I doubt caching helps anything
> here.
>
> On Fri, Apr 1, 2022 at 2:49 AM Joris Billen 
> wrote:
>
>> Hi,
>> as said thanks for little discussion over mail.
>> I understand that the action is triggered in the end at the write and
>> then all of a sudden everything is executed at once. But I dont really need
>> to trigger an action before. I am caching somewherew a df that will be
>> reused several times (slightly updated pseudocode below).
>>
>> Question: is it then better practice to already trigger some actions on
>>  intermediate data frame (like df4 and df8), and cache them? So that these
>> actions will not be that expensive yet, and the actions to write at the end
>> will require less resources, which would allow to process more days in one
>> go? LIke what is added in red in improvement section in the pseudo code
>> below?
>>
>>
>>
>> *pseudocode:*
>>
>>
>> *loop over all days:*
>> *spark submit 1 day*
>>
>>
>>
>> with spark submit (overly simplified)=
>>
>>
>> *  df=spark.read(hfs://somepath)*
>> *  …*
>> *   ##IMPROVEMENT START*
>> *   df4=spark.sql(some stuff with df3)*
>> *   spark.sql(CACHE TABLE df4)*
>> *   …*
>> *   df8=spark.sql(some stuff with df7)*
>> *   spark.sql(CACHE TABLE df8)*
>> *  ##IMPROVEMENT END*
>> *   ...*
>> *   df12=df11.spark.sql(complex stufff)*
>> *  spark.sql(CACHE TABLE df10)*
>> *   ...*
>> *  df13=spark.sql( complex stuff with df12)*
>> *  df13.write *
>> *  df14=spark.sql( some other complex stuff with df12)*
>> *  df14.write *
>> *  df15=spark.sql( some completely other complex stuff with df12)*
>> *  df15.write *
>>
>>
>>
>>
>>
>>
>> THanks!
>>
>>
>>
>> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>>
>> If that is your loop unrolled, then you are not doing parts of work at a
>> time. That will execute all operations in one go when the write finally
>> happens. That's OK, but may be part of the problem. For example if you are
>> filtering for a subset, processing, and unioning, then that is just a
>> harder and slower way of applying the transformation to all data at once.
>>
>> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Thanks for reply :-)
>>>
>>> I am using pyspark. Basicially my code (simplified is):
>>>
>>> df=spark.read.csv(hdfs://somehdfslocation)
>>> df1=spark.sql (complex statement using df)
>>> ...
>>> dfx=spark.sql(complex statement using df x-1)
>>> ...
>>> dfx15.write()
>>>
>>>
>>> What exactly is meant by "closing resources"? Is it just unpersisting
>>> cached dataframes at the end and stopping the spark context explicitly:
>>> sc.stop()?
>>>
>>>
>>> FOr processing many years at once versus a chunk in a loop: I see that
>>> if I go up to certain number of days, one iteration will start to have
>>> tasks that fail. So I only take a limited number of days, and do this
>>> process several times. Isnt this normal as you are always somehow limited
>>> in terms of resources (I have 9 nodes wiht 32GB). Or is it like this that
>>> in theory you could process any volume, in case you wait long enough? I
>>> guess spark can only break down the tasks up to a certain level (based on
>>> the datasets' and the intermediate results’ partitions) and at some moment
>>> you hit the limit where your resources are not sufficient anymore to
>>> process such one task? Maybe you can tweak it a bit, but in the end you’ll
>>> hit a limit?
>>>
>>>
>>>
>>> Concretely  following topics would be interesting to find out more about
>>> (links):
>>> -where to see what you are still consuming after spark job ended if you
>>> didnt close resources
>>> -memory leaks for pyspark
>>> -good article about closing resources (you find tons of snippets on how
>>> to start 

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-04 Thread Joris Billen
Clear, probably not a good idea.

But a previous comment said “you are doing everything in the end in one go”.
So this made me wonder: in case your only action is a write at the end after
lots of complex transformations, then what is the alternative to writing at
the end, which means doing everything all at once at the end? My understanding
is that if there is no need for an action earlier, you will do it all at the end,
which means there is a limit to how many days you can process at once. And
hence the solution is to loop over a couple of days, and always submit the same
spark job, just with different input.
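
One alternative that is sometimes used (not something anyone proposed above, just
an illustration): materialize an intermediate result to storage, which is itself an
action, and build the remaining steps on the re-read copy so the final writes do not
have to redo everything. A rough sketch with invented paths and a stand-in DataFrame:

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  df8 = spark.range(10)                     # stand-in for an expensive intermediate result
  df8.write.mode("overwrite").parquet("hdfs:///tmp/day_2022-01-01/df8")
  df8 = spark.read.parquet("hdfs:///tmp/day_2022-01-01/df8")   # short lineage from here on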


Thanks!

On 1 Apr 2022, at 15:26, Sean Owen <sro...@gmail.com> wrote:

This feels like premature optimization, and not clear it's optimizing, but 
maybe.
Caching things that are used once is worse than not caching. It looks like a 
straight-line through to the write, so I doubt caching helps anything here.

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Hi,
as said thanks for little discussion over mail.
I understand that the action is triggered in the end at the write and then all 
of a sudden everything is executed at once. But I dont really need to trigger 
an action before. I am caching somewherew a df that will be reused several 
times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on  
intermediate data frame (like df4 and df8), and cache them? So that these 
actions will not be that expensive yet, and the actions to write at the end 
will require less resources, which would allow to process more days in one go? 
LIke what is added in red in improvement section in the pseudo code below?



pseudocode:


loop over all days:
spark submit 1 day



with spark submit (overly simplified)=


  df=spark.read(hfs://somepath)
  …
   ##IMPROVEMENT START
   df4=spark.sql(some stuff with df3)
   spark.sql(CACHE TABLE df4)
   …
   df8=spark.sql(some stuff with df7)
   spark.sql(CACHE TABLE df8)
  ##IMPROVEMENT END
   ...
   df12=df11.spark.sql(complex stufff)
  spark.sql(CACHE TABLE df10)
   ...
  df13=spark.sql( complex stuff with df12)
  df13.write
  df14=spark.sql( some other complex stuff with df12)
  df14.write
  df15=spark.sql( some completely other complex stuff with df12)
  df15.write






THanks!



On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of work at a time. 
That will execute all operations in one go when the write finally happens. 
That's OK, but may be part of the problem. For example if you are filtering for 
a subset, processing, and unioning, then that is just a harder and slower way 
of applying the transformation to all data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for reply :-)

I am using pyspark. Basicially my code (simplified is):

df=spark.read.csv(hdfs://somehdfslocation)
df1=spark.sql (complex statement using df)
...
dfx=spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting cached 
dataframes at the end and stopping the spark context explicitly: sc.stop()?


FOr processing many years at once versus a chunk in a loop: I see that if I go 
up to certain number of days, one iteration will start to have tasks that fail. 
So I only take a limited number of days, and do this process several times. 
Isnt this normal as you are always somehow limited in terms of resources (I 
have 9 nodes wiht 32GB). Or is it like this that in theory you could process 
any volume, in case you wait long enough? I guess spark can only break down the 
tasks up to a certain level (based on the datasets' and the intermediate 
results’ partitions) and at some moment you hit the limit where your resources 
are not sufficient anymore to process such one task? Maybe you can tweak it a 
bit, but in the end you’ll hit a limit?



Concretely  following topics would be interesting to find out more about 
(links):
-where to see what you are still consuming after spark job ended if you didnt 
close resources
-memory leaks for pyspark
-good article about closing resources (you find tons of snippets on how to 
start spark context+ config for number/cores/memory of worker/executors etc, 
but never saw a focus on making sure you clean up —> or is it just stopping the 
spark context)




On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It`s quite impossible for anyone to answer your question about what is eating 
your memory, without even knowing what language you are using.

If you are using C then it`s always pointers, that's the mem issue.
If you are using python, there can be some like not using context manager like 
With Context Managers and Python's with 
Statement

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-01 Thread Sean Owen
This feels like premature optimization, and not clear it's optimizing, but
maybe.
Caching things that are used once is worse than not caching. It looks like
a straight-line through to the write, so I doubt caching helps anything
here.
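
To make that concrete, a small hypothetical sketch of caching only the DataFrame
that really is reused (the one feeding several writes) and releasing it afterwards;
names, data and paths are placeholders, not the real pipeline:

  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()
  df = spark.range(1000000).withColumn("bucket", F.col("id") % 10)

  df12 = df.groupBy("bucket").count()        # stands in for the result used by three writes
  df12.cache()                               # worth caching: reused three times below

  df12.write.mode("overwrite").parquet("/tmp/out_a")
  df12.filter("`count` > 0").write.mode("overwrite").parquet("/tmp/out_b")
  df12.selectExpr("bucket", "`count` * 2 AS doubled").write.mode("overwrite").parquet("/tmp/out_c")

  df12.unpersist()                           # release the cached blocks once the writes are done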

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen 
wrote:

> Hi,
> as said thanks for little discussion over mail.
> I understand that the action is triggered in the end at the write and then
> all of a sudden everything is executed at once. But I dont really need to
> trigger an action before. I am caching somewherew a df that will be reused
> several times (slightly updated pseudocode below).
>
> Question: is it then better practice to already trigger some actions on
>  intermediate data frame (like df4 and df8), and cache them? So that these
> actions will not be that expensive yet, and the actions to write at the end
> will require less resources, which would allow to process more days in one
> go? LIke what is added in red in improvement section in the pseudo code
> below?
>
>
>
> *pseudocode:*
>
>
> *loop over all days:*
> *spark submit 1 day*
>
>
>
> with spark submit (overly simplified)=
>
>
> *  df=spark.read(hfs://somepath)*
> *  …*
> *   ##IMPROVEMENT START*
> *   df4=spark.sql(some stuff with df3)*
> *   spark.sql(CACHE TABLE df4)*
> *   …*
> *   df8=spark.sql(some stuff with df7)*
> *   spark.sql(CACHE TABLE df8)*
> *  ##IMPROVEMENT END*
> *   ...*
> *   df12=df11.spark.sql(complex stufff)*
> *  spark.sql(CACHE TABLE df10)*
> *   ...*
> *  df13=spark.sql( complex stuff with df12)*
> *  df13.write *
> *  df14=spark.sql( some other complex stuff with df12)*
> *  df14.write *
> *  df15=spark.sql( some completely other complex stuff with df12)*
> *  df15.write *
>
>
>
>
>
>
> THanks!
>
>
>
> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>
> If that is your loop unrolled, then you are not doing parts of work at a
> time. That will execute all operations in one go when the write finally
> happens. That's OK, but may be part of the problem. For example if you are
> filtering for a subset, processing, and unioning, then that is just a
> harder and slower way of applying the transformation to all data at once.
>
> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Thanks for reply :-)
>>
>> I am using pyspark. Basicially my code (simplified is):
>>
>> df=spark.read.csv(hdfs://somehdfslocation)
>> df1=spark.sql (complex statement using df)
>> ...
>> dfx=spark.sql(complex statement using df x-1)
>> ...
>> dfx15.write()
>>
>>
>> What exactly is meant by "closing resources"? Is it just unpersisting
>> cached dataframes at the end and stopping the spark context explicitly:
>> sc.stop()?
>>
>>
>> FOr processing many years at once versus a chunk in a loop: I see that if
>> I go up to certain number of days, one iteration will start to have tasks
>> that fail. So I only take a limited number of days, and do this process
>> several times. Isnt this normal as you are always somehow limited in terms
>> of resources (I have 9 nodes wiht 32GB). Or is it like this that in theory
>> you could process any volume, in case you wait long enough? I guess spark
>> can only break down the tasks up to a certain level (based on the datasets'
>> and the intermediate results’ partitions) and at some moment you hit the
>> limit where your resources are not sufficient anymore to process such one
>> task? Maybe you can tweak it a bit, but in the end you’ll hit a limit?
>>
>>
>>
>> Concretely  following topics would be interesting to find out more about
>> (links):
>> -where to see what you are still consuming after spark job ended if you
>> didnt close resources
>> -memory leaks for pyspark
>> -good article about closing resources (you find tons of snippets on how
>> to start spark context+ config for number/cores/memory of worker/executors
>> etc, but never saw a focus on making sure you clean up —> or is it just
>> stopping the spark context)
>>
>>
>>
>>
>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen 
>> wrote:
>>
>> It`s quite impossible for anyone to answer your question about what is
>> eating your memory, without even knowing what language you are using.
>>
>> If you are using C then it`s always pointers, that's the mem issue.
>> If you are using python, there can be some like not using context manager
>> like With Context Managers and Python's with Statement
>> 
>>
>> And another can be not to close resources after use.
>>
>> In my experience you can process 3 years or more of data, IF you are
>> closing opened resources.
>> I use the web GUI http://spark:4040 to follow what spark is doing.

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-01 Thread Joris Billen
Hi,
as said, thanks for the little discussion over mail.
I understand that the action is triggered at the end, at the write, and then all
of a sudden everything is executed at once. But I don't really need to trigger
an action before. I am caching somewhere a df that will be reused several
times (slightly updated pseudocode below).

Question: is it then better practice to already trigger some actions on
intermediate data frames (like df4 and df8), and cache them? So that these
actions will not be that expensive yet, and the actions to write at the end
will require fewer resources, which would allow processing more days in one go?
Like what is added in the improvement section in the pseudocode below?



pseudocode:


loop over all days:
spark submit 1 day



with spark submit (overly simplified)=


  df = spark.read(hdfs://somepath)
  …
  ## IMPROVEMENT START
  df4 = spark.sql(some stuff with df3)
  spark.sql("CACHE TABLE df4")
  …
  df8 = spark.sql(some stuff with df7)
  spark.sql("CACHE TABLE df8")
  ## IMPROVEMENT END
  ...
  df12 = spark.sql(complex stuff with df11)
  spark.sql("CACHE TABLE df12")
  ...
  df13 = spark.sql(complex stuff with df12)
  df13.write
  df14 = spark.sql(some other complex stuff with df12)
  df14.write
  df15 = spark.sql(some completely other complex stuff with df12)
  df15.write






Thanks!



On 31 Mar 2022, at 14:37, Sean Owen <sro...@gmail.com> wrote:

If that is your loop unrolled, then you are not doing parts of work at a time. 
That will execute all operations in one go when the write finally happens. 
That's OK, but may be part of the problem. For example if you are filtering for 
a subset, processing, and unioning, then that is just a harder and slower way 
of applying the transformation to all data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for reply :-)

I am using pyspark. Basicially my code (simplified is):

df=spark.read.csv(hdfs://somehdfslocation)
df1=spark.sql (complex statement using df)
...
dfx=spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting cached 
dataframes at the end and stopping the spark context explicitly: sc.stop()?


FOr processing many years at once versus a chunk in a loop: I see that if I go 
up to certain number of days, one iteration will start to have tasks that fail. 
So I only take a limited number of days, and do this process several times. 
Isnt this normal as you are always somehow limited in terms of resources (I 
have 9 nodes wiht 32GB). Or is it like this that in theory you could process 
any volume, in case you wait long enough? I guess spark can only break down the 
tasks up to a certain level (based on the datasets' and the intermediate 
results’ partitions) and at some moment you hit the limit where your resources 
are not sufficient anymore to process such one task? Maybe you can tweak it a 
bit, but in the end you’ll hit a limit?



Concretely  following topics would be interesting to find out more about 
(links):
-where to see what you are still consuming after spark job ended if you didnt 
close resources
-memory leaks for pyspark
-good article about closing resources (you find tons of snippets on how to 
start spark context+ config for number/cores/memory of worker/executors etc, 
but never saw a focus on making sure you clean up —> or is it just stopping the 
spark context)




On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It`s quite impossible for anyone to answer your question about what is eating 
your memory, without even knowing what language you are using.

If you are using C then it`s always pointers, that's the mem issue.
If you are using python, there can be some like not using context manager like 
With Context Managers and Python's with 
Statement
And another can be not to close resources after use.

In my experience you can process 3 years or more of data, IF you are closing 
opened resources.
I use the web GUI http://spark:4040 to follow what spark is 
doing.




On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating up memory.  
The eviction means that he knows that he should clear some of the old cached 
memory to be able to store new one. In case anyone has good articles about 
memory leaks I would be interested to read.
I will t

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Enrico Minack
How well Spark can scale up with your data (in terms of years of data) 
depends on two things: the operations performed on the data, and 
characteristics of the data, like value distributions.


Failing tasks smell like you are using operations that do not scale
(e.g. a Cartesian product of your data, or a join on a low-cardinality key). But
that could be anything.
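
One cheap diagnostic, purely as an illustration (the input path and column name are
invented): check how skewed the join key is before joining; a handful of keys holding
most of the rows usually explains enormous, failing tasks.

  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()
  df = spark.read.parquet("hdfs:///data/some_table")       # placeholder input

  # rows per join key, largest groups first
  df.groupBy("join_key").count().orderBy(F.desc("count")).show(20, truncate=False)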


Again, the reasons for these failing tasks can be manifold, and without 
the actual transformations (i.e. your "complex statements"), and some 
characteristics of your data, no specific help is possible.


Enrico


On 31.03.22 at 10:30, Joris Billen wrote:

Thanks for reply :-)

I am using pyspark. Basicially my code (simplified is):

df=spark.read.csv(hdfs://somehdfslocation)
df1=spark.sql (complex statement using df)
...
dfx=spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting 
cached dataframes at the end and stopping the spark context 
explicitly: sc.stop()?



FOr processing many years at once versus a chunk in a loop: I see that 
if I go up to certain number of days, one iteration will start to have 
tasks that fail. So I only take a limited number of days, and do this 
process several times. Isnt this normal as you are always somehow 
limited in terms of resources (I have 9 nodes wiht 32GB). Or is it 
like this that in theory you could process any volume, in case you 
wait long enough? I guess spark can only break down the tasks up to a 
certain level (based on the datasets' and the intermediate results’ 
partitions) and at some moment you hit the limit where your resources 
are not sufficient anymore to process such one task? Maybe you can 
tweak it a bit, but in the end you’ll hit a limit?




Concretely  following topics would be interesting to find out more 
about (links):
-where to see what you are still consuming after spark job ended if 
you didnt close resources

-memory leaks for pyspark
-good article about closing resources (you find tons of snippets on 
how to start spark context+ config for number/cores/memory of 
worker/executors etc, but never saw a focus on making sure you clean 
up —> or is it just stopping the spark context)





On 30 Mar 2022, at 21:24, Bjørn Jørgensen  
wrote:


It`s quite impossible for anyone to answer your question about what 
is eating your memory, without even knowing what language you are using.


If you are using C then it`s always pointers, that's the mem issue.
If you are using python, there can be some like not using context 
manager like With Context Managers and Python's with Statement 
 


And another can be not to close resources after use.

In my experience you can process 3 years or more of data, IF you are 
closing opened resources.

I use the web GUI http://spark:4040 to follow what spark is doing.



On Wed, 30 Mar 2022 at 17:41, Joris Billen wrote:


Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating
up memory.  The eviction means that he knows that he should clear
some of the old cached memory to be able to store new one. In
case anyone has good articles about memory leaks I would be
interested to read.
I will try to add following lines at the end of my job (as I
cached the table in spark sql):


/sqlContext.sql("UNCACHE TABLE mytableofinterest ")/
/spark.stop()/


Wrt looping: if I want to process 3 years of data, my modest
cluster will never do it one go , I would expect? I have to break
it down in smaller pieces and run that in a loop (1 day is
already lots of data).



Thanks!





On 30 Mar 2022, at 17:25, Sean Owen  wrote:

The Spark context does not stop when a job does. It stops when
you stop it. There could be many ways mem can leak. Caching
maybe - but it will evict. You should be clearing caches when no
longer needed.

I would guess it is something else your program holds on to in
its logic.

Also consider not looping; there is probably a faster way to do
it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen
 wrote:

Hi,
I have a pyspark job submitted through spark-submit that
does some heavy processing for 1 day of data. It runs with
no errors. I have to loop over many days, so I run this
spark job in a loop. I notice after couple executions the
memory is increasing on all worker nodes and eventually this
leads to failures. My job does some caching.

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Sean Owen
If that is your loop unrolled, then you are not doing parts of work at a
time. That will execute all operations in one go when the write finally
happens. That's OK, but may be part of the problem. For example if you are
filtering for a subset, processing, and unioning, then that is just a
harder and slower way of applying the transformation to all data at once.
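
A hypothetical example of that pattern (column names, values and paths are invented):
both versions below compute the same thing, but the second lets Spark plan one pass
over all of the data instead of many filtered branches glued together with unions.

  from functools import reduce
  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()
  df = spark.read.parquet("hdfs:///data/events")            # assumed to have a 'day' column
  days = ["2022-01-01", "2022-01-02", "2022-01-03"]

  # harder and slower: filter per day, transform, then union the pieces back together
  parts = [df.filter(F.col("day") == d).withColumn("flag", F.lit(1)) for d in days]
  per_day_unioned = reduce(lambda a, b: a.union(b), parts)

  # simpler: apply the same transformation to all the days at once
  all_at_once = df.filter(F.col("day").isin(days)).withColumn("flag", F.lit(1))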

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen 
wrote:

> Thanks for reply :-)
>
> I am using pyspark. Basicially my code (simplified is):
>
> df=spark.read.csv(hdfs://somehdfslocation)
> df1=spark.sql (complex statement using df)
> ...
> dfx=spark.sql(complex statement using df x-1)
> ...
> dfx15.write()
>
>
> What exactly is meant by "closing resources"? Is it just unpersisting
> cached dataframes at the end and stopping the spark context explicitly:
> sc.stop()?
>
>
> FOr processing many years at once versus a chunk in a loop: I see that if
> I go up to certain number of days, one iteration will start to have tasks
> that fail. So I only take a limited number of days, and do this process
> several times. Isnt this normal as you are always somehow limited in terms
> of resources (I have 9 nodes wiht 32GB). Or is it like this that in theory
> you could process any volume, in case you wait long enough? I guess spark
> can only break down the tasks up to a certain level (based on the datasets'
> and the intermediate results’ partitions) and at some moment you hit the
> limit where your resources are not sufficient anymore to process such one
> task? Maybe you can tweak it a bit, but in the end you’ll hit a limit?
>
>
>
> Concretely  following topics would be interesting to find out more about
> (links):
> -where to see what you are still consuming after spark job ended if you
> didnt close resources
> -memory leaks for pyspark
> -good article about closing resources (you find tons of snippets on how to
> start spark context+ config for number/cores/memory of worker/executors
> etc, but never saw a focus on making sure you clean up —> or is it just
> stopping the spark context)
>
>
>
>
> On 30 Mar 2022, at 21:24, Bjørn Jørgensen 
> wrote:
>
> It`s quite impossible for anyone to answer your question about what is
> eating your memory, without even knowing what language you are using.
>
> If you are using C then it`s always pointers, that's the mem issue.
> If you are using python, there can be some like not using context manager
> like With Context Managers and Python's with Statement
> 
>
> And another can be not to close resources after use.
>
> In my experience you can process 3 years or more of data, IF you are
> closing opened resources.
> I use the web GUI http://spark:4040 to follow what spark is doing.
>
>
>
>
> On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:
>
>> Thanks for answer-much appreciated! This forum is very useful :-)
>>
>> I didnt know the sparkcontext stays alive. I guess this is eating up
>> memory.  The eviction means that he knows that he should clear some of the
>> old cached memory to be able to store new one. In case anyone has good
>> articles about memory leaks I would be interested to read.
>> I will try to add following lines at the end of my job (as I cached the
>> table in spark sql):
>>
>>
>> *sqlContext.sql("UNCACHE TABLE mytableofinterest ")*
>> *spark.stop()*
>>
>>
>> Wrt looping: if I want to process 3 years of data, my modest cluster will
>> never do it one go , I would expect? I have to break it down in smaller
>> pieces and run that in a loop (1 day is already lots of data).
>>
>>
>>
>> Thanks!
>>
>>
>>
>>
>> On 30 Mar 2022, at 17:25, Sean Owen  wrote:
>>
>> The Spark context does not stop when a job does. It stops when you stop
>> it. There could be many ways mem can leak. Caching maybe - but it will
>> evict. You should be clearing caches when no longer needed.
>>
>> I would guess it is something else your program holds on to in its logic.
>>
>> Also consider not looping; there is probably a faster way to do it in one
>> go.
>>
>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>
>>> Hi,
>>> I have a pyspark job submitted through spark-submit that does some heavy
>>> processing for 1 day of data. It runs with no errors. I have to loop over
>>> many days, so I run this spark job in a loop. I notice after couple
>>> executions the memory is increasing on all worker nodes and eventually this
>>> leads to faillures. My job does some caching, but I understand that when
>>> the job ends successfully, then the sparkcontext is destroye

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-31 Thread Joris Billen
Thanks for the reply :-)

I am using pyspark. Basically, my code (simplified) is:

df=spark.read.csv(hdfs://somehdfslocation)
df1=spark.sql (complex statement using df)
...
dfx=spark.sql(complex statement using df x-1)
...
dfx15.write()


What exactly is meant by "closing resources"? Is it just unpersisting cached 
dataframes at the end and stopping the spark context explicitly: sc.stop()?
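
As a minimal sketch of what closing resources at the end of each per-day job could
look like (this is an assumption about reasonable hygiene, not something prescribed
in the thread; run_one_day is a made-up placeholder for the real pipeline):

  from pyspark.sql import SparkSession

  def run_one_day(session):
      # stand-in for the real chain of spark.sql(...) transformations and writes
      session.range(10).write.mode("overwrite").parquet("/tmp/one_day_output")

  spark = SparkSession.builder.getOrCreate()
  try:
      run_one_day(spark)
  finally:
      spark.catalog.clearCache()    # unpersist every cached table/DataFrame in this session
      spark.stop()                  # always stop, even if the day's processing raised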


For processing many years at once versus a chunk in a loop: I see that if I go
up to a certain number of days, one iteration will start to have tasks that fail.
So I only take a limited number of days, and do this process several times.
Isn't this normal, as you are always somehow limited in terms of resources (I
have 9 nodes with 32GB)? Or is it the case that in theory you could process
any volume, if you wait long enough? I guess spark can only break the
tasks down to a certain level (based on the datasets' and the intermediate
results’ partitions), and at some point you hit the limit where your resources
are not sufficient anymore to process one such task? Maybe you can tweak it a
bit, but in the end you’ll hit a limit?



Concretely, the following topics would be interesting to find out more about
(links):
-where to see what you are still consuming after a spark job ended, if you didn't
close resources
-memory leaks for pyspark
-a good article about closing resources (you find tons of snippets on how to
start a spark context + config for number/cores/memory of workers/executors etc.,
but I never saw a focus on making sure you clean up -> or is it just stopping the
spark context)




On 30 Mar 2022, at 21:24, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

It`s quite impossible for anyone to answer your question about what is eating 
your memory, without even knowing what language you are using.

If you are using C then it`s always pointers, that's the mem issue.
If you are using python, there can be some like not using context manager like 
With Context Managers and Python's with 
Statement
And another can be not to close resources after use.

In my experience you can process 3 years or more of data, IF you are closing 
opened resources.
I use the web GUI http://spark:4040 to follow what spark is 
doing.




On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:
Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating up memory.  
The eviction means that he knows that he should clear some of the old cached 
memory to be able to store new one. In case anyone has good articles about 
memory leaks I would be interested to read.
I will try to add following lines at the end of my job (as I cached the table 
in spark sql):


sqlContext.sql("UNCACHE TABLE mytableofinterest ")
spark.stop()


Wrt looping: if I want to process 3 years of data, my modest cluster will never 
do it one go , I would expect? I have to break it down in smaller pieces and 
run that in a loop (1 day is already lots of data).



Thanks!




On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:

The Spark context does not stop when a job does. It stops when you stop it. 
There could be many ways mem can leak. Caching maybe - but it will evict. You 
should be clearing caches when no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Hi,
I have a pyspark job submitted through spark-submit that does some heavy 
processing for 1 day of data. It runs with no errors. I have to loop over many 
days, so I run this spark job in a loop. I notice after couple executions the 
memory is increasing on all worker nodes and eventually this leads to 
faillures. My job does some caching, but I understand that when the job ends 
successfully, then the sparkcontext is destroyed and the cache should be 
cleared. However it seems that something keeps on filling the memory a bit more 
and more after each run. THis is the memory behaviour over time, which in the 
end will start leading to failures :
[image: worker-node memory usage over time]

(what we see is: green=physical memory used, green-blue=physical memory cached, 
grey=memory capacity =straight line around 31GB )
This runs on a healthy spark 2.4 and was optimized already to come to a stable 
job in terms of spark-submit resources parameters like 
driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait.

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Enrico Minack
> Wrt looping: if I want to process 3 years of data, my modest cluster 
will never do it one go , I would expect?
> I have to break it down in smaller pieces and run that in a loop (1 
day is already lots of data).


Well, that is exactly what Spark is made for. It splits the work up and
processes it in small pieces, called partitions. No matter how much data
you have, it would probably even work on your laptop (as long as it fits on
disk), though it will take some time. But it will succeed. A large
cluster does nothing different, except that more partitions are being
processed in parallel.
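
As one illustration of that knob (the number and path below are arbitrary): if
individual tasks are too big, the usual first lever is more, smaller partitions,
so each task handles less data.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  spark.conf.set("spark.sql.shuffle.partitions", "2000")    # more, smaller shuffle tasks
  df = spark.read.parquet("hdfs:///data/some_input")        # placeholder input
  df = df.repartition(2000)                                 # or repartition explicitly before a heavy stage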


You should expect it to work, no matter how many years of data. 
Otherwise, you have to rethink your Spark code, not your cluster size.


Share some code that does not work with 3 years and people might help. 
Without that, speculation is all you will get.


Enrico



On 30.03.22 at 17:40, Joris Billen wrote:

Thanks for answer-much appreciated! This forum is very useful :-)

I didnt know the sparkcontext stays alive. I guess this is eating up 
memory.  The eviction means that he knows that he should clear some of 
the old cached memory to be able to store new one. In case anyone has 
good articles about memory leaks I would be interested to read.
I will try to add following lines at the end of my job (as I cached 
the table in spark sql):



/sqlContext.sql("UNCACHE TABLE mytableofinterest ")/
/spark.stop()/


Wrt looping: if I want to process 3 years of data, my modest cluster 
will never do it one go , I would expect? I have to break it down in 
smaller pieces and run that in a loop (1 day is already lots of data).




Thanks!





On 30 Mar 2022, at 17:25, Sean Owen  wrote:

The Spark context does not stop when a job does. It stops when you 
stop it. There could be many ways mem can leak. Caching maybe - but 
it will evict. You should be clearing caches when no longer needed.


I would guess it is something else your program holds on to in its 
logic.


Also consider not looping; there is probably a faster way to do it in 
one go.


On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
 wrote:


Hi,
I have a pyspark job submitted through spark-submit that does
some heavy processing for 1 day of data. It runs with no errors.
I have to loop over many days, so I run this spark job in a loop.
I notice after couple executions the memory is increasing on all
worker nodes and eventually this leads to faillures. My job does
some caching, but I understand that when the job ends
successfully, then the sparkcontext is destroyed and the cache
should be cleared. However it seems that something keeps on
filling the memory a bit more and more after each run. THis is
the memory behaviour over time, which in the end will start
leading to failures :

(what we see is: green=physical memory used, green-blue=physical
memory cached, grey=memory capacity =straight line around 31GB )
This runs on a healthy spark 2.4 and was optimized already to
come to a stable job in terms of spark-submit resources
parameters like

driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
Any clue how to “really” clear the memory in between jobs? So
basically currently I can loop 10x and then need to restart my
cluster so all memory is cleared completely.


Thanks for any info!






Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Bjørn Jørgensen
It's quite impossible for anyone to answer your question about what is
eating your memory without even knowing what language you are using.

If you are using C, then it's always pointers; that's the mem issue.
If you are using Python, it can be something like not using a context manager
(see "Context Managers and Python's with Statement").

And another can be not closing resources after use.

In my experience you can process 3 years or more of data, IF you are
closing opened resources.
I use the web GUI http://spark:4040 to follow what spark is doing.
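
A small sketch of that context-manager idea applied to the Spark session itself;
this is a hand-rolled helper, an assumption about one way to guarantee cleanup,
not an official API, and the path below is a placeholder:

  from contextlib import contextmanager
  from pyspark.sql import SparkSession

  @contextmanager
  def spark_session(app_name):
      spark = SparkSession.builder.appName(app_name).getOrCreate()
      try:
          yield spark
      finally:
          spark.catalog.clearCache()   # drop anything still cached
          spark.stop()                 # always stop, even if the body raised

  # everything inside the block runs against one session that is cleaned up afterwards
  with spark_session("one-day-job") as spark:
      spark.range(10).write.mode("overwrite").parquet("/tmp/demo_output")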




On Wed, 30 Mar 2022 at 17:41, Joris Billen <joris.bil...@bigindustries.be> wrote:

> Thanks for answer-much appreciated! This forum is very useful :-)
>
> I didnt know the sparkcontext stays alive. I guess this is eating up
> memory.  The eviction means that he knows that he should clear some of the
> old cached memory to be able to store new one. In case anyone has good
> articles about memory leaks I would be interested to read.
> I will try to add following lines at the end of my job (as I cached the
> table in spark sql):
>
>
> *sqlContext.sql("UNCACHE TABLE mytableofinterest ")*
> *spark.stop()*
>
>
> Wrt looping: if I want to process 3 years of data, my modest cluster will
> never do it one go , I would expect? I have to break it down in smaller
> pieces and run that in a loop (1 day is already lots of data).
>
>
>
> Thanks!
>
>
>
>
> On 30 Mar 2022, at 17:25, Sean Owen  wrote:
>
> The Spark context does not stop when a job does. It stops when you stop
> it. There could be many ways mem can leak. Caching maybe - but it will
> evict. You should be clearing caches when no longer needed.
>
> I would guess it is something else your program holds on to in its logic.
>
> Also consider not looping; there is probably a faster way to do it in one
> go.
>
> On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
> wrote:
>
>> Hi,
>> I have a pyspark job submitted through spark-submit that does some heavy
>> processing for 1 day of data. It runs with no errors. I have to loop over
>> many days, so I run this spark job in a loop. I notice after couple
>> executions the memory is increasing on all worker nodes and eventually this
>> leads to faillures. My job does some caching, but I understand that when
>> the job ends successfully, then the sparkcontext is destroyed and the cache
>> should be cleared. However it seems that something keeps on filling the
>> memory a bit more and more after each run. THis is the memory behaviour
>> over time, which in the end will start leading to failures :
>>
>> (what we see is: green=physical memory used, green-blue=physical memory
>> cached, grey=memory capacity =straight line around 31GB )
>> This runs on a healthy spark 2.4 and was optimized already to come to a
>> stable job in terms of spark-submit resources parameters like
>> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
>> Any clue how to “really” clear the memory in between jobs? So basically
>> currently I can loop 10x and then need to restart my cluster so all memory
>> is cleared completely.
>>
>>
>> Thanks for any info!
>>
>> 
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297


Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Joris Billen
Thanks for the answer, much appreciated! This forum is very useful :-)

I didn't know the sparkcontext stays alive. I guess this is eating up memory.
The eviction means that it knows it should clear some of the old cached
memory to be able to store new ones. In case anyone has good articles about
memory leaks, I would be interested to read them.
I will try to add the following lines at the end of my job (as I cached the table
in spark sql):


sqlContext.sql("UNCACHE TABLE mytableofinterest ")
spark.stop()


Wrt looping: if I want to process 3 years of data, my modest cluster will never
do it in one go, I would expect? I have to break it down into smaller pieces and
run that in a loop (1 day is already lots of data).



Thanks!




On 30 Mar 2022, at 17:25, Sean Owen <sro...@gmail.com> wrote:

The Spark context does not stop when a job does. It stops when you stop it. 
There could be many ways mem can leak. Caching maybe - but it will evict. You 
should be clearing caches when no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one go.

On Wed, Mar 30, 2022, 10:16 AM Joris Billen <joris.bil...@bigindustries.be> wrote:
Hi,
I have a pyspark job submitted through spark-submit that does some heavy 
processing for 1 day of data. It runs with no errors. I have to loop over many 
days, so I run this spark job in a loop. I notice after couple executions the 
memory is increasing on all worker nodes and eventually this leads to 
faillures. My job does some caching, but I understand that when the job ends 
successfully, then the sparkcontext is destroyed and the cache should be 
cleared. However it seems that something keeps on filling the memory a bit more 
and more after each run. THis is the memory behaviour over time, which in the 
end will start leading to failures :
[image: worker-node memory usage over time]

(what we see is: green=physical memory used, green-blue=physical memory cached, 
grey=memory capacity =straight line around 31GB )
This runs on a healthy spark 2.4 and was optimized already to come to a stable 
job in terms of spark-submit resources parameters like 
driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
Any clue how to “really” clear the memory in between jobs? So basically 
currently I can loop 10x and then need to restart my cluster so all memory is 
cleared completely.


Thanks for any info!





Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Sean Owen
The Spark context does not stop when a job does. It stops when you stop it.
There could be many ways mem can leak. Caching maybe - but it will evict.
You should be clearing caches when no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one
go.
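
To make the one go concrete, a hedged sketch, assuming the input carries a date
column (paths, column names and the date range are invented):

  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()

  # read the whole date range at once instead of one spark-submit per day
  df = (spark.read.parquet("hdfs:///data/events")
            .filter(F.col("day").between("2022-01-01", "2022-03-31")))

  # the same per-day transformations, applied to all days in one plan; writing
  # partitioned by day keeps the output layout the same as the looped version
  result = df.groupBy("day").count()          # stand-in for the real logic
  result.write.mode("overwrite").partitionBy("day").parquet("hdfs:///out/daily")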

On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
wrote:

> Hi,
> I have a pyspark job submitted through spark-submit that does some heavy
> processing for 1 day of data. It runs with no errors. I have to loop over
> many days, so I run this spark job in a loop. I notice after couple
> executions the memory is increasing on all worker nodes and eventually this
> leads to faillures. My job does some caching, but I understand that when
> the job ends successfully, then the sparkcontext is destroyed and the cache
> should be cleared. However it seems that something keeps on filling the
> memory a bit more and more after each run. THis is the memory behaviour
> over time, which in the end will start leading to failures :
>
> (what we see is: green=physical memory used, green-blue=physical memory
> cached, grey=memory capacity =straight line around 31GB )
> This runs on a healthy spark 2.4 and was optimized already to come to a
> stable job in terms of spark-submit resources parameters like
> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
> Any clue how to “really” clear the memory in between jobs? So basically
> currently I can loop 10x and then need to restart my cluster so all memory
> is cleared completely.
>
>
> Thanks for any info!
>
>


loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-03-30 Thread Joris Billen
Hi,
I have a pyspark job submitted through spark-submit that does some heavy
processing for 1 day of data. It runs with no errors. I have to loop over many
days, so I run this spark job in a loop. I notice that after a couple of
executions the memory is increasing on all worker nodes and eventually this
leads to failures. My job does some caching, but I understand that when the job
ends successfully, the sparkcontext is destroyed and the cache should be
cleared. However, it seems that something keeps on filling the memory a bit more
and more after each run. This is the memory behaviour over time, which in the
end will start leading to failures:
[image: worker-node memory usage over time]

(What we see is: green = physical memory used, green-blue = physical memory cached,
grey = memory capacity, a straight line around 31GB.)
This runs on a healthy Spark 2.4 and was already optimized to come to a stable
job in terms of spark-submit resource parameters
(driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
Any clue how to “really” clear the memory in between jobs? Basically, at the
moment I can loop 10x and then need to restart my cluster so all memory is
cleared completely.
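
One thing worth trying, offered only as a guess: make the cleanup at the end of
each looped job explicit, so nothing cached lingers while the session shuts down.
The names below are stand-ins, not the real job:

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  cached_df = spark.range(10).cache()     # stand-in for whatever the real job caches
  cached_df.count()                       # materialize the cache

  # ... the day's processing would happen here ...

  cached_df.unpersist(blocking=True)      # wait until executors have actually dropped the blocks
  spark.catalog.clearCache()              # then drop anything else still cached
  spark.stop()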


Thanks for any info!