Re: Batch of updates

2014-10-29 Thread Sean Owen
I don't think accumulators come into play here. Use foreachPartition,
not mapPartitions.
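For example, a minimal sketch of the same batching logic with foreachPartition
could look like this (flushBatch and SomeClass are only placeholders for the
Solr commit and the entity type, not part of any real API):

import scala.collection.mutable.ArrayBuffer

myRdd.foreachPartition { partition =>
  // one buffer per partition; flushBatch stands in for the Solr commit
  val batch = new ArrayBuffer[SomeClass]()
  partition.foreach { x =>
    batch += someClassFactory.fromString(x._2, x._3)
    if (batch.size == 100) {
      flushBatch(batch)      // commit a full batch of 100
      batch.clear()
    }
  }
  if (batch.nonEmpty) flushBatch(batch)   // commit the remainder at the end
}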

On Wed, Oct 29, 2014 at 12:43 AM, Flavio Pompermaier
pomperma...@okkam.it wrote:
 Sorry but I wasn't able to code my stuff using accumulators as you suggested
 :(
 In my use case I have to add elements to an array/list and then, every
 100 elements, commit the batch to a Solr index and then clear it.
 In the cleanup code I have to commit the uncommitted (remainder) elements.

 In the example you showed me I can't see how to append elements to a list and
 commit the remainder elements of my RDD.

 Following Sean's advice, this is more or less what I have now, but I still
 don't know how to return empty iterators (and whether I really have to) and
 whether I still need to use accumulators to add elements to the list (in the
 code below I haven't specified what to use):

 .mapPartitions { partition =>
    if (!partition.isEmpty) {
       // Some setup code here
       println("Initialize batch");
       partition.map(x => {
         var batch = ???
         batch.add(someClassFactory.fromString(x._2, x._3))
         if (batch.size % 100 == 0) {
            println("Flush legacy entities");
            batch.clear
         }
         if (!partition.hasNext) {
            // Some cleanup code here
            println("Flush legacy entities");
            batch.clear
         }
         Iterator.empty
       })
    } else {
       // return an empty Iterator of your return type
       Iterator.empty
    }





Re: Batch of updates

2014-10-28 Thread Kamal Banga
Hi Flavio,

Doing batch += ... shouldn't work. It will create a new batch for each
element in myRDD (also, val declares an immutable variable; var is for
mutable variables). You can use something like accumulators:
http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0, "Some accumulator")
val NUM = 100
val SIZE = myRDD.count()
val LAST = SIZE % NUM
val MULTIPLE = (SIZE / NUM) * NUM // should return 300 when SIZE is 350
myRDD.map(x => {
  accum += 1
  if (accum.value <= MULTIPLE) null
  else x
})

So if there are 350 elements, it should return null for the first 300 elements
and the actual values for the last 50. Then we can apply a filter for null and
get the remainder elements (and finally flush the new RDD containing them).
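A literal sketch of that remainder step, using zipWithIndex to number the
elements (accumulator values generally cannot be read back inside tasks, so the
counting is done with zipWithIndex here instead; someClassFactory.fromString is
taken from the code earlier in this thread):

val SIZE = myRDD.count()
val MULTIPLE = (SIZE / 100) * 100      // e.g. 300 when SIZE is 350
val remainder = myRDD.zipWithIndex()
  .filter { case (_, idx) => idx >= MULTIPLE }   // keep the last SIZE % 100 elements
  .map { case (x, _) => someClassFactory.fromString(x._2, x._3) }
// remainder now contains only the trailing partial batch, ready to be flushed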

On map vs mapPartitions, mapPartitions is used mainly for efficiency (as
you can see here
http://spark-summit.org/wp-content/uploads/2013/10/Wendell-Spark-Performance.pdf,
here
http://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions
and here
http://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/).
So for simpler code, you can go with map, and for efficiency, you can go
with mapPartitions.
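For instance, an expensive setup step (a connection, a parser, etc.) runs once
per element with map but only once per partition with mapPartitions;
createConnection and process below are purely illustrative placeholders:

// map: the setup cost is paid for every element
val perElement = myRDD.map { x =>
  val conn = createConnection()
  val result = process(conn, x)
  conn.close()
  result
}

// mapPartitions: the setup cost is paid once per partition
val perPartition = myRDD.mapPartitions { iter =>
  val conn = createConnection()
  val results = iter.map(x => process(conn, x)).toList   // materialize before closing
  conn.close()
  results.iterator
}

The .toList forces the lazy iterator before the connection is closed; for very
large partitions you would stream the results instead of materializing them.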

Regards,
Kamal


Re: Batch of updates

2014-10-28 Thread Sean Owen
You should use foreachPartition, and take care to open and close your
connection following the pattern described in:

http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

Within a partition, you iterate over the elements and call whatever code you
want to output results whenever you want. You can output every 100 elements,
and again at the end. It is just your function; it can do whatever you like.
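In other words, something along these lines (openConnection and commitBatch are
placeholders for your own Solr client code, not a real API):

import scala.collection.mutable.ArrayBuffer

myRdd.foreachPartition { elements =>
  val conn = openConnection()                    // open once per partition
  try {
    val batch = new ArrayBuffer[SomeClass]()
    elements.foreach { x =>
      batch += someClassFactory.fromString(x._2, x._3)
      if (batch.size == 100) {
        commitBatch(conn, batch)                 // output every 100 elements
        batch.clear()
      }
    }
    if (batch.nonEmpty) commitBatch(conn, batch) // and once more at the end
  } finally {
    conn.close()                                 // always close the connection
  }
}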

On Mon, Oct 27, 2014 at 11:45 PM, Flavio Pompermaier pomperma...@okkam.it
wrote:
 Hi to all,
 I'm trying to convert my old MapReduce job to a Spark one but I have some
 doubts..
 My application basically buffers a batch of updates and every 100 elements
 it flushes the batch to a server. This is very easy in MapReduce, but I
 don't know how you can do that in Scala..

 For example, if I do:

 myRdd.map(x => {
   val batch = new ArrayBuffer[someClass]()
   batch += someClassFactory.fromString(x._2, x._3)
   if (batch.size % 100 == 0) {
     println("Flush legacy entities");
     batch.clear
   }

 I still have two problems:

 - how can I commit the remainder elements (in MapReduce, the elements
 still in the batch array are committed within the cleanup() method)?
 - if I have to create a connection to a server for pushing updates, is it
 better to use mapPartitions instead of map?

 Best,
 Flavio




Re: Batch of updates

2014-10-28 Thread Flavio Pompermaier
Sorry but I wasn't able to code my stuff using accumulators as you
suggested :(
In my use case I have to add elements to an array/list and then, every
100 elements, commit the batch to a Solr index and then clear it.
In the cleanup code I have to commit the uncommitted (remainder) elements.

In the example you showed me I can't see how to append elements to a list
and commit the remainder elements of my RDD.

Following Sean's advice, this is more or less what I have now, but I still
don't know how to return empty iterators (and whether I really have to) and
whether I still need to use accumulators to add elements to the list (in the
code below I haven't specified what to use):

.mapPartitions { partition =>
   if (!partition.isEmpty) {
      // Some setup code here
      println("Initialize batch");
      partition.map(x => {
        var batch = ???
        batch.add(someClassFactory.fromString(x._2, x._3))
        if (batch.size % 100 == 0) {
           println("Flush legacy entities");
           batch.clear
        }
        if (!partition.hasNext) {
           // Some cleanup code here
           println("Flush legacy entities");
           batch.clear
        }
        Iterator.empty
      })
   } else {
      // return an empty Iterator of your return type
      Iterator.empty
   }

Best,
Flavio

On Tue, Oct 28, 2014 at 1:26 PM, Kamal Banga ka...@sigmoidanalytics.com
wrote:

 Hi Flavio,

 Doing batch += ... shouldn't work. It will create a new batch for each
 element in myRDD (also, val declares an immutable variable; var is for
 mutable variables). You can use something like accumulators:
 http://spark.apache.org/docs/latest/programming-guide.html#accumulators

 val accum = sc.accumulator(0, "Some accumulator")
 val NUM = 100
 val SIZE = myRDD.count()
 val LAST = SIZE % NUM
 val MULTIPLE = (SIZE / NUM) * NUM // should return 300 when SIZE is 350
 myRDD.map(x => {
   accum += 1
   if (accum.value <= MULTIPLE) null
   else x
 })

 So if there are 350 elements, it should return null for the first 300 elements
 and the actual values for the last 50. Then we can apply a filter for null and
 get the remainder elements (and finally flush the new RDD containing them).

 On map vs mapPartitions, mapPartitions is used mainly for efficiency (as
 you can see here
 http://spark-summit.org/wp-content/uploads/2013/10/Wendell-Spark-Performance.pdf,
 here
 http://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions
 and here
 http://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/).
 So for simpler code, you can go with map, and for efficiency, you can go
 with mapPartitions.

 Regards,
 Kamal



