I don't think accumulators come into play here. Use foreachPartition, not mapPartitions.
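A minimal sketch of that suggestion (not a verified answer from the list): use foreachPartition, an action, so there is no output RDD and nothing has to return an iterator. Scala's Iterator.grouped already emits full batches plus a final remainder batch, which covers the "commit the uncommitted elements in cleanup" case. Names like commitToSolr are hypothetical placeholders for the poster's own Solr client code.

```scala
// Hedged sketch: batch-and-commit inside foreachPartition.
// commitToSolr below is a hypothetical stand-in for the real Solr client call.
object BatchCommitSketch {
  // Pure batching helper: Iterator.grouped yields full batches of
  // `batchSize` and then one final, smaller remainder batch, so no
  // separate cleanup/flush step is needed.
  def commitInBatches[T](records: Iterator[T], batchSize: Int)(commit: Seq[T] => Unit): Unit =
    records.grouped(batchSize).foreach(commit)

  def main(args: Array[String]): Unit = {
    // On an RDD this would be used roughly as:
    //   rdd.foreachPartition { partition =>
    //     commitInBatches(partition, 100)(docs => commitToSolr(docs))
    //   }
    // foreachPartition is an action, unlike mapPartitions, so there is
    // no need to return Iterator.empty from anywhere.
    commitInBatches((1 to 250).iterator, 100) { batch =>
      println(s"committing ${batch.size} docs")  // prints 100, 100, then 50
    }
  }
}
```

The design point: pushing the batching into a helper over a plain Iterator keeps the commit logic testable without a SparkContext, and foreachPartition means the setup/teardown happens once per partition rather than per element.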
On Wed, Oct 29, 2014 at 12:43 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Sorry but I wasn't able to code my stuff using accumulators as you suggested :(
> In my use case I have to add elements to an array/list and then, every
> 100 elements, commit the batch to a Solr index and then clear it.
> In the cleanup code I have to commit the uncommitted (remainder) elements.
>
> In the example you showed me I can't see how to append elements to a list and
> commit the remainder elements of my RDD.
>
> Following the advice of Sean this is more or less what I have now, but still
> I don't know how to return empty iterators (and if I really have to) and if
> I still need to use accumulators to add elements to the list (in the code
> below I haven't specified what to use...):
>
> .mapPartitions { partition =>
>   if (!partition.isEmpty) {
>     // Some setup code here
>     println("Initialize batch")
>     partition.map(x => {
>       var batch = ???
>       batch.add(someClassFactory.fromString(x._2, x._3))
>       if (batch.size % 100 == 0) {
>         println("Flush legacy entities")
>         batch.clear
>       }
>       if (!partition.hasNext) {
>         // Some cleanup code here
>         println("Flush legacy entities")
>         batch.clear
>       }
>       Iterator.empty
>     })
>   } else {
>     // return an empty Iterator of your return type
>     Iterator.empty
>   }
> }