Executing on a worker is fine, since the state I would like to maintain is specific to a partition. Accumulators, being global counters, won't work.
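[Editor's note: a minimal plain-Python sketch of the partition-local state that mapPartitions enables. No Spark is required to run it; the lists below merely stand in for RDD partitions, and the function name is illustrative, not part of any Spark API.]

```python
def index_within_partition(rows):
    """Mimics a function passed to rdd.mapPartitions: `rows` is an iterator
    over one partition, so an ordinary local variable serves as
    partition-local state that persists across elements."""
    incr = 0
    for row in rows:
        incr += 1
        yield (incr, row)

# Two lists standing in for the partitions of an imaginary RDD:
partitions = [["a", "b", "c"], ["d", "e"]]
result = [list(index_within_partition(p)) for p in partitions]
# The counter restarts in each partition, which is exactly the
# partition-local behaviour described above.
```

In real PySpark this would be `inp_rdd.mapPartitions(index_within_partition)`; since the function runs on worker nodes, its state never leaves the partition it was built for.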
On Wednesday, January 27, 2016, Jakob Odersky <ja...@odersky.com> wrote:

> Be careful with mapPartitions though: since it is executed on worker
> nodes, you may not see side-effects locally.
>
> Is it not possible to represent your state changes as part of your
> RDD's transformations? I.e. return a tuple containing the modified
> data and some accumulated state.
> If that really doesn't work, I would second accumulators. Check out
> http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka
> which also tells you how to define your own for custom data types.
>
> On Wed, Jan 27, 2016 at 7:22 PM, Krishna <research...@gmail.com> wrote:
> > mapPartitions(...) seems like a good candidate, since it processes
> > a partition while maintaining state across map(...) calls.
> >
> > On Wed, Jan 27, 2016 at 6:58 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >> Initially I thought of using accumulators.
> >>
> >> Since the state change can be anything, how about storing state in an
> >> external NoSQL store such as HBase?
> >>
> >> On Wed, Jan 27, 2016 at 6:37 PM, Krishna <research...@gmail.com> wrote:
> >>> Thanks; what I'm looking for is a way to see changes to the state of
> >>> some variable during the map(...) phase.
> >>> I simplified the scenario in my example by making row_index() increment
> >>> "incr" by 1, but in reality the change to "incr" can be anything.
> >>>
> >>> On Wed, Jan 27, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>> Have you looked at this method?
> >>>>
> >>>>   * Zips this RDD with its element indices. The ordering is first based
> >>>>   on the partition index
> >>>>   ...
> >>>>   def zipWithIndex(): RDD[(T, Long)] = withScope {
> >>>>
> >>>> On Wed, Jan 27, 2016 at 6:03 PM, Krishna <research...@gmail.com> wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I have a scenario where I need to maintain state that is local to a
> >>>>> worker and can change during a map operation. What's the best way to
> >>>>> handle this?
> >>>>>
> >>>>> incr = 0
> >>>>> def row_index():
> >>>>>     global incr
> >>>>>     incr += 1
> >>>>>     return incr
> >>>>>
> >>>>> out_rdd = inp_rdd.map(lambda x: row_index()).collect()
> >>>>>
> >>>>> "out_rdd" in this case only contains 1s, but I would like it to have
> >>>>> the index of each row in "inp_rdd".
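[Editor's note: the zipWithIndex method quoted above is the direct answer to the original question, e.g. `inp_rdd.zipWithIndex()` in PySpark, which returns (element, index) pairs. The following plain-Python sketch illustrates the two-pass mechanism it uses: count elements per partition, then assign each partition an offset so indices are globally consecutive. The function name and list-of-lists representation are illustrative only.]

```python
def zip_with_index(partitions):
    """Sketch of RDD.zipWithIndex semantics: a first pass computes each
    partition's size, a second pass adds the cumulative offset so that
    indices run consecutively across partition boundaries."""
    counts = [len(p) for p in partitions]
    offsets = [0]
    for c in counts[:-1]:
        offsets.append(offsets[-1] + c)
    return [[(x, offsets[i] + j) for j, x in enumerate(p)]
            for i, p in enumerate(partitions)]

partitions = [["a", "b"], ["c", "d", "e"]]
# zip_with_index(partitions)
# → [[('a', 0), ('b', 1)], [('c', 2), ('d', 3), ('e', 4)]]
```

This is also why the real zipWithIndex triggers an extra Spark job when the RDD has more than one partition: the partition sizes must be known before any index can be assigned.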