Sorry, but I wasn't able to write my code using accumulators as you
suggested :(
In my use case I have to add elements to an array/list and then, every
100 elements, commit the batch to a Solr index and clear it.
In the cleanup code I have to commit the uncommitted (remainder) elements.

In the example you showed me I can't see how to append elements to a list
and commit the remainder elements of my RDD.

Following Sean's advice, this is more or less what I have now, but I still
don't know how to return empty iterators (or whether I really have to), or
whether I still need to use accumulators to add elements to the list (in the
code below I haven't specified what to use):

.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    println("Initialize batch")
    partition.map(x => {
      var batch = ???
      batch.add(someClassFactory.fromString(x._2, x._3))
      if (batch.size % 100 == 0) {
        println("Flush legacy entities")
        batch.clear
      }
      if (!partition.hasNext) {
        // Some cleanup code here
        println("Flush legacy entities")
        batch.clear
      }
      Iterator.empty
    })
  } else {
    // return an empty Iterator of your return type
    Iterator.empty
  }
}
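
For reference, a minimal sketch of the batching described above (commit every
100 elements, then commit the remainder), written against a plain Scala
Iterator, which is what mapPartitions hands to its function. It relies on
Iterator.grouped, so no mutable batch or accumulator is needed. RecordingIndexer,
indexPartition and batchSize are hypothetical names used only for illustration;
RecordingIndexer stands in for the real Solr client and just records what it
is given.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchSketch {

  // Hypothetical stand-in for the Solr client: it only remembers each
  // batch passed to commit, so the batching can be inspected.
  final class RecordingIndexer {
    val committed: ArrayBuffer[Seq[String]] = ArrayBuffer.empty

    def commit(batch: Seq[String]): Unit = {
      committed += batch
      ()
    }
  }

  // Consumes the whole partition in groups of at most batchSize elements.
  // grouped() yields full groups first and then one shorter group with the
  // remainder, so the leftover elements are committed without extra code.
  // Nothing is produced downstream, hence the empty iterator.
  def indexPartition(partition: Iterator[String],
                     indexer: RecordingIndexer,
                     batchSize: Int = 100): Iterator[Nothing] = {
    partition.grouped(batchSize).foreach(group => indexer.commit(group))
    Iterator.empty
  }
}
```

With 350 input elements this commits three batches of 100 and one of 50. In
Spark, a body like this could be passed to mapPartitions, or to
foreachPartition if nothing needs to be returned, with the real client created
inside the partition function (that part is an assumption and is not shown
here).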

Best,
Flavio

On Tue, Oct 28, 2014 at 1:26 PM, Kamal Banga <ka...@sigmoidanalytics.com>
wrote:

> Hi Flavio,
>
> Doing batch += ... shouldn't work. It will create a new batch for each
> element in myRDD (also, val declares an immutable variable, while var is
> for mutable variables). You can use something like accumulators
> <http://spark.apache.org/docs/latest/programming-guide.html#accumulators>
> .
>
> val accum = sc.accumulator(0, "Some accumulator")
> val NUM = 100
> val SIZE = myRDD.count()
> val LAST = SIZE % NUM
> val MULTIPLE = (SIZE / NUM) * NUM // should return 300 when SIZE is 350
> myRDD.map(x => {
> accum++
> if (accum++ < MULTIPLE)
> null
> else x
> }).
>
> So if there are 350 elements, it should return null for the first 300
> elements and the actual values for the last 50 elements. Then we can filter
> out the nulls to get the remainder elements (and finally flush the new RDD
> containing the remainder elements).
>
> On map vs mapPartitions, mapPartitions is used mainly for efficiency (as
> you can see here
> <http://spark-summit.org/wp-content/uploads/2013/10/Wendell-Spark-Performance.pdf>,
> here
> <http://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions>
> and here
> <http://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/>).
> So for simpler code, you can go with map, and for efficiency, you can go
> with mapPartitions.
>
> Regards,
> Kamal
>



-- 

Flavio Pompermaier

