Re: a noob question for how to implement setup and cleanup in Spark map
And duh, of course, you can do the setup in that new RDD as well :)

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
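The lifecycle this proposes can be simulated without a cluster. Below is a plain-Scala sketch where DemoTaskContext and markTaskCompleted are made-up stand-ins for Spark's TaskContext machinery (not real API), just to show the intended ordering: setup runs when compute() is invoked, cleanup runs when the task-completion callback fires.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's TaskContext: just enough to show
// when an addOnCompleteCallback-style cleanup would fire.
class DemoTaskContext {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
  def markTaskCompleted(): Unit = callbacks.foreach(cb => cb())
}

val log = ArrayBuffer.empty[String]
val ctx = new DemoTaskContext

// What compute() does in the sketch above: setup once, register cleanup,
// then hand back the partition iterator untouched.
def compute(partition: Iterator[Int], context: DemoTaskContext): Iterator[Int] = {
  log += "setup"
  context.addOnCompleteCallback(() => log += "cleanup")
  partition
}

val result = compute(Iterator(1, 2, 3), ctx).toList // the task body consumes the iterator
ctx.markTaskCompleted()                             // Spark would do this when the task ends
```

Running this leaves the log as setup, then cleanup, with the elements passed through untouched in between.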
Re: a noob question for how to implement setup and cleanup in Spark map
How about this:

    val prev: RDD[V] = rdd.mapPartitions(partition => { /* setup() */; partition })

    new RDD[V](prev) {
      protected def getPartitions = prev.partitions

      def compute(split: Partition, context: TaskContext) = {
        context.addOnCompleteCallback(() => /* cleanup() */)
        firstParent[V].iterator(split, context)
      }
    }
Re: a noob question for how to implement setup and cleanup in Spark map
I think you're looking for foreachPartition(). You've kinda hacked it out of mapPartitions(). Your case has a simple solution, yes. After saving to the DB, you know you can close the connection, since you know the use of the connection has definitely just finished. But it's not a simpler solution for mapPartitions(), since that's not really what you are using :)

In general, mapPartitions creates an Iterator from another Iterator. Of course you could consume the input iterator, open the connection, perform operations, close the connection and return an iterator over the result. That works, but requires reading the entire input no matter what, and reading it into memory. These may not be OK in all cases.

Where possible, it's nicest to return an Iterator that accesses the source Iterator only as needed to produce elements. This means returning that Iterator before any work has been done, so you have to close the connection later, when the Iterator has been exhausted. Really, Tobias's method is trying to shim a "cleanup()" lifecycle method into the Iterator. I suppose it could be done a little more cleanly using Guava's Iterator library, which would give you a more explicit way to execute something when done.
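The "close the connection later, when the Iterator has been exhausted" idea can be sketched as a hand-rolled wrapper in plain Scala. This is an illustration of the pattern, not Spark or Guava API: cleanup fires exactly once, as soon as the source iterator reports it has no more elements.

```scala
import scala.collection.mutable.ArrayBuffer

// Wrap a source iterator so cleanup() runs once, when the source is exhausted.
class CleanupIterator[T](underlying: Iterator[T], cleanup: () => Unit) extends Iterator[T] {
  private var cleaned = false
  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !cleaned) { cleaned = true; cleanup() }
    more
  }
  override def next(): T = underlying.next()
}

val events = ArrayBuffer.empty[String]
val wrapped = new CleanupIterator(Iterator("a", "b"), () => events += "closed")
// Downstream work (the "map" stage) happens lazily; "closed" is logged only
// after the last element has been drawn.
val out = wrapped.map { x => events += s"read $x"; x }.toList
```

One caveat worth noting: if the consumer never exhausts the iterator (e.g. a take()), cleanup never runs, which is why real Spark code would also register a task-completion callback as discussed elsewhere in this thread.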
Re: a noob question for how to implement setup and cleanup in Spark map
Sean, would this work --

    rdd.mapPartitions { partition => Iterator(partition) }.foreach(
      // Some setup code here
      // save partition to DB
      // Some cleanup code here
    )

I tried a pretty simple example ... I can see that the setup and cleanup are executed on the executor node, once per partition (I used mapPartitionsWithIndex instead of mapPartitions to track this a little better). Seems like an easier solution than Tobias's but I'm wondering if it's perhaps incorrect.
RE: a noob question for how to implement setup and cleanup in Spark map
I slightly modified the code to use while (partitions.hasNext) { } instead of partitions.map(func). I suppose this can eliminate the uncertainty from lazy execution.
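The while-loop trick of eagerly draining the partition, so no work is left pending when cleanup runs, can be sketched like this (setup/cleanup are simulated with a trace log; the memory caveat raised earlier in the thread applies, since every result is buffered at once):

```scala
import scala.collection.mutable.ArrayBuffer

val trace = ArrayBuffer.empty[String]

// Eager variant of a mapPartitions body: the while loop drains the whole
// partition before cleanup, so lazy evaluation cannot defer work past it.
// The cost is holding every result in memory at once.
def mapPartitionEagerly(partition: Iterator[Int]): Iterator[Int] = {
  trace += "setup"
  val results = ArrayBuffer.empty[Int]
  while (partition.hasNext) {
    results += partition.next() * 10  // the per-record work
  }
  trace += "cleanup"                  // safe: all records already processed
  results.iterator
}

val mapped = mapPartitionEagerly(Iterator(1, 2, 3)).toList
```

This makes the trade-off concrete: cleanup ordering becomes deterministic, at the price of materializing the partition.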
Re: a noob question for how to implement setup and cleanup in Spark map
I think this was a more comprehensive answer recently. Tobias is right that it is not quite that simple:

http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E
Re: a noob question for how to implement setup and cleanup in Spark map
You can create an RDD and then do a map or mapPartitions on it, where at the top you create the database connection and so on, then do the operations, and at the end close the connections.

Thanks
Best Regards
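The open-at-the-top, close-at-the-end shape suggested here might look like the following sketch. FakeConnection is a made-up class standing in for a real DB client, and the .toVector is the important detail: it forces the per-record work to run before the finally closes the connection. In Spark you would call something like rdd.mapPartitions(p => withConnection(p)).

```scala
// Made-up connection type for illustration; substitute your real DB client.
class FakeConnection {
  var isOpen = true
  def lookup(key: Int): String = { require(isOpen, "connection closed"); s"row-$key" }
  def close(): Unit = { isOpen = false }
}

// Per-partition body: one connection per partition, always closed.
// Without .toVector, the map would stay lazy and lookup() could run after
// close() -- exactly the lazy-execution pitfall discussed in this thread.
def withConnection(partition: Iterator[Int]): Iterator[String] = {
  val conn = new FakeConnection                 // setup
  try partition.map(conn.lookup).toVector.iterator
  finally conn.close()                          // cleanup
}

val rows = withConnection(Iterator(1, 2)).toList
```

Dropping the .toVector here would throw from the require, which is a quick way to see why eager forcing (or the exhaustion-triggered cleanup shown earlier) is needed.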
RE: a noob question for how to implement setup and cleanup in Spark map
Hi All,

Please ignore my question, I found a way to implement it via old archive mails:

http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF_KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E

Best regards,
Henry

From: MA33 YTHung1
Sent: Monday, August 18, 2014 2:42 PM
To: user@spark.apache.org
Subject: a noob question for how to implement setup and cleanup in Spark map

Hi All,

I'm new to Spark and Scala; I just recently started using this language and love it, but there is a small coding problem when I want to convert my existing map-reduce code from Java to Spark...

In Java, I create a class by extending org.apache.hadoop.mapreduce.Mapper and override the setup(), map() and cleanup() methods.

But in Spark there is no method called setup(), so I wrote the setup() code into map(), but it performs badly. The reason is that I create the database connection once in setup(), run() executes the SQL query, and then cleanup() closes the connection.

Could someone tell me how to do it in Spark?

Best regards,
Henry Hung

The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.