A. No, it's called once per partition.  You usually have more partitions
than executors, so it will end up being called multiple times per
executor.  But you can use a lazy val, a singleton, etc. to make sure the
setup only takes place once per JVM.
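
To illustrate the "once per JVM" point, here is a minimal sketch in plain
Scala (no Spark needed).  The counter, the string standing in for a
connection, and the touchAndPass helper are made up for the example; only
the lazy val in an object reflects what I described above.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a connection pool holder.
object SomeDb {
  // Counts how many times the expensive setup actually ran.
  val initCount = new AtomicInteger(0)

  // A lazy val in an object is initialized on first access, once per JVM.
  lazy val conn: String = {
    initCount.incrementAndGet()
    "connection"
  }
}

object LazyInitDemo {
  // Mimics the function passed to mapPartitions:
  // touch the lazy val, then hand the iterator back unchanged.
  def touchAndPass[A](iter: Iterator[A]): Iterator[A] = {
    val _ = SomeDb.conn
    iter
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" processed in the same JVM.
    val partitions = Seq(Iterator(1, 2), Iterator(3), Iterator(4, 5, 6))
    val out = partitions.flatMap(p => touchAndPass(p).toList)
    println(out)
    // The setup ran once even though three partitions touched it.
    println(SomeDb.initCount.get)
  }
}
```

The function runs once per partition, but the body of the lazy val runs
only on the first access in that JVM.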

B.  I can't speak to the specifics there, but as long as you make
sure the setup gets called at most once per executor, before the work that
needs it, it should be OK.
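
Since an executor runs several tasks concurrently as threads in one JVM,
here is a rough sketch of the same idea under concurrency.  FakeClient,
ClientHolder and the thread pool are hypothetical stand-ins (this is not
Akka or Spark code); the assumption is only that your shared resource can
sit behind a lazy val in an object.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for whatever the tasks share (e.g. an HTTP client wrapper).
final class FakeClient {
  def store(value: Int): Int = value // pretend to send the value somewhere
}

object ClientHolder {
  // Counts how many times the setup block ran.
  val setups = new AtomicInteger(0)

  // Initialization of a lazy val is thread-safe, so concurrent tasks share one instance.
  lazy val client: FakeClient = {
    setups.incrementAndGet()
    new FakeClient
  }
}

object ConcurrentSetupDemo {
  // Runs `tasks` jobs on a small thread pool and returns how many times setup ran.
  def runTasks(tasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      (1 to tasks).foreach { i =>
        pool.execute(new Runnable {
          def run(): Unit = { ClientHolder.client.store(i); () }
        })
      }
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    ClientHolder.setups.get
  }

  def main(args: Array[String]): Unit =
    println(runTasks(16))
}
```

Every task that touches ClientHolder.client gets the same instance, and
whichever task gets there first triggers the setup before using it.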

On Fri, Jun 12, 2015 at 4:11 PM, algermissen1971 <algermissen1...@icloud.com> wrote:

>
> On 12 Jun 2015, at 22:59, Cody Koeninger <c...@koeninger.org> wrote:
>
> > Close.  The mapPartitions call doesn't need to do anything at all to the iter.
> >
> > mapPartitions { iter =>
> >   SomeDb.conn.init
> >   iter
> > }
>
> Yes, thanks!
>
> Maybe you can confirm two more things, and then you will have helped me
> make a giant leap today:
>
> a) When using spark streaming, will this happen exactly once per executor?
> I mean: is mapPartitions called once per executor for the lifetime of the
> stream?
>
> Or should I rather think once per stage?
>
>
> b) I actually need an ActorSystem and FlowMaterializer (for making an
> Akka-HTTP request to store the data), not a DB connection - I presume this
> does not change the concept?
>
>
> Jan
>
>
>
> >
> > On Fri, Jun 12, 2015 at 3:55 PM, algermissen1971 <algermissen1...@icloud.com> wrote:
> > Cody,
> >
> > On 12 Jun 2015, at 17:26, Cody Koeninger <c...@koeninger.org> wrote:
> >
> > > There are several database APIs that use a thread-local or singleton
> > > reference to a connection pool (we use ScalikeJDBC currently, but there
> > > are others).
> > >
> > > You can use mapPartitions earlier in the chain to make sure the
> > > connection pool is set up on that executor, then use it inside
> > > updateStateByKey.
> > >
> >
> > Thanks. You are saying I should just make an arbitrary use of the
> > ‘connection’ to invoke the ‘lazy’. E.g. like this:
> >
> > object SomeDb {
> >
> >   lazy val conn = new SomeDb("some serializable config")
> >
> > }
> >
> >
> > Then somewhere else:
> >
> > theTrackingEvents.map(toPairs).mapPartitions(iter => iter.map( pair => {
> >   SomeDb.conn.init
> >   pair
> >    }
> > )).updateStateByKey[Session](myUpdateFunction _)
> >
> >
> > And in myUpdateFunction:
> >
> > def myUpdateFunction( …) {
> >
> >     SomeDb.conn.store(  … )
> >
> > }
> >
> >
> > Correct?
> >
> > Jan
> >
> >
> >
> >
> > > On Fri, Jun 12, 2015 at 10:07 AM, algermissen1971 <algermissen1...@icloud.com> wrote:
> > > Hi,
> > >
> > > > I have a scenario with Spark Streaming, where I need to write to a
> > > > database from within updateStateByKey[1].
> > >
> > > That means that inside my update function I need a connection.
> > >
> > > > I have so far understood that I should create a new (lazy) connection
> > > > for every partition. But since I am not working in foreachRDD, I wonder
> > > > where I can iterate over the partitions.
> > >
> > > Should I use mapPartitions() somewhere up the chain?
> > >
> > > Jan
> > >
> > >
> > >
> > > > [1] The use case being saving ‘done’ sessions during web tracking.
> > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > For additional commands, e-mail: user-h...@spark.apache.org
> > >
> > >
> >
> >
>
>
