That might be a good alternative to what we are looking for, but I wonder whether it would be as efficient as we'd like. For instance, will RDDs of the same size usually get partitioned to the same machines, so that zipping them doesn't trigger any cross-machine alignment? We'll explore it, but I would still very much like to see more direct worker memory management beyond RDDs.
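To make sure I follow the suggestion, I imagine something roughly like the sketch below (just a sketch: computeLoss and the sample data are placeholders I made up, and it assumes neither RDD is repartitioned between iterations):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object ZipLossSketch {
      // Hypothetical per-example loss update; a placeholder for the real computation.
      def computeLoss(example: Array[Double], prevLoss: Double): Double =
        prevLoss + example.sum

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("zip-loss-sketch"))

        // Cached examples; in practice these would come from HDFS, etc.
        val examples = sc.parallelize(Seq.fill(1000)(Array.fill(10)(1.0))).cache()

        // One loss per example, co-partitioned with `examples` by construction.
        var losses = examples.map(_ => 0.0)

        for (_ <- 1 to 10) {
          // zip pairs elements partition-by-partition; no shuffle as long as
          // neither RDD has been repartitioned since `losses` was derived.
          losses = examples.zip(losses).map { case (ex, prev) => computeLoss(ex, prev) }
          losses.cache()
        }

        println(losses.sum())
        sc.stop()
      }
    }

The open question for me is whether the two RDDs reliably stay co-located across a long chain of iterations.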
On Mon, Apr 28, 2014 at 10:26 AM, Tom Vacek <minnesota...@gmail.com> wrote:

> Right---They are zipped at each iteration.
>
>
> On Mon, Apr 28, 2014 at 11:56 AM, Chester Chen <chesterxgc...@yahoo.com> wrote:
>
>> Tom,
>> Are you suggesting two RDDs, one with loss and another for the rest of the
>> info, using zip to tie them together, but doing the update on the loss RDD
>> (copy)?
>>
>> Chester
>>
>> Sent from my iPhone
>>
>> On Apr 28, 2014, at 9:45 AM, Tom Vacek <minnesota...@gmail.com> wrote:
>>
>> I'm not sure what I said came through. RDD zip is not hacky at all, as it
>> only depends on a user not changing the partitioning. Basically, you would
>> keep your losses as an RDD[Double] and zip those with the RDD of examples,
>> and update the losses. You're doing a copy (and GC) on the RDD of losses
>> each time, but this is negligible.
>>
>>
>> On Mon, Apr 28, 2014 at 11:33 AM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:
>>
>>> Yes, this is what we've done as of now (if you read earlier threads). And
>>> we were saying that we'd prefer it if Spark supported persistent worker
>>> memory management in a little bit less hacky way ;)
>>>
>>>
>>> On Mon, Apr 28, 2014 at 8:44 AM, Ian O'Connell <i...@ianoconnell.com> wrote:
>>>
>>>> A mutable map in an object should do what you're looking for then, I
>>>> believe. You just reference the object as an object in your closure, so it
>>>> won't be swept up when your closure is serialized, and you can then
>>>> reference variables of the object on the remote host. e.g.:
>>>>
>>>> object MyObject {
>>>>   val mmap = scala.collection.mutable.Map[Long, Long]()
>>>> }
>>>>
>>>> rdd.map { ele =>
>>>>   MyObject.mmap.getOrElseUpdate(ele, 1L)
>>>>   ...
>>>> }.map { ele =>
>>>>   require(MyObject.mmap(ele) == 1L)
>>>> }.count
>>>>
>>>> Along with the data loss, just be careful with thread safety and multiple
>>>> threads/partitions on one host, so the map should be viewed as shared
>>>> amongst a larger space.
>>>>
>>>> Also, with your exact description it sounds like your data should be
>>>> encoded into the RDD if it's per-record/per-row: RDD[(MyBaseData,
>>>> LastIterationSideValues)]
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:51 AM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:
>>>>
>>>>> In our case, we'd like to keep memory content from one iteration to the
>>>>> next, and not just during a single mapPartition call, because then we can
>>>>> do more efficient computations using the values from the previous
>>>>> iteration.
>>>>>
>>>>> So essentially, we need to declare objects outside the scope of the
>>>>> map/reduce calls (but residing in individual workers); then those can be
>>>>> accessed from the map/reduce calls.
>>>>>
>>>>> We'd be making some assumptions, as you said, such as: an RDD partition
>>>>> is statically located and can't move from one worker to another unless
>>>>> the worker crashes.
>>>>>
>>>>>
>>>>> On Mon, Apr 28, 2014 at 1:35 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>>
>>>>>> On Mon, Apr 28, 2014 at 9:30 AM, Sung Hwan Chung <coded...@cs.stanford.edu> wrote:
>>>>>>
>>>>>>> Actually, I do not know how to do something like this or whether this
>>>>>>> is possible - thus my suggestive statement.
>>>>>>>
>>>>>>> Can you already declare persistent memory objects per worker? I tried
>>>>>>> something like constructing a singleton object within map functions,
>>>>>>> but that didn't work, as it seemed to actually serialize singletons and
>>>>>>> pass them back and forth in a weird manner.
>>>>>>
>>>>>> Does it need to be persistent across operations, or just persist for the
>>>>>> lifetime of processing of one partition in one mapPartition? The latter
>>>>>> is quite easy and might give most of the speedup.
>>>>>>
>>>>>> Maybe that's 'enough', even if it means you re-cache values several
>>>>>> times in a repeated iterative computation. It would certainly avoid
>>>>>> managing a lot of complexity in trying to keep that state alive remotely
>>>>>> across operations. I'd also be interested if there is any reliable way to
>>>>>> do that, though it seems hard since it means you embed assumptions about
>>>>>> where particular data is going to be processed.
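For reference, the per-partition variant Sean describes might look something like the sketch below (again only a sketch: expensiveSetup and the sample data are placeholders, and the table lives only for the duration of one mapPartitions call, so nothing has to be kept alive on the worker across operations):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object MapPartitionsStateSketch {
      // Hypothetical expensive per-partition setup, e.g. building a lookup table.
      def expensiveSetup(): Map[Long, Double] =
        (0L until 1000L).map(i => i -> i.toDouble).toMap

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("map-partitions-state"))
        val rdd = sc.parallelize(0L until 10000L, 8)

        val result = rdd.mapPartitions { iter =>
          // Built once per partition per call; it exists only for this call,
          // so there is no state to manage on the worker between operations.
          val table = expensiveSetup()
          iter.map(x => table.getOrElse(x % 1000L, 0.0))
        }

        println(result.sum())
        sc.stop()
      }
    }

The downside, as Sean notes, is that the setup work is repeated on every iteration rather than kept alive between them.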