Hi Fabian, I didn't realize you meant that the lazy val should be inside the RichMapFunction implementation; that makes sense. That's what I ended up doing already.
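[Editor's note: a minimal sketch of the lazy-val approach discussed here. Flink itself is not required to see the idea, so the `RichMapFunction` is stood in for by a plain serializable class, and `Client` is a hypothetical stand-in for the non-thread-safe client from the thread.]

```scala
// Hypothetical stand-in for the real non-thread-safe client.
class Client {
  def transform(s: String): String = s.toUpperCase
}

// Stand-in for a RichMapFunction subclass: Flink gives each parallel
// subtask its own deserialized function instance, so a lazy val inside
// the class is initialized at most once per subtask, on the worker,
// on first use -- nothing non-serializable is shipped with the closure.
class MyMapper extends Serializable {
  @transient private lazy val client: Client = new Client
  def map(input: String): String = client.transform(input)
}

val mapper = new MyMapper
val out = Seq("a", "c").map(mapper.map)
```

The `@transient` marker keeps the field out of serialization entirely; the lazy initializer reconstructs it on the worker after the function object is deserialized.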
Thanks!
Timur

On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Timur,
>
> a TaskManager may run as many subtasks of a Map operator as it has slots.
> Each subtask of an operator runs in a different thread. Each parallel
> subtask of a Map operator has its own MapFunction object, so it should be
> possible to use a lazy val.
>
> However, you should not use static variables to hold state, because these
> are shared between all MapFunctions in a TaskManager (JVM).
>
> 2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>:
>
>> Actually, a follow-up question: is the map function single-threaded
>> (within one task manager, that is)? If it's not, then lazy initialization
>> won't work, right?
>>
>> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> You may also be able to initialize the client only in the parallel
>>> execution by making it a "lazy" variable in Scala.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>
>>>> Outstanding! Thanks, Aljoscha.
>>>>
>>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> you could use a RichMapFunction, which has an open() method:
>>>>>
>>>>> data.map(new RichMapFunction[IN, OUT]() {
>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>     // initialize client
>>>>>   }
>>>>>
>>>>>   def map(input: IN): OUT = {
>>>>>     // use client
>>>>>   }
>>>>> })
>>>>>
>>>>> The open() method is called before any elements are passed to the
>>>>> function. The counterpart of open() is close(), which is called after all
>>>>> elements are through or when the job is cancelled.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm writing a Scala Flink application. I have a standalone process
>>>>>> that exists on every Flink node that I need to call to transform my data.
>>>>>> To access this process I first need to initialize a non-thread-safe
>>>>>> client. I would like to avoid initializing a client for each element
>>>>>> being transformed. A straightforward implementation would be something
>>>>>> like this:
>>>>>> ```
>>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>>>> val pool = new ArrayBlockingQueue[Client](5)
>>>>>> // pool is filled here
>>>>>> data.map(e => {
>>>>>>   val client = pool.take()
>>>>>>   val res = client.transform(e)
>>>>>>   pool.put(client)
>>>>>>   res
>>>>>> })
>>>>>> ```
>>>>>> However, this causes a runtime exception with the message "Task not
>>>>>> serializable", which makes sense.
>>>>>>
>>>>>> Function parameters and broadcast variables won't work either, as far
>>>>>> as I understand. Is there a way to make this happen?
>>>>>>
>>>>>> Thanks,
>>>>>> Timur
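[Editor's note: the pool example above fails because the map closure captures the non-serializable `pool`. A minimal sketch of the open()/close() lifecycle Aljoscha suggests, with Flink's `RichMapFunction` stubbed out so the snippet stands alone; `Client` and its connect/close methods are hypothetical stand-ins.]

```scala
// Hypothetical non-thread-safe client with an explicit lifecycle,
// standing in for the real one from the thread.
class Client {
  private var connected = false
  def connect(): Unit = { connected = true }
  def close(): Unit = { connected = false }
  def transform(s: String): String = {
    require(connected, "client not connected")
    s.reverse
  }
}

// Minimal stub of the RichMapFunction lifecycle: open() is called once
// per subtask before any elements arrive, close() after the last one.
abstract class StubRichMapFunction[IN, OUT] extends Serializable {
  def open(): Unit = {}
  def close(): Unit = {}
  def map(input: IN): OUT
}

class TransformMapper extends StubRichMapFunction[String, String] {
  // @transient: the client is created in open() on the worker,
  // never serialized with the function object.
  @transient private var client: Client = _
  override def open(): Unit = { client = new Client; client.connect() }
  override def close(): Unit = client.close()
  def map(input: String): String = client.transform(input)
}

// Simulate one subtask's call sequence: open, map over elements, close.
val fn = new TransformMapper
fn.open()
val out = Seq("ab", "cd").map(fn.map)
fn.close()
```

Because each parallel subtask gets its own function instance, one client per subtask is created without any pooling or shared state.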