setup and cleanup function in spark

2017-08-29 Thread Mohammad Kargar
To implement a setup/cleanup function in Spark, we follow the pattern below,
as discussed here.

rdd.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    partition.map { item =>
      val output = yourfunction(item)
      if (!partition.hasNext) {
        // Some cleanup code here (reached while producing the last item)
      }
      output
    }
  } else {
    // Return an empty Iterator of your return type
    Iterator.empty
  }
}

In my case the RDD is a pair RDD loaded from Accumulo through an InputFormat,
and our map function only changes the values (the keys are untouched). However,
the iterator returned from mapPartitions somehow ends up with incorrect keys. I
even tried preservesPartitioning = true, but no luck.

Debugging the code shows that the keys get changed after calling
partition.hasNext. If I remove "partition.hasNext" from the code then
everything works fine!

Any ideas?

Thanks,
Mohammad


Re: Is there setup and cleanup function in spark?

2014-11-17 Thread Jianshi Huang
I see. I agree that lazy evaluation is not suitable for proper setup and
teardown.

We also abandoned it due to an inherent incompatibility between implicit and
lazy. It was fun to come up with this trick, though.

Jianshi

On Tue, Nov 18, 2014 at 10:28 AM, Tobias Pfeiffer wrote:

> Hi,
>
> On Fri, Nov 14, 2014 at 2:49 PM, Jianshi Huang 
> wrote:
>
>> Ok, then we need another trick.
>>
>> let's have an *implicit lazy var connection/context* around our code.
>> And setup() will trigger the eval and initialization.
>>
>
> Due to lazy evaluation, I think having setup/teardown is a bit tricky. In
> particular teardown, because it is not easy to execute code after all
> computation is done. You can check
> http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p11009.html
> for an example of what worked for me.
>
> Tobias
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-17 Thread Tobias Pfeiffer
Hi,

On Fri, Nov 14, 2014 at 2:49 PM, Jianshi Huang 
wrote:

> Ok, then we need another trick.
>
> let's have an *implicit lazy var connection/context* around our code. And
> setup() will trigger the eval and initialization.
>

Due to lazy evaluation, I think having setup/teardown is a bit tricky. In
particular teardown, because it is not easy to execute code after all
computation is done. You can check
http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-tp10968p11009.html
for an example of what worked for me.
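In short, the idea is to wrap the partition iterator so the cleanup fires once
the iterator is exhausted, i.e., after all computation for that partition is
done. A minimal sketch of that idea (Resource and its methods are hypothetical
placeholders, not the code from that link):

import org.apache.spark.rdd.RDD

// Hypothetical per-partition resource, standing in for a DB connection etc.
trait Resource extends AutoCloseable { def process(s: String): String }

def withSetupAndTeardown(rdd: RDD[String], mkResource: () => Resource): RDD[String] =
  rdd.mapPartitions { partition =>
    val resource = mkResource() // setup, once per partition
    var closed = false
    new Iterator[String] {
      def hasNext: Boolean = {
        val more = partition.hasNext
        if (!more && !closed) { resource.close(); closed = true } // teardown
        more
      }
      def next(): String = resource.process(partition.next())
    }
  }

Note the cleanup only runs if the iterator is fully consumed; operations like
take() can leave the resource open.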

Tobias


Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Jianshi Huang
Ok, then we need another trick.

let's have an *implicit lazy var connection/context* around our code. And
setup() will trigger the eval and initialization.

The implicit lazy val/var trick was actually invented by Kevin. :)
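Concretely, something like this (a sketch of my reading of the trick; Scala
has no lazy var, so a lazy val holder is the closest concrete form, and
Connection is a hypothetical stand-in):

// Each executor JVM evaluates the lazy val once, on first access from a
// task, giving setup-on-first-use semantics without an explicit setup() call.
class Connection { def query(s: String): String = s } // hypothetical client

object ConnectionHolder {
  lazy val connection = new Connection
}

rdd.map(row => ConnectionHolder.connection.query(row))

Teardown is still the hard part, as Tobias notes above.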

Jianshi

On Fri, Nov 14, 2014 at 1:41 PM, Cheng Lian wrote:

>  If you’re just relying on the side effect of setup() and cleanup() then
> I think this trick is OK and pretty clean.
>
> But if setup() returns, say, a DB connection, then the map(...) part and
> cleanup() can’t get the connection object.
>
> On 11/14/14 1:20 PM, Jianshi Huang wrote:
>
>   So can I write it like this?
>
>  rdd.mapPartitions { i => setup(); i }.map(...).mapPartitions { i =>
> cleanup(); i }
>
>  So I don't need to mess up the logic and can still use map, filter, and
> other RDD transformations.
>
>  Jianshi
>
> On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian 
> wrote:
>
>>  If you’re looking for executor side setup and cleanup functions, there
>> ain’t any yet, but you can achieve the same semantics via
>> RDD.mapPartitions.
>>
>> Please check the “setup() and cleanup()” section of this blog from Cloudera
>> for details:
>> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
>>
>> On 11/14/14 10:44 AM, Dai, Kevin wrote:
>>
>>  Hi, all
>>
>>
>>
>> Are there setup and cleanup functions in Spark, as in Hadoop MapReduce,
>> that do some initialization and cleanup work?
>>
>>
>>
>> Best Regards,
>>
>> Kevin.
>>
>>
>
>
>
>  --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Cheng Lian
If you’re just relying on the side effect of setup() and cleanup() then I
think this trick is OK and pretty clean.

But if setup() returns, say, a DB connection, then the map(...) part and
cleanup() can’t get the connection object.
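For example (a minimal sketch with illustrative names): in the split version,
each closure has its own scope, so a connection opened in the first
mapPartitions is invisible to map(...) and to the cleanup stage. A single
mapPartitions keeps it in scope:

import org.apache.spark.rdd.RDD

// Hypothetical connection type, standing in for a real DB client.
class Conn extends AutoCloseable {
  def lookup(s: String): String = s
  def close(): Unit = ()
}

def singleScope(rdd: RDD[String]): RDD[String] =
  rdd.mapPartitions { iter =>
    val conn = new Conn   // setup(): conn is in scope for the whole closure
    iter.map(conn.lookup) // the map(...) part can use conn
    // Closing conn right here would run before the iterator is consumed --
    // the lazy-evaluation pitfall discussed above.
  }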


On 11/14/14 1:20 PM, Jianshi Huang wrote:


So can I write it like this?

rdd.mapPartitions { i => setup(); i }.map(...).mapPartitions { i => cleanup(); i }

So I don't need to mess up the logic and can still use map, filter, and
other RDD transformations.


Jianshi

On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian wrote:


If you’re looking for executor side setup and cleanup functions,
there ain’t any yet, but you can achieve the same semantics via
RDD.mapPartitions.

Please check the “setup() and cleanup()” section of this blog from
Cloudera for details:

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

On 11/14/14 10:44 AM, Dai, Kevin wrote:


Hi, all

Are there setup and cleanup functions in Spark, as in Hadoop
MapReduce, that do some initialization and cleanup work?

Best Regards,

Kevin.






--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/




Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Jianshi Huang
So can I write it like this?

rdd.mapPartitions { i => setup(); i }.map(...).mapPartitions { i => cleanup(); i }

So I don't need to mess up the logic and can still use map, filter, and
other RDD transformations.

Jianshi

On Fri, Nov 14, 2014 at 12:20 PM, Cheng Lian wrote:

>  If you’re looking for executor side setup and cleanup functions, there
> ain’t any yet, but you can achieve the same semantics via
> RDD.mapPartitions.
>
> Please check the “setup() and cleanup()” section of this blog from Cloudera
> for details:
> http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
>
> On 11/14/14 10:44 AM, Dai, Kevin wrote:
>
>   Hi, all
>
>
>
> Are there setup and cleanup functions in Spark, as in Hadoop MapReduce,
> that do some initialization and cleanup work?
>
>
>
> Best Regards,
>
> Kevin.
>
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Cheng Lian
If you’re looking for executor side setup and cleanup functions, there 
ain’t any yet, but you can achieve the same semantics via 
RDD.mapPartitions.


Please check the “setup() and cleanup()” section of this blog from
Cloudera for details: 
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/


On 11/14/14 10:44 AM, Dai, Kevin wrote:


Hi, all

Are there setup and cleanup functions in Spark, as in Hadoop MapReduce,
that do some initialization and cleanup work?


Best Regards,

Kevin.




Re: Is there setup and cleanup function in spark?

2014-11-13 Thread Jianshi Huang
Where do you want the setup and cleanup functions to run? Driver or the
worker nodes?
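For the driver side it's just ordinary code around the actions; only the
worker-node (executor) side needs special treatment. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("example")) // driver-side setup
try {
  sc.parallelize(1 to 100).map(_ * 2).count() // the actions run on executors
} finally {
  sc.stop() // driver-side cleanup
}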

Jianshi

On Fri, Nov 14, 2014 at 10:44 AM, Dai, Kevin wrote:

>  Hi, all
>
>
>
> Are there setup and cleanup functions in Spark, as in Hadoop MapReduce,
> that do some initialization and cleanup work?
>
>
>
> Best Regards,
>
> Kevin.
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Is there setup and cleanup function in spark?

2014-11-13 Thread Dai, Kevin
Hi, all

Are there setup and cleanup functions in Spark, as in Hadoop MapReduce, that
do some initialization and cleanup work?

Best Regards,
Kevin.