Hi Fabian,

I didn't realize you meant that the lazy val should be inside the
RichMapFunction implementation; that makes sense. It's what I ended up doing already.
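
For reference, the pattern looks roughly like this. It's a minimal,
self-contained sketch: `Client` is a stand-in for my real non-thread-safe
client, and the plain class plays the role of the RichMapFunction, since
the point is only that each instance lazily builds its own client:

```scala
// Stand-in for the real non-thread-safe client (hypothetical).
class Client {
  def transform(s: String): String = s.toUpperCase
}

// Plays the role of a RichMapFunction: Flink creates one instance per
// parallel subtask, so the lazy val is initialized once per subtask.
class MyMapper extends Serializable {
  // @transient: the client is not shipped with the serialized function;
  // the lazy val re-creates it on the worker, on first use.
  @transient lazy val client: Client = new Client
  def map(input: String): String = client.transform(input)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val mapper = new MyMapper
    println(mapper.map("flink")) // prints "FLINK"
  }
}
```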

Thanks!
Timur

On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Timur,
>
> a TaskManager may run as many subtasks of a Map operator as it has slots.
> Each subtask of an operator runs in a different thread. Each parallel
> subtask of a Map operator has its own MapFunction object, so it should be
> possible to use a lazy val.
>
> However, you should not use static variables to hold state, because these
> are shared between all MapFunction instances in a TaskManager (JVM).
>
> 2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>:
>
>> Actually, a follow-up question: is the map function single-threaded
>> (within one task manager, that is)? If it's not, then lazy initialization
>> won't work, right?
>>
>> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> You may also be able to initialize the client only in the parallel
>>> execution by making it a "lazy" variable in Scala.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
>>>> Outstanding! Thanks, Aljoscha.
>>>>
>>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> you could use a RichMapFunction that has an open method:
>>>>>
>>>>> data.map(new RichMapFunction[...]() {
>>>>>   override def open(parameters: Configuration): Unit = {
>>>>>     // initialize client
>>>>>   }
>>>>>
>>>>>   override def map(input: IN): OUT = {
>>>>>     // use client
>>>>>   }
>>>>> })
>>>>>
>>>>> the open() method is called before any elements are passed to the
>>>>> function. The counterpart of open() is close(), which is called after all
>>>>> elements are through or when the job is canceled.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm writing a Scala Flink application. I have a standalone process
>>>>>> that exists on every Flink node that I need to call to transform my
>>>>>> data. To access this process I need to initialize a non-thread-safe
>>>>>> client first. I would like to avoid initializing a client for each
>>>>>> element being transformed. A straightforward implementation would be
>>>>>> something like this:
>>>>>> ```
>>>>>>
>>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"))))
>>>>>> val pool = new ArrayBlockingQueue[Client](5)
>>>>>> // pool is filled here
>>>>>> data.map(e => {
>>>>>>   val client = pool.take()
>>>>>>   val res = client.transform(e)
>>>>>>   pool.put(client)
>>>>>>   res
>>>>>> })
>>>>>>
>>>>>> ```
>>>>>> However, this causes a runtime exception with the message "Task not
>>>>>> serializable", which makes sense.
>>>>>>
>>>>>> Function parameters and broadcast variables won't work either as far
>>>>>> as I understand. Is there a way to make this happen?
>>>>>>
>>>>>> Thanks,
>>>>>> Timur
>>>>>>
>>>>>
>>>>
>>>
>>
>
