Yes, this is a useful trick we found that made our algorithm implementation noticeably faster (btw, we'll send a pull request for this GLMNET implementation, so interested people can take a look).
It would be nice if Spark supported something akin to this natively, as I
believe that many efficient algorithms could take advantage of it. Basically,
we don't even really need a mutable RDD. What we really need is the ability
to store/modify data in the workers' memory and access it in subsequent
iterations, e.g. something like:

rdd.mapPartitions((rows: Iterator[String]) => {
  var idx = 0
  rows.map((row: String) => {
    val valueMap = SparkWorker.getMemoryContent("valMap")
    val prevVal = valueMap(idx)
    idx += 1
    ...
  })
  ...
})

The developer can implement their own fault-recovery mechanism for the case
where a worker has crashed and lost the memory content (a rough sketch of how
this could be approximated with an executor-local singleton is appended below
the quoted thread).

On Sun, Apr 27, 2014 at 10:24 PM, DB Tsai <dbt...@stanford.edu> wrote:

> Hi Todd,
>
> As Patrick and you already pointed out, it's really dangerous to mutate
> the state of an RDD. However, when we implemented glmnet in Spark, reusing
> the residuals computed for each row in the previous step sped things up
> 4~5x.
>
> As a result, we added an extra column to the RDD for book-keeping the
> residual of each row, initialized to NaN. When the next iteration finds
> that the residual for a row is NaN, it means that either the RDD partition
> ended up on disk or the job failed, so we recompute the residuals for
> those rows. This handles both fault tolerance and data spilling to disk.
>
> It would be nice to have an API with native support for this type of
> book-keeping.
>
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Sat, Apr 26, 2014 at 11:22 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>
>> Hey Todd,
>>
>> This approach violates the normal semantics of RDD transformations, as
>> you point out. You mentioned some issues already, and there are others.
>> For instance, say you cache originalRDD and some of the partitions end up
>> in memory while others end up on disk. The ones that end up in memory
>> will be mutated in place when you create transformedRDD; the ones that
>> are serialized to disk won't actually be changed (because there will be a
>> copy into memory from the serialized on-disk data). So you could end up
>> with originalRDD partially mutated.
>>
>> Also, in the case of failures your map might run twice (e.g. run
>> partially once, fail, then get re-run and succeed). So if your mutation
>> relied on the current state of the object, it could end up having
>> unexpected behavior.
>>
>> We'll probably never "disallow" this in Spark because we can't really
>> control what you do inside of the function. But I'd be careful using this
>> approach...
>>
>> - Patrick
>>
>>
>> On Sat, Apr 26, 2014 at 5:59 AM, Lisonbee, Todd
>> <todd.lison...@intel.com> wrote:
>>
>>> For example,
>>>
>>> val originalRDD: RDD[SomeCaseClass] = ...
>>>
>>> // Option 1: objects are copied, setting prop1 in the process
>>> val transformedRDD = originalRDD.map( item =>
>>>   item.copy(prop1 = calculation()) )
>>>
>>> // Option 2: objects are re-used and modified in place
>>> val transformedRDD = originalRDD.map( item =>
>>>   { item.prop1 = calculation(); item } )
>>>
>>> I did a couple of small tests with option 2 and noticed less time was
>>> spent in garbage collection. It didn't add up to much, but with a large
>>> enough data set it would make a difference. Also, it seems that less
>>> memory would be used.
>>>
>>> Potential gotchas:
>>>
>>> - Objects in originalRDD are being modified, so you can't expect them
>>>   to have not changed
>>> - You also can't rely on objects in originalRDD having the new value,
>>>   because originalRDD might be re-calculated
>>> - If originalRDD was a PairRDD and you modified the keys, it could
>>>   cause issues
>>> - more?
>>>
>>> Other than the potential gotchas, is there any reason not to reuse
>>> objects across RDDs? Is it a recommended practice for reducing memory
>>> usage and garbage collection, or not?
>>>
>>> Is it safe to do this in code you expect to work on future versions of
>>> Spark?
>>>
>>> Thanks in advance,
>>>
>>> Todd
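
For reference, here is a rough sketch of what the worker-memory pattern
proposed at the top of this thread could look like today, using a JVM
singleton object that lives on each executor. Nothing below is an existing
Spark API: WorkerMemory, IterativeJob, expensiveInit and cheapUpdate are
made-up names for illustration, and the fault recovery is just the
"recompute when the cached value is missing" idea described above.

import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

// One instance per executor JVM, shared by all tasks running on that
// executor. This is a stand-in for the hypothetical
// SparkWorker.getMemoryContent above.
object WorkerMemory {
  private val store = new TrieMap[String, TrieMap[Long, Double]]()
  def getOrCreate(name: String): TrieMap[Long, Double] =
    store.getOrElseUpdate(name, new TrieMap[Long, Double]())
}

object IterativeJob extends Serializable {
  // Illustrative per-row computations, not part of any real algorithm.
  def expensiveInit(row: String): Double = row.length.toDouble
  def cheapUpdate(row: String, prev: Double): Double = prev + 1.0

  // Rows are keyed by a stable row id so cached values can be looked up
  // again on the next iteration (as long as the partition lands on the
  // same executor).
  def iterate(data: RDD[(Long, String)]): RDD[Double] =
    data.mapPartitions { iter =>
      val valueMap = WorkerMemory.getOrCreate("valMap")
      iter.map { case (rowId, row) =>
        val prev = valueMap.getOrElse(rowId, Double.NaN)
        // NaN means this executor has never seen the row (first pass, or
        // the executor was lost), so fall back to the full computation --
        // the "implement your own fault recovery" part.
        val next = if (prev.isNaN) expensiveInit(row) else cheapUpdate(row, prev)
        valueMap.put(rowId, next)
        next
      }
    }
}

This only helps when cached partitions keep landing on the same executors
across iterations, and the map grows with the data, so it is a sketch of the
idea rather than anything production-ready.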
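
Similarly, a minimal sketch of the NaN-based residual book-keeping DB Tsai
describes above, in a coordinate-descent style where only one coefficient
changes per step. DataRow, predict and refreshResiduals are invented names
for illustration and not the actual glmnet code; it assumes the data is
persisted deserialized in memory (e.g. StorageLevel.MEMORY_ONLY) so the
mutated residuals survive between iterations, and it inherits every caveat
Patrick lists above.

import org.apache.spark.rdd.RDD

// Extra mutable field for book-keeping; NaN marks "not computed yet / lost".
case class DataRow(features: Array[Double], label: Double,
                   var residual: Double = Double.NaN)

object ResidualBookkeeping {
  def predict(features: Array[Double], weights: Array[Double]): Double =
    features.zip(weights).map { case (x, w) => x * w }.sum

  // Called after coordinate j's weight changed by delta.
  def refreshResiduals(data: RDD[DataRow], weights: Array[Double],
                       j: Int, delta: Double): RDD[DataRow] =
    data.map { row =>
      if (row.residual.isNaN) {
        // First pass, or this partition was serialized to disk / recomputed
        // after a failure, so the in-place value was lost: full recompute.
        row.residual = row.label - predict(row.features, weights)
      } else {
        // Reuse the previous residual: only coordinate j changed.
        row.residual -= delta * row.features(j)
      }
      row // same object, mutated in place -- subject to Patrick's caveats above
    }
}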