I don't think accumulators come into play here. Use foreachPartition,
not mapPartitions.
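As a minimal sketch of that approach: inside foreachPartition you can let
Iterator.grouped handle the batching, so the last (remainder) group is
committed automatically with no manual cleanup step. The BatchSink below is a
hypothetical stand-in for a SolrJ client, not a real API:

```scala
// Hypothetical batch sink; stands in for a Solr client (assumption).
final class BatchSink {
  var committed = 0
  def commit(docs: Seq[String]): Unit = {
    // A real sink would add the docs to Solr and commit here.
    committed += docs.size
  }
}

// Per-partition logic: group the iterator into batches of 100 and commit
// each one; grouped() yields the remainder as a final, shorter batch.
def indexPartition(partition: Iterator[String], sink: BatchSink): Unit =
  partition.grouped(100).foreach(batch => sink.commit(batch))

// With an RDD this would run as (sketch):
//   rdd.foreachPartition(p => indexPartition(p, new BatchSink))
```

Since foreachPartition returns Unit, there is no need to produce an empty
iterator at all, and no accumulator is involved.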

On Wed, Oct 29, 2014 at 12:43 AM, Flavio Pompermaier
<pomperma...@okkam.it> wrote:
> Sorry, but I wasn't able to code my stuff using accumulators as you suggested
> :(
> In my use case I have to add elements to an array/list and then, every
> 100 elements, commit the batch to a Solr index and then clear it.
> In the cleanup code I have to commit the uncommitted (remainder) elements.
>
> In the example you showed me I can't see how to append element to a list and
> commit the remainder elements of my RDD.
>
> Following Sean's advice, this is more or less what I have now, but I still
> don't know how to return empty iterators (and whether I really have to), or if
> I still need to use accumulators to add elements to the list (in the code
> below I haven't specified what to use..):
>
> .mapPartitions { partition =>
>        if (!partition.isEmpty) {
>           // Some setup code here
>           println("Initialize batch");
>           partition.map(x => {
>             var batch = ???
>             batch.add(someClassFactory.fromString(x._2, x._3))
>             if (batch.size % 100 == 0) {
>                println("Flush legacy entities");
>                batch.clear
>             }
>             if (!partition.hasNext) {
>                // Some cleanup code here
>                println("Flush legacy entities");
>                batch.clear
>             }
>            Iterator.empty
>          })
>       } else {
>          // return an empty Iterator of your return type
>         Iterator.empty
>       }
>    }
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org