Thanks a lot for the useful link and comments, Alexis!

First of all, the problem occurs without doing anything else in the code (except, of course, loading my data from HDFS at the beginning), so it definitely comes from the shuffling. You're right that in the current version the checkpoint files are not removed and take up some space in HDFS (this is easy to fix). But that is negligible compared to the non-HDFS files, which keep growing as the iterations go on. So I agree with you that this must come from the shuffle operations: the shuffle files are apparently not removed during execution (they are only removed when I stop/kill the application), despite the use of checkpoint().
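If I understand correctly, Spark's ContextCleaner only removes shuffle files once the corresponding objects are garbage-collected on the driver, so one workaround I will try is to force a GC every few iterations. Here is a minimal sketch of what I mean, assuming the cleanup is indeed GC-driven (the interval is arbitrary):

    import gc

    # Sketch only: shuffle files are cleaned up by the ContextCleaner when
    # the driver-side objects referencing them are garbage-collected, so an
    # explicit GC every few iterations may trigger the cleanup.
    for i in range(1000):
        data2 = data.repartition(50).cache()
        data2.count()        # materialize the new RDD
        data.unpersist()     # drop the previous version
        data = data2
        if (i + 1) % 10 == 0:
            gc.collect()             # free unreachable Python RDD wrappers
            sc._jvm.System.gc()      # request a GC in the driver JVM via py4j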

The class you mentioned is very interesting, but I did not find a way to use it from PySpark. I will try to implement my own version based on the source code. Besides the queueing and removal of checkpoint files, though, I do not really see anything special there that could solve my issue.
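Here is a first, untested sketch of what I have in mind, mimicking (in simplified form) the queueing and removal logic of PeriodicRDDCheckpointer. The class name is my own, and the checkpoint files are deleted through the JVM Hadoop FileSystem API reached via py4j:

    # Untested sketch of a PeriodicRDDCheckpointer-like helper for PySpark.
    class PeriodicCheckpointer(object):
        def __init__(self, sc, checkpoint_interval):
            self.sc = sc
            self.interval = checkpoint_interval
            self.iteration = 0
            self.persisted = []      # recently persisted RDDs
            self.checkpointed = []   # recently checkpointed RDDs

        def _delete_checkpoint_files(self, rdd):
            # Remove an old RDD's checkpoint directory using the Hadoop
            # FileSystem API through the py4j gateway.
            path = rdd.getCheckpointFile()
            if path is not None:
                jvm = self.sc._jvm
                fs = jvm.org.apache.hadoop.fs.FileSystem.get(
                    self.sc._jsc.hadoopConfiguration())
                fs.delete(jvm.org.apache.hadoop.fs.Path(path), True)

        def update(self, rdd):
            self.iteration += 1
            rdd.cache()
            if self.iteration % self.interval == 0:
                rdd.checkpoint()
            rdd.count()  # materialize the RDD (and write its checkpoint)
            self.persisted.append(rdd)
            while len(self.persisted) > 1:  # keep only the newest RDD cached
                self.persisted.pop(0).unpersist()
            if rdd.isCheckpointed():
                self.checkpointed.append(rdd)
                # older checkpoint files can be removed once a newer
                # checkpoint has been materialized
                while len(self.checkpointed) > 1:
                    self._delete_checkpoint_files(self.checkpointed.pop(0))

In the loop, this would replace the manual cache/checkpoint/unpersist bookkeeping with a single call to checkpointer.update(data2).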

I will continue to investigate this. I just found out that I can use a command-line browser to look at the web UI (I cannot access the server in graphical mode), which should help me understand what's going on. I will also try the workarounds mentioned in the link. I'll keep you posted.
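By the way, Spark's monitoring REST API (available since 1.4) might be even more convenient than a text browser for this. A rough sketch; driver-host is a placeholder, and the loop assumes a single running application:

    import json
    import urllib2

    # Query the web UI's JSON endpoint to see, per stage, how much shuffle
    # data is written and how much is spilled to disk.
    base = "http://driver-host:4040/api/v1"
    apps = json.load(urllib2.urlopen(base + "/applications"))
    app_id = apps[0]["id"]
    stages = json.load(urllib2.urlopen("%s/applications/%s/stages" % (base, app_id)))
    for stage in stages:
        print stage["stageId"], stage["shuffleWriteBytes"], stage["diskBytesSpilled"]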

Again, thanks a lot!

Best,

Aurelien

On 02/09/2015 14:15, alexis GILLAIN wrote:
Aurélien,

From what you're saying, I can think of a couple of things, bearing in
mind that I don't know what you are doing in the rest of the code:

- There are a lot of non-HDFS writes; these come from the rest of your
code and/or repartition(). Repartitioning involves a shuffle and the
creation of files on local disk. I would have said that the problem
comes from that, but I just checked and checkpoint() is supposed to
delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(this looks exactly like your problem, so you could also try the other
workarounds there).
Still, you may be doing a lot of shuffling in the rest of the code (you
should see the amount of shuffle data written in the web UI), and could
consider increasing the available disk space... if you can do that (see
the sketch after this list).

- On the HDFS side, the class I pointed to has an update function which
"automatically handles persisting and (optionally) checkpointing, as
well as unpersisting and removing checkpoint files". I am not sure your
checkpointing method removes the previous checkpoint files.
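
Regarding the disk space point above: spark.local.dir controls where the
shuffle and spill files land on each node, so pointing it at bigger disks
can help. A minimal sketch (the paths are made up; note that on YARN the
node manager's local directories take precedence over this setting):

    from pyspark import SparkConf, SparkContext

    # Example only: spread shuffle/spill files over several large disks.
    conf = SparkConf().set("spark.local.dir",
                           "/mnt/disk1/spark,/mnt/disk2/spark")
    sc = SparkContext(conf=conf)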

In the end, does the disk space error come from HDFS filling up or from
the local disks filling up?

You should check the web UI to identify which tasks spill data to disk,
and verify that the shuffle files are properly deleted when you
checkpoint your RDD.


Regards,


2015-09-01 22:48 GMT+08:00 Aurélien Bellet
<aurelien.bel...@telecom-paristech.fr>:

    Dear Alexis,

    Thanks again for your reply. After reading about checkpointing, I
    have modified my sample code as follows:

    for i in range(1000):
        print i
        data2 = data.repartition(50).cache()
        if (i + 1) % 10 == 0:
            data2.checkpoint()
        data2.first()     # materialize the RDD
        data.unpersist()  # unpersist the previous version
        data = data2

    The data is checkpointed every 10 iterations to a directory that I
    specified. While this seems to improve things a little bit, there is
    still a lot of writing to disk (the appcache directory, shown as "non
    HDFS files" in Cloudera Manager) *besides* the checkpoint files
    (which are regular HDFS files), and the application eventually runs
    out of disk space. The same is true even if I checkpoint at every
    iteration.

    What am I doing wrong? Maybe some garbage collector setting?

    Thanks a lot for the help,

    Aurelien

    On 24/08/2015 10:39, alexis GILLAIN wrote:

        Hi Aurelien,

        The first snippet should create a new RDD in memory at each
        iteration (check the web UI).
        The second snippet will unpersist the old RDD, but that's not
        the main problem.

        I think you are having trouble due to the long lineage, as
        .cache() keeps track of the lineage for recovery.
        You should have a look at checkpointing:
        https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
        https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
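
        For reference, a minimal checkpointing sketch in PySpark (the
        directory path is just an example):

            sc.setCheckpointDir("hdfs:///tmp/checkpoints")  # example path
            rdd = sc.textFile("input.txt").map(parseLine).cache()
            rdd.checkpoint()  # marked; the data is saved at the next action
            rdd.count()       # materializes the RDD, writes the checkpoint,
                              # and truncates the lineage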

        You can also have a look at the code of other iterative
        algorithms in MLlib for best practices.

        2015-08-20 17:26 GMT+08:00 abellet
        <aurelien.bel...@telecom-paristech.fr>:

             Hello,

             For the needs of my application, I need to periodically
             "shuffle" the data across the nodes/partitions of a
             reasonably large dataset. This is an expensive operation,
             but I only need to do it every now and then. However, it
             seems that I am doing something wrong, because as the
             iterations go on, the memory usage increases, causing the
             job to spill onto HDFS, which eventually gets full. I am
             also getting some "Lost executor" errors that I don't get
             if I don't repartition.

             Here's a basic piece of code which reproduces the problem:

             data = sc.textFile("ImageNet_gist_train.txt", 50) \
                      .map(parseLine).cache()
             data.count()
             for i in range(1000):
                 data = data.repartition(50).persist()
                 # several operations are then performed on data


             What am I doing wrong? I tried the following, but it
             doesn't solve the issue:

             for i in range(1000):
                 data2 = data.repartition(50).persist()
                 data2.count()     # materialize the RDD
                 data.unpersist()  # unpersist the previous version
                 data = data2


             Help and suggestions on this would be greatly appreciated!
             Thanks a lot!











