Re: Memory-efficient successive calls to repartition()
Dear Alexis,

Thanks again for your reply. After reading about checkpointing I have modified my sample code as follows:

    for i in range(1000):
        print i
        data2 = data.repartition(50).cache()
        if (i+1) % 10 == 0:
            data2.checkpoint()
        data2.first()      # materialize rdd
        data.unpersist()   # unpersist previous version
        data = data2

The data is checkpointed every 10 iterations to a directory that I specified. While this seems to improve things a little bit, there is still a lot of writing to disk (the appcache directory, shown as "non-HDFS files" in Cloudera Manager) *besides* the checkpoint files (which are regular HDFS files), and the application eventually runs out of disk space. The same is true even if I checkpoint at every iteration.

What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien

On 24/08/2015 10:39, alexis GILLAIN wrote:
> Hi Aurelien,
> The first code should create a new RDD in memory at each iteration
> (check the webui). [...]
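The bookkeeping in the loop above can be sanity-checked without a cluster. The sketch below is a plain-Python model, not Spark API (FakeRDD, loop, and the persisted set are invented stand-ins): it only shows the intended invariants of the pattern, namely that at most one version stays persisted and that lineage is truncated at each checkpoint. It does not explain the growing appcache directory, which — as discussed later in the thread — comes from shuffle files rather than from this bookkeeping.

```python
# Plain-Python model of the checkpoint loop above -- no Spark involved.
# FakeRDD nodes carry a parent pointer (the lineage); persist/unpersist
# calls are tracked in a set so the invariants can be checked.

class FakeRDD:
    def __init__(self, parent=None):
        self.parent = parent
        self.checkpointed = False

    def lineage_depth(self):
        # Walking stops at a checkpointed ancestor, as in Spark.
        d, node = 0, self
        while node.parent is not None and not node.checkpointed:
            node = node.parent
            d += 1
        return d

def loop(iterations, checkpoint_every):
    persisted = set()
    data = FakeRDD()
    persisted.add(data)                    # initial .cache()
    for i in range(iterations):
        data2 = FakeRDD(parent=data)       # data.repartition(50).cache()
        persisted.add(data2)
        if (i + 1) % checkpoint_every == 0:
            data2.checkpointed = True      # data2.checkpoint(); data2.first()
        persisted.discard(data)            # data.unpersist() on the old copy
        data = data2
    return data, persisted

final, still_persisted = loop(100, 10)
# Only the latest RDD remains persisted; its lineage is short.
```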
Re: Memory-efficient successive calls to repartition()
--
with Regards
Shahid Ashraf
Re: Memory-efficient successive calls to repartition()
Aurélien,

From what you're saying, I can think of a couple of things, keeping in mind that I don't know what you are doing in the rest of the code:

- There is a lot of non-HDFS writing. It comes from the rest of your code and/or from repartition(): repartition involves a shuffle and the creation of shuffle files on disk. I would have said that the problem comes from that, but I just checked and checkpoint() is supposed to delete shuffle files:
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
(this looks exactly like your problem, so you could also try the other workarounds listed there). Still, you may be doing a lot of shuffling in the rest of the code (you should see the amount of shuffle data written in the webui), and you could consider increasing the available disk space, if you can do that.

- On the HDFS side, the class I pointed to has an update function which "automatically handles persisting and (optionally) checkpointing, as well as unpersisting and removing checkpoint files". I am not sure your checkpointing method removes previous checkpoint files.

In the end, does the disk space error come from HDFS growing or from local disk growing? You should check the webui to identify which tasks spill data on disk, and verify that the shuffle files are properly deleted when you checkpoint your rdd.

Regards,

2015-09-01 22:48 GMT+08:00 Aurélien Bellet <aurelien.bel...@telecom-paristech.fr>:
> Dear Alexis,
> Thanks again for your reply. After reading about checkpointing I have
> modified my sample code as follows: [...]
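The cleanup policy that PeriodicRDDCheckpointer's update() applies on the Scala side can be mirrored in a few lines of plain Python. Everything below (the class name, the window of 3 persisted RDDs and 2 retained checkpoint files) is an illustrative assumption based on the class's doc comment, not the actual Spark API; the point is that update() both unpersists stale versions and deletes old checkpoint directories, which a hand-rolled checkpoint loop typically does not do.

```python
# Hypothetical mirror of the bookkeeping PeriodicRDDCheckpointer.update()
# performs in Spark's MLlib: persist the new RDD, unpersist stale ones,
# checkpoint periodically, and delete all but the most recent checkpoint
# files.  Window sizes (3 persisted, 2 checkpoints) are assumptions.

class PeriodicCheckpointer:
    def __init__(self, checkpoint_interval):
        self.interval = checkpoint_interval
        self.update_count = 0
        self.persisted = []           # RDD versions currently cached
        self.checkpoint_files = []    # checkpoint dirs still on HDFS

    def update(self, rdd):
        self.update_count += 1
        self.persisted.append(rdd)            # newRdd.persist()
        while len(self.persisted) > 3:
            self.persisted.pop(0)             # oldRdd.unpersist()
        if self.update_count % self.interval == 0:
            self.checkpoint_files.append("ckpt-%d" % self.update_count)
            while len(self.checkpoint_files) > 2:
                self.checkpoint_files.pop(0)  # delete old checkpoint dir

cp = PeriodicCheckpointer(checkpoint_interval=10)
for i in range(100):
    cp.update("rdd-%d" % i)
# At the end only the newest RDD versions and the last two checkpoint
# directories remain -- bounded memory and bounded HDFS usage.
```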
Re: Memory-efficient successive calls to repartition()
Hi Aurelien,

The first code should create a new RDD in memory at each iteration (check the webui). The second code will unpersist the RDD, but that's not the main problem.

I think you are having trouble due to long lineage, as .cache() keeps track of the lineage for recovery. You should have a look at checkpointing:

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in MLlib for best practices.

2015-08-20 17:26 GMT+08:00 abellet <aurelien.bel...@telecom-paristech.fr>:
> Hello,
> For the need of my application, I need to periodically "shuffle" the data
> across nodes/partitions of a reasonably-large dataset. [...]
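The long-lineage point can be made concrete with a toy model. Below, each Node stands in for an RDD and its parent pointer for one lineage link; none of this is Spark API. Caching keeps the data in memory but does not shorten the chain, so after 1000 repartitions the chain is 1000 links long; checkpointing cuts the chain once the data is safely on stable storage.

```python
# Toy model of RDD lineage: cache() does not shorten the dependency
# chain, checkpoint() does.  Plain Python, no Spark.

class Node:
    def __init__(self, parent=None):
        self.parent = parent

def depth(node):
    # Number of ancestors reachable through parent pointers.
    d = 0
    while node.parent is not None:
        node = node.parent
        d += 1
    return d

# Without checkpointing, each repartition adds one ancestor, so the
# chain (and recovery cost / driver-side metadata) grows linearly.
rdd = Node()
for _ in range(1000):
    rdd = Node(parent=rdd)        # data = data.repartition(50).persist()
depth_no_ckpt = depth(rdd)

# Checkpointing every 10 iterations cuts the parent pointer, as Spark
# does once the checkpoint is materialized.
rdd = Node()
for i in range(1000):
    rdd = Node(parent=rdd)
    if (i + 1) % 10 == 0:
        rdd.parent = None         # lineage truncated at the checkpoint
depth_ckpt = depth(rdd)
```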
Memory-efficient successive calls to repartition()
Hello,

For the need of my application, I need to periodically "shuffle" the data across the nodes/partitions of a reasonably large dataset. This is an expensive operation, but I only need to do it every now and then. However, it seems that I am doing something wrong, because as the iterations go on, the memory usage increases, causing the job to spill onto HDFS, which eventually gets full. I am also getting some "Lost executor" errors that I don't get if I don't repartition.

Here's a basic piece of code which reproduces the problem:

    data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
    data.count()
    for i in range(1000):
        data = data.repartition(50).persist()
        # below, several operations are done on data

What am I doing wrong? I tried the following but it doesn't solve the issue:

    for i in range(1000):
        data2 = data.repartition(50).persist()
        data2.count()      # materialize rdd
        data.unpersist()   # unpersist previous version
        data = data2

Help and suggestions on this would be greatly appreciated! Thanks a lot!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
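The difference between the two loops in the message above is only in when the previous cached copy is released. The plain-Python bookkeeping below (no Spark; first_form and second_form are invented names) shows that the first form accumulates one persisted copy per iteration, while the second keeps at most two copies alive at any moment and exactly one at the end.

```python
# Bookkeeping for the two loops in the message above -- plain Python,
# no Spark.  object() stands in for one cached copy of the dataset.

def first_form(iterations):
    persisted = [object()]               # initial .cache()
    for _ in range(iterations):
        persisted.append(object())       # data = data.repartition(50).persist()
    return len(persisted)                # old copies are never released

def second_form(iterations):
    data = object()
    persisted = [data]                   # initial .cache()
    peak = 1
    for _ in range(iterations):
        data2 = object()                 # data2 = data.repartition(50).persist()
        persisted.append(data2)          # data2.count() materializes it first
        peak = max(peak, len(persisted))
        persisted.remove(data)           # data.unpersist() on the old copy
        data = data2
    return len(persisted), peak          # one copy left, at most two at once
```

Even so, as the rest of the thread shows, releasing cached copies alone does not bound disk usage: the shuffle files that repartition() writes accumulate independently of the persisted copies.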