Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-20 Thread Victor Tso-Guillen
And duh, of course, you can do the setup in that new RDD as well :)




Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-20 Thread Victor Tso-Guillen
How about this:

val prev: RDD[V] = rdd.mapPartitions { partition =>
  /* setup(), once per partition */
  partition
}

new RDD[V](prev) {
  protected def getPartitions: Array[Partition] = prev.partitions

  def compute(split: Partition, context: TaskContext): Iterator[V] = {
    // Runs when the task completes; this is the Spark 1.x API (later
    // versions use context.addTaskCompletionListener instead).
    context.addOnCompleteCallback(() => { /* cleanup() */ })
    firstParent[V].iterator(split, context)
  }
}




Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-19 Thread Sean Owen
I think you're looking for foreachPartition(). You've kinda hacked it
out of mapPartitions(). Your case has a simple solution, yes. After
saving to the DB, you know you can close the connection, since you
know the use of the connection has definitely just finished. But it's
not a simpler solution for mapPartitions() since that's not really
what you are using :)

In general, mapPartitions creates an Iterator from another Iterator.
Of course you could consume the input iterator, open the connection,
perform the operations, close the connection, and return an iterator over
the result. That works, but it requires reading the entire input no
matter what, and reading it all into memory. That may not be OK in all
cases.

Where possible, it's nicest to return an Iterator that accesses the
source Iterator only as needed to produce elements. This means
returning that Iterator before any work has been done. So you have to
close the connection later when the Iterator has been exhausted.
Really, Tobias's method is trying to shim a "cleanup()" lifecycle
method into the Iterator. I suppose it could be done a little more
cleanly using Guava's Iterators library, which would give you a more
explicit way to execute something when done.
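
For illustration, here is a minimal, self-contained sketch (not from the
thread) of the lazy approach described above: wrap the source Iterator so
that cleanup fires once it is exhausted. The Connection class and its
methods are placeholders, not a real API.

import org.apache.spark.rdd.RDD

// Stand-in for a real per-partition resource such as a DB connection.
class Connection {
  def lookup(x: Int): Int = x * 2
  def close(): Unit = println("cleanup, once per partition")
}

def withLazyCleanup(rdd: RDD[Int]): RDD[Int] =
  rdd.mapPartitions { partition =>
    val conn = new Connection()          // setup(), once per partition
    new Iterator[Int] {
      private var closed = false
      def hasNext: Boolean = {
        val more = partition.hasNext
        if (!more && !closed) {          // cleanup() once input is exhausted
          closed = true
          conn.close()
        }
        more
      }
      def next(): Int = conn.lookup(partition.next())
    }
  }

One caveat: if a downstream consumer never drains the iterator, hasNext
never returns false and close() never runs, which is why a TaskContext
completion callback (as in Victor's snippet above) is the more robust hook.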





Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-19 Thread Yana Kadiyska
Sean, would this work --

rdd.mapPartitions { partition => Iterator(partition) }.foreach { partition =>
  // Some setup code here
  // save partition to DB
  // Some cleanup code here
}


I tried a pretty simple example ... I can see that the setup and
cleanup are executed on the executor node, once per partition (I used
mapPartitionsWithIndex instead of mapPartitions to track this a little
better). Seems like an easier solution than Tobias's, but I'm wondering
if it's perhaps incorrect.
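
For illustration, a self-contained sketch of this pattern (the Connection
class here is a placeholder, not a real API):

import org.apache.spark.rdd.RDD

class Connection {
  def save(x: Int): Unit = ()
  def close(): Unit = ()
}

def savePartitions(rdd: RDD[Int]): Unit =
  rdd.mapPartitions(partition => Iterator(partition)).foreach { partition =>
    val conn = new Connection()          // some setup code here
    try partition.foreach(conn.save)     // save partition to DB
    finally conn.close()                 // some cleanup code here
  }

The outer mapPartitions emits exactly one element per partition (the
partition's own iterator), so the foreach body runs once per partition,
with setup and cleanup bracketing the save.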






RE: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Henry Hung
I slightly modified the code to use while (partitions.hasNext) { ... } instead
of partitions.map(func). I suppose this can eliminate the uncertainty from
lazy execution.
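
A hedged sketch of what that variant might look like (buffering results
eagerly, so the whole partition is held in memory; Connection is a
placeholder class, not a real API):

import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

class Connection {
  def lookup(x: Int): Int = x * 2
  def close(): Unit = ()
}

def eagerMapPartitions(rdd: RDD[Int]): RDD[Int] =
  rdd.mapPartitions { partitions =>
    val conn = new Connection()          // setup, once per partition
    val results = ArrayBuffer.empty[Int]
    while (partitions.hasNext) {         // eager loop: no lazy surprises
      results += conn.lookup(partitions.next())
    }
    conn.close()                         // cleanup runs deterministically
    results.iterator
  }

This trades memory for determinism: the whole partition is materialized
before cleanup runs, which is the cost Sean's Aug 19 reply above points out.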



Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Sean Owen
I think there was a more comprehensive answer to this recently. Tobias is
right that it is not quite that simple:
http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E




Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Akhil Das
You can create an RDD and then do a map or mapPartitions on it: at the
top you create the database connection and so on, then do the
operations, and at the end close the connection.
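
A sketch of that shape using foreachPartition, which Sean also points to
above (the Connection class is a placeholder, not a real API):

import org.apache.spark.rdd.RDD

class Connection {
  def save(x: Int): Unit = ()
  def close(): Unit = ()
}

def saveAll(rdd: RDD[Int]): Unit =
  rdd.foreachPartition { partition =>
    val conn = new Connection()          // create the connection at the top
    try partition.foreach(conn.save)     // do the operations
    finally conn.close()                 // close the connection at the end
  }

Because foreachPartition is an action that consumes the iterator inside
the block, the close() in the finally clause cannot run too early.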

Thanks
Best Regards




RE: a noob question for how to implement setup and cleanup in Spark map

2014-08-18 Thread Henry Hung
Hi All,

Please ignore my question, I found a way to implement it via old archive mails:

http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E

Best regards,
Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

Hi All,

I'm new to Spark and Scala; I just recently started using the language and
love it, but there is a small coding problem when I want to convert my
existing map-reduce code from Java to Spark...

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and
override the setup(), map() and cleanup() methods.
But in Spark there is no method called setup(), so I put the setup() code
into map(), and it performs badly.
The reason is that setup() creates the database connection once, run()
executes the SQL queries, and cleanup() closes the connection.
Could someone tell me how to do it in Spark?

Best regards,
Henry Hung

