BTW, try loading only the largest version of RDD2 (I guess the one from the last iterations), cache it, run a count on it, and then check the web UI to see how big the RDD actually is. If each worker and the driver have enough RAM for it, the broadcast solution should fit, provided you clean up the old broadcast variables between iterations.
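Something along these lines (Scala; loadRdd2 and lastIteration are placeholders standing in for however you actually build or load that RDD):

    import org.apache.spark.storage.StorageLevel

    // Placeholder for however you build/load the last-iteration RDD2.
    val rdd2 = loadRdd2(lastIteration)

    rdd2.persist(StorageLevel.MEMORY_ONLY)  // keep it in memory
    val n = rdd2.count()                    // forces materialization

    // Then open the driver web UI (http://<driver>:4040, Storage tab)
    // and check the "Size in Memory" reported for this RDD.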
2014-02-19 12:25 GMT+01:00 Eugen Cepoi <[email protected]>:

> Yeah, this is due to the fact that the broadcast variables are kept in
> memory, and I am guessing they are referenced in a way that prevents them
> from being garbage collected...
> A solution could be to enable spark.cleaner.ttl, but I don't like it much
> as it feels like a hacky solution.
> There is also a PR that was merged a few days ago,
> https://github.com/apache/incubator-spark/pull/543; unfortunately it is
> not part of Spark 0.9 :(
>
> However, you can have a look at the source code and maybe implement the
> same thing in your job, so that at the end of each iteration you remove
> the broadcast variables; it should be possible through
> SparkEnv.get.blockManager.
>
>
> 2014-02-19 12:09 GMT+01:00 hanbo <[email protected]>:
>
>> We have implemented it this way; we use PySpark in standalone mode. We
>> collect the new RDD2 in each iteration. The Java heap memory consumed by
>> the driver program increases gradually, and the job finally crashes with
>> an OutOfMemory error.
>>
>> We have done some tests: in each iteration we simply collect a vector.
>> Even this small, simple case consumed more and more Java heap memory and
>> eventually raised an OutOfMemory error.
>>
>> We don't know why the memory keeps increasing. Is it consumed by the DAG
>> information, or by some variable related to the collect function?
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-efficiently-join-this-two-complicated-rdds-tp1665p1749.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
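P.S. If you do go the spark.cleaner.ttl route, it is just a conf setting on the SparkConf. A minimal sketch in Scala (the app name and TTL value are placeholders to tune for your job; the per-iteration cleanup via SparkEnv.get.blockManager mentioned above relies on Spark internals, so I'm not showing it here):

    import org.apache.spark.{SparkConf, SparkContext}

    // Turn on the periodic metadata cleaner so old broadcast variables and
    // stage/RDD metadata older than the TTL are dropped from memory.
    // The value is in seconds -- pick something longer than your slowest
    // iteration, otherwise data still in use can get cleaned up.
    val conf = new SparkConf()
      .setAppName("iterative-rdd2-join")   // placeholder app name
      .set("spark.cleaner.ttl", "3600")
    val sc = new SparkContext(conf)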
