Executing on the workers is fine, since the state I would like to maintain
is specific to a partition. Accumulators, being global counters, won't work.
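
A minimal sketch of the per-partition idea, under some assumptions: the function names below are hypothetical, the counter stands in for whatever state change is actually needed, and `sc` in the trailing comments is an already-created SparkContext. The functions are plain Python generators, so they can be tried on an ordinary list before handing them to mapPartitions(...) or mapPartitionsWithIndex(...).

```python
def index_within_partition(rows):
    """Yield (local_index, row) pairs for the rows of a single partition."""
    incr = 0  # state exists only while this one partition is being processed
    for row in rows:
        incr += 1  # stand-in for an arbitrary state update
        yield (incr, row)


def index_with_partition_id(partition_id, rows):
    """Same as above, but tag each result with the partition it came from."""
    for local_index, row in index_within_partition(rows):
        yield (partition_id, local_index, row)


# Assumed usage with an existing SparkContext named `sc` (not created here):
#   inp_rdd = sc.parallelize(["a", "b", "c", "d"], 2)
#   out = inp_rdd.mapPartitions(index_within_partition).collect()
#   tagged = inp_rdd.mapPartitionsWithIndex(index_with_partition_id).collect()
```

The counter restarts in every partition, so the indices are unique only within a partition. If a single index across the whole RDD is needed, zipWithIndex(), mentioned further down the thread, is probably the better fit.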

On Wednesday, January 27, 2016, Jakob Odersky <ja...@odersky.com> wrote:

> Be careful with mapPartitions though: since it is executed on worker
> nodes, you may not see side effects locally.
>
> Is it not possible to represent your state changes as part of your
> rdd's transformations? I.e. return a tuple containing the modified
> data and some accumulated state.
> If that really doesn't work, I would second accumulators. Check out
> http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka
> which also tells you how to define your own for custom data types.
>
> On Wed, Jan 27, 2016 at 7:22 PM, Krishna <research...@gmail.com> wrote:
> > mapPartitions(...) seems like a good candidate, since it processes a
> > whole partition while maintaining state across map(...) calls.
> >
> > On Wed, Jan 27, 2016 at 6:58 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>
> >> Initially I thought of using accumulators.
> >>
> >> Since the state change can be anything, how about storing the state in
> >> an external NoSQL store such as HBase?
> >>
> >> On Wed, Jan 27, 2016 at 6:37 PM, Krishna <research...@gmail.com> wrote:
> >>>
> >>> Thanks; what I'm looking for is a way to see changes to the state of
> >>> some variable during the map(..) phase.
> >>> I simplified the scenario in my example by making row_index() increment
> >>> "incr" by 1 but in reality, the change to "incr" can be anything.
> >>>
> >>> On Wed, Jan 27, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>
> >>>> Have you looked at this method?
> >>>>
> >>>>    * Zips this RDD with its element indices. The ordering is first based
> >>>>    * on the partition index
> >>>> ...
> >>>>   def zipWithIndex(): RDD[(T, Long)] = withScope {
> >>>>
> >>>> On Wed, Jan 27, 2016 at 6:03 PM, Krishna <research...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I have a scenario where I need to maintain state that is local to a
> >>>>> worker and that can change during a map operation. What's the best
> >>>>> way to handle this?
> >>>>>
> >>>>> incr = 0
> >>>>> def row_index():
> >>>>>   global incr
> >>>>>   incr += 1
> >>>>>   return incr
> >>>>>
> >>>>> out_rdd = inp_rdd.map(lambda x: row_index()).collect()
> >>>>>
> >>>>> "out_rdd" in this case only contains 1s but I would like it to have
> >>>>> index of each row in "inp_rdd".
> >>>>
> >>>>
> >>>
> >>
> >
>
