Yes, this is a useful trick we found that made our algorithm implementation
noticeably faster (btw, we'll send a pull request for this GLMNET
implementation so interested people can take a look).

It would be nice if Spark supported something akin to this natively, as I
believe that many efficient algorithms could take advantage of this.

Basically, we don't even really need a mutable RDD. What we really need is
the ability to store and modify data in the workers' memory and access it
in subsequent iterations.

e.g. something like

rdd.mapPartitions((rows: Iterator[String]) => {
  var idx = 0
  rows.map((row: String) => {
    // look up state kept in this worker's memory from a previous iteration
    // (SparkWorker.getMemoryContent is the proposed API, not an existing one)
    val valueMap = SparkWorker.getMemoryContent("valMap")
    val prevVal = valueMap(idx)
    idx += 1
    ...
  })
  ...
})

The developer could then implement their own fault-recovery mechanism for
the case where a worker has crashed and lost the in-memory content.
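
Until something like that exists natively, one rough way to approximate it
today is a per-JVM singleton object that tasks read and write across
iterations. This is only a sketch, assuming each executor is a single
long-lived JVM; WorkerState and the key names here are made up for
illustration:

import scala.collection.concurrent.TrieMap

// Hypothetical helper, not a Spark API: a top-level Scala object is
// instantiated once per executor JVM, so values written here during one
// iteration are visible to tasks of later iterations on the same executor.
object WorkerState {
  private val store = TrieMap.empty[String, Any]
  def get[T](key: String): Option[T] = store.get(key).map(_.asInstanceOf[T])
  def put(key: String, value: Any): Unit = store.put(key, value)
}

val result = rdd.mapPartitionsWithIndex { (partId, rows) =>
  val key = s"valMap-$partId"
  // Buffer the partition so it can be traversed alongside the remembered
  // values (assumes a single partition fits in memory).
  val buffered = rows.toArray
  // Pull whatever this worker remembers for this partition. An empty result
  // means the state is gone (first pass, different executor, restarted JVM),
  // so fall back to NaN, i.e. "recompute for this row".
  val valMap = WorkerState.get[Array[Double]](key)
    .getOrElse(Array.fill(buffered.length)(Double.NaN))
  // ... per-row work goes here, reading and updating valMap(i) ...
  WorkerState.put(key, valMap)   // visible to the next iteration's tasks
  buffered.iterator
}

The obvious caveat is that a later task for the same partition may be
scheduled on a different executor, in which case the state simply isn't
there, which is exactly where the fault-recovery logic mentioned above has
to kick in.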


On Sun, Apr 27, 2014 at 10:24 PM, DB Tsai <dbt...@stanford.edu> wrote:

> Hi Todd,
>
> As Patrick and you already pointed out, it's really dangerous to mutate
> the state of an RDD. However, when we implemented glmnet in Spark, we found
> that reusing the residuals computed for each row in the previous step
> speeds it up by 4~5x.
>
> As a result, we add an extra column to the RDD to book-keep the residual
> for each row, and initialize it to NaN. When the next iteration finds that
> the residual for a row is NaN, it means that either the RDD partition ended
> up on disk or the job failed, so we recompute the residuals for those rows.
> This handles both fault tolerance and data spilling to disk.
>
> It would be nice to have an API with native support for this type of
> book-keeping.
>
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Sat, Apr 26, 2014 at 11:22 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>
>> Hey Todd,
>>
>> This approach violates the normal semantics of RDD transformations, as you
>> point out. You've identified some issues already, and there are others. For
>> instance, say you cache originalRDD and some of the partitions end up in
>> memory and others end up on disk. The ones that end up in memory will be
>> mutated in-place when you create transformedRDD, while the ones serialized
>> to disk won't actually be changed (because there will be a copy into memory
>> from the serialized on-disk data). So you could end up with originalRDD
>> partially mutated.
>>
>> Also, in the case of failures your map might run twice (e.g. run
>> partially once, fail, then get re-run and succeed). So if your mutation
>> relied on the current state of the object, it could end up with unexpected
>> behavior.
>>
>> We'll probably never "disallow" this in Spark because we can't really
>> control what you do inside of the function. But I'd be careful using this
>> approach...
>>
>> - Patrick
>>
>>
>> On Sat, Apr 26, 2014 at 5:59 AM, Lisonbee, Todd 
>> <todd.lison...@intel.com> wrote:
>>
>>> For example,
>>>
>>> val originalRDD: RDD[SomeCaseClass] = ...
>>>
>>> // Option 1: objects are copied, setting prop1 in the process
>>> val transformedRDD = originalRDD.map( item => item.copy(prop1 =
>>> calculation() ) )
>>>
>>> // Option 2: objects are re-used and modified
>>> val transformedRDD =
>>>   originalRDD.map( item => { item.prop1 = calculation(); item } )
>>>
>>> I did a couple of small tests with option 2 and noticed less time was
>>> spent in garbage collection.  It didn't add up to much but with a large
>>> enough data set it would make a difference.  Also, it seems that less
>>> memory would be used.
>>>
>>> Potential gotchas:
>>>
>>> - Objects in originalRDD are being modified, so you can't expect them to
>>> remain unchanged
>>> - You also can't rely on objects in originalRDD having the new value,
>>> because originalRDD might be re-calculated
>>> - If originalRDD was a PairRDD, and you modified the keys, it could
>>> cause issues
>>> - more?
>>>
>>> Other than the potential gotchas, is there any reason not to reuse
>>> objects across RDDs?  Is it a recommended practice for reducing memory
>>> usage and garbage collection or not?
>>>
>>> Is it safe to do this in code you expect to work on future versions of
>>> Spark?
>>>
>>> Thanks in advance,
>>>
>>> Todd
>>>
>>
>>
>
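
For reference, here is a minimal sketch of the residual book-keeping DB
describes above. It is only an illustration, not the actual glmnet code:
the Row class, the helper functions, and the weights/delta arguments are
made up, and the residual formulas are plain linear residuals standing in
for the real update.

import org.apache.spark.rdd.RDD

// Each row carries an extra mutable field book-keeping its residual,
// initialized to NaN ("not computed yet / lost").
class Row(val features: Array[Double], val label: Double) extends Serializable {
  var residual: Double = Double.NaN
}

// Stand-ins for the real glmnet math (hypothetical helpers):
def computeResidual(row: Row, weights: Array[Double]): Double =
  row.label - row.features.zip(weights).map { case (x, w) => x * w }.sum
def cheapUpdate(row: Row, delta: Array[Double]): Double =
  row.residual - row.features.zip(delta).map { case (x, d) => x * d }.sum

// One pass over a cached RDD[Row]. Rows cached deserialized in memory keep
// the mutation across passes; rows that were spilled to disk or recomputed
// after a failure come back with residual == NaN and pay the full cost.
def iterate(cached: RDD[Row], weights: Array[Double], delta: Array[Double]): Unit =
  cached.foreach { row =>
    row.residual =
      if (row.residual.isNaN) computeResidual(row, weights)  // lost or first pass
      else cheapUpdate(row, delta)                           // cheap incremental update
  }

The NaN value is the whole trick: it marks rows whose previous residual is
no longer available, so only those rows are recomputed from scratch.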
