Re: Dependency injection and flink.

2020-11-09 Thread Arvid Heise
I hope you don't mind that I'm just lazily giving you a link to Wikipedia
[1]. The first few examples all show manual DI via constructors, setters, etc.

Folks usually only think of automatic (framework-assembled) DI when talking
about DI, but you can build everything manually with a bit of inconvenience
as well. What usually happens is that your constructors end up with a huge
parameter list through which all the injected dependencies are passed
(potentially further down the object graph).

That's also what we have in the Flink code base (it would be better to
replace long parameter lists with parameter objects, but that's a different
story). For example, consider DataStream, where we inject the
environment [2].
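
To make that concrete, here is a minimal sketch of manual constructor
injection (the class names are invented for this illustration and are not
from the Flink code base):

class MetricsClient {
    private final String endpoint;

    MetricsClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

class EnrichmentService {
    private final MetricsClient metrics;

    // the dependency is handed in by whoever constructs the service;
    // no framework involved, just a (potentially long) parameter list
    EnrichmentService(MetricsClient metrics) {
        this.metrics = metrics;
    }
}

public class Main {
    public static void main(String[] args) {
        // the "composition root": all wiring happens by hand here instead
        // of being resolved by a container such as Spring or Guice
        MetricsClient metrics = new MetricsClient("http://localhost:9090");
        EnrichmentService service = new EnrichmentService(metrics);
    }
}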

[1] https://en.wikipedia.org/wiki/Dependency_injection#Examples
[2]
https://github.com/apache/flink/blob/a6c064ea41f86ddfb0ed992287c7c1d0e2407217/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L140

On Mon, Nov 9, 2020 at 9:07 PM santhosh venkat 
wrote:

> Hi Arvid,
>
> Thanks a lot for your response. If I understand correctly, we do not
> support task-level DI in flink. I completely agree with the reasons
> you provided (especially regarding singletons). The serialization approach
> is also something I currently do in my app (other frameworks like Samza
> expect something similar from applications too).
>
> The only thing I don't understand is what you mean by "use automatic DI
> while creating the DataStream application and then switch to manual DI on
> task manager level". I don't quite follow the distinction between the
> automatic and manual parts. Can you please help me understand what you
> mean by that?
>
> Thanks.
>
> On Tue, Nov 3, 2020 at 12:31 AM Arvid Heise  wrote:
>
>> Hi Santhosh,
>>
>> Flink does not support automatic DI at the task level, and there is no
>> immediate plan to support it out-of-the-box. In general, there are quite a
>> few implications of using automatic DI in a distributed setting. For
>> example, how is a singleton supposed to work? Nevertheless, Flink's job
>> startup got overhauled in the last and the upcoming release, so it might
>> become easier to support DI frameworks in the near future.
>>
>> What I usually recommend is to use automatic DI while creating the
>> DataStream application and then switch to manual DI on task manager level
>> (most folks confuse DI with automatic DI, but DI is a general pattern that
>> is independent of any framework).
>>
>> Here is an example. Suppose you want to use ServiceA in some async I/O call.
>>
>> DataStream<Integer> inputStream = env.addSource(...);
>> AsyncFunction<Integer, String> function = new ExternalLookupFunction();
>> AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();
>>
>> class ExternalLookupFunction implements AsyncFunction<Integer, String> {
>>     @Autowired
>>     ServiceA service; // <-- will be injected wherever the DataStream graph is created
>>
>>     @Override
>>     public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
>>         service.call(input, resultFuture::complete); // <-- called only on task manager
>>     }
>> }
>>
>>
>> Now the question is how ServiceA is transferred from client/job manager
>> to task manager. One solution is to make ServiceA Serializable and just let
>> Java Serialization handle everything automatically. Alternatively, you can
>> only serialize the configuration information and create the service on
>> RichAsyncFunction#open.
>>
>> Let's see if someone else made progress on providing the initialization
>> hooks as described in your linked thread. Note that the community is busy
>> getting Flink 1.12 done, so it might take a while for more answers.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Nov 3, 2020 at 12:03 AM santhosh venkat <
>> santhoshvenkat1...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I'm trying to integrate a dependency injection framework with flink
>>> within my company. When I searched the user-mailing list, I found the
>>> following thread in flink which discussed this in the past:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html
>>>
>>> Since the thread was ~2 yrs old, I'm creating this request.
>>>
>>> 1. How do we expect users to integrate flink with a dependency injection
>>> framework? Are there any hooks/entry-points that we can use to seamlessly
>>> integrate a DI-fwk with flink? How does the community recommend the
>>> dependency injection integration?

Re: Dependency injection and flink.

2020-11-04 Thread Dan Diephouse
Just want to chime in here that it would be fantastic to have a way to do
DI in Flink. Ideally, the injected services themselves don't get serialized
at all, since they're just singletons in our case. E.g., we have an API
client that looks up data from our API and caches it for all the functions
that need it.
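
A sketch of one way to get that behavior today (ApiClient, its methods, and
the holder are invented names, so treat the details as assumptions): the
function is serialized without the client, which is created lazily once per
task manager JVM and shared by every function in that JVM.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichFunction extends RichMapFunction<String, String> {
    private transient ApiClient client; // transient: never serialized

    @Override
    public void open(Configuration parameters) {
        client = ApiClientHolder.get(); // resolved on the task manager
    }

    @Override
    public String map(String value) {
        return client.lookup(value); // served from the shared cache
    }
}

// One shared instance per JVM, created on first use.
class ApiClientHolder {
    private static volatile ApiClient instance;

    static ApiClient get() {
        if (instance == null) {
            synchronized (ApiClientHolder.class) {
                if (instance == null) {
                    instance = new ApiClient();
                }
            }
        }
        return instance;
    }
}

// Stand-in for the API client described above.
class ApiClient {
    String lookup(String key) {
        return key; // a real implementation would call the API and cache results
    }
}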

On Tue, Nov 3, 2020 at 12:32 AM Arvid Heise  wrote:

> Hi Santhosh,
>
> Flink does not support automatic DI at the task level, and there is no
> immediate plan to support it out-of-the-box. In general, there are quite a
> few implications of using automatic DI in a distributed setting. For
> example, how is a singleton supposed to work? Nevertheless, Flink's job
> startup got overhauled in the last and the upcoming release, so it might
> become easier to support DI frameworks in the near future.
>
> What I usually recommend is to use automatic DI while creating the
> DataStream application and then switch to manual DI on task manager level
> (most folks confuse DI with automatic DI, but DI is a general pattern that
> is independent of any framework).
>
> Here is an example. Suppose you want to use ServiceA in some async I/O call.
>
> DataStream<Integer> inputStream = env.addSource(...);
> AsyncFunction<Integer, String> function = new ExternalLookupFunction();
> AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();
>
> class ExternalLookupFunction implements AsyncFunction<Integer, String> {
>     @Autowired
>     ServiceA service; // <-- will be injected wherever the DataStream graph is created
>
>     @Override
>     public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
>         service.call(input, resultFuture::complete); // <-- called only on task manager
>     }
> }
>
>
> Now the question is how ServiceA is transferred from client/job manager to
> task manager. One solution is to make ServiceA Serializable and just let
> Java Serialization handle everything automatically. Alternatively, you can
> only serialize the configuration information and create the service on
> RichAsyncFunction#open.
>
> Let's see if someone else made progress on providing the initialization
> hooks as described in your linked thread. Note that the community is busy
> getting Flink 1.12 done, so it might take a while for more answers.
>
> Best,
>
> Arvid
>
> On Tue, Nov 3, 2020 at 12:03 AM santhosh venkat <
> santhoshvenkat1...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I'm trying to integrate a dependency injection framework with flink
>> within my company. When I searched the user-mailing list, I found the
>> following thread in flink which discussed this in the past:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html
>>
>> Since the thread was ~2 yrs old, I'm creating this request.
>>
>> 1. How do we expect users to integrate flink with a dependency injection
>> framework? Are there any hooks/entry-points that we can use to seamlessly
>> integrate a DI-fwk with flink? How does the community recommend the
>> dependency injection integration?
>>
>> 2. Would it be possible to create the objects (say, Spring objects) at a
>> flink-task scope? Or are all these objects from a dependency injection
>> fwk expected to be created at an entire process (JM/TM) level?
>>
>> Can someone please help answer the above questions and help me understand
>> the Flink guarantees better. Any help would be greatly appreciated.
>>
>> Thanks.
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 
Dan Diephouse
@dandiep


Re: Dependency injection and flink.

2020-11-03 Thread Arvid Heise
Hi Santhosh,

Flink does not support automatic DI at the task level, and there is no
immediate plan to support it out-of-the-box. In general, there are quite a
few implications of using automatic DI in a distributed setting. For
example, how is a singleton supposed to work? Nevertheless, Flink's job
startup got overhauled in the last and the upcoming release, so it might
become easier to support DI frameworks in the near future.

What I usually recommend is to use automatic DI while creating the
DataStream application and then switch to manual DI on task manager level
(most folks confuse DI with automatic DI, but DI is a general pattern that
is independent of any framework).

Here is an example. Suppose you want to use ServiceA in some async I/O call.

DataStream<Integer> inputStream = env.addSource(...);
AsyncFunction<Integer, String> function = new ExternalLookupFunction();
AsyncDataStream.unorderedWait(inputStream, function, 1, TimeUnit.SECONDS).print();

class ExternalLookupFunction implements AsyncFunction<Integer, String> {
    @Autowired
    ServiceA service; // <-- will be injected wherever the DataStream graph is created

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) throws IOException {
        service.call(input, resultFuture::complete); // <-- called only on task manager
    }
}


Now the question is how ServiceA is transferred from client/job manager to
task manager. One solution is to make ServiceA Serializable and just let
Java Serialization handle everything automatically. Alternatively, you can
only serialize the configuration information and create the service on
RichAsyncFunction#open.
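
A sketch of that second option (ServiceA's constructor and call signature
are assumptions carried over from the example above): only a small,
Serializable configuration travels with the function, and the service is
rebuilt in open() on the task manager.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

class ExternalLookupFunction extends RichAsyncFunction<Integer, String> {
    private final String serviceEndpoint; // serialized with the function
    private transient ServiceA service;   // rebuilt per task manager, never serialized

    ExternalLookupFunction(String serviceEndpoint) {
        this.serviceEndpoint = serviceEndpoint;
    }

    @Override
    public void open(Configuration parameters) {
        // runs on the task manager after deserialization
        service = new ServiceA(serviceEndpoint);
    }

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        service.call(input, resultFuture::complete);
    }
}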

Let's see if someone else made progress on providing the initialization
hooks as described in your linked thread. Note that the community is busy
getting Flink 1.12 done, so it might take a while for more answers.

Best,

Arvid

On Tue, Nov 3, 2020 at 12:03 AM santhosh venkat <
santhoshvenkat1...@gmail.com> wrote:

>
> Hi,
>
> I'm trying to integrate a dependency injection framework with flink within
> my company. When I searched the user-mailing list, I found the following
> thread in flink which discussed this in the past:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html
>
> Since the thread was ~2 yrs old, I'm creating this request.
>
> 1. How do we expect users to integrate flink with a dependency injection
> framework? Are there any hooks/entry-points that we can use to seamlessly
> integrate a DI-fwk with flink? How does the community recommend the
> dependency injection integration?
>
> 2. Would it be possible to create the objects (say, Spring objects) at a
> flink-task scope? Or are all these objects from a dependency injection
> fwk expected to be created at an entire process (JM/TM) level?
>
> Can someone please help answer the above questions and help me understand
> the Flink guarantees better. Any help would be greatly appreciated.
>
> Thanks.
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Dependency injection and flink.

2020-11-02 Thread santhosh venkat
Hi,

I'm trying to integrate a dependency injection framework with flink within
my company. When I searched the user-mailing list, I found the following
thread in flink which discussed this in the past:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dependency-Injection-and-Flink-td18880.html

Since the thread was ~2 yrs old, I'm creating this request.

1. How do we expect users to integrate flink with a dependency injection
framework? Are there any hooks/entry-points that we can use to seamlessly
integrate a DI-fwk with flink? How does the community recommend the
dependency injection integration?

2. Would it be possible to create the objects (say, Spring objects) at a
flink-task scope? Or are all these objects from a dependency injection
fwk expected to be created at an entire process (JM/TM) level?

Can someone please help answer the above questions and help me understand
the Flink guarantees better. Any help would be greatly appreciated.

Thanks.


Re: Dependency Injection and Flink

2018-03-17 Thread Steven Wu
Stephan,

That would be helpful. On the job manager side, the entry class provides
such an entry-point hook. The problem is on the task manager side, where we
don't have such an initialization/entry point.

I brought up the same question 3 months ago on this list with the subject
"entrypoint for executing job in task manager".

Thanks,
Steven



On Thu, Mar 15, 2018 at 1:49 PM, Stephan Ewen  wrote:

> Would it help to be able to register "initializers", meaning some
> classes/methods that will be called at every process entry point, to set up
> something like this?
>
>
> On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu  wrote:
>
>> Xiaochuan,
>>
>> We are doing exactly as you described. We keep the injector as a global
>> static var.
>>
>> But we extend FlinkJobManager and FlinkTaskManager to override the main
>> method and initialize the injector (and other things) during JVM startup,
>> which does cause tight code coupling. It is a little painful to upgrade
>> Flink because internal code structure changes in FlinkJobManager and
>> FlinkTaskManager can sometimes break our extended classes.
>>
>> Thanks,
>> Steven
>>
>>
>> On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm evaluating Flink with the intent to integrate it into a Java project
>>> that uses a lot of dependency injection via Guice. What would be the best
>>> way to work with DI/Guice given that injected fields aren't Serializable?
>>> I looked at this StackOverflow answer so far. To my understanding, the
>>> strategy is as follows, but I'm not sure about step 3:
>>>
>>>    1. Use a RichFunction any time injection is required.
>>>    2. Do not use @Inject; instead mark each injected field as transient.
>>>3. Implement open() / close() and manually assign values to injected
>>>fields using Injector.getInstance(SomeClass.class)? But where do I
>>>get the injector? Create one on the spot each time? Keep one as a static
>>>var somewhere and use everywhere?
>>>
>>> Example:
>>>  public class MyFilter extends RichFilterFunction<String> {
>>>      private transient DbClient dbClient;
>>>      //@Inject DbClient dbClient; //typical Guice field injection
>>>
>>>      public void open(Configuration parameters) {
>>>          // where am I supposed to get the injector?
>>>          // keep it as a static variable somewhere and init it in Main?
>>>          this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
>>>      }
>>>
>>>      public boolean filter(String value) {
>>>          return this.dbClient.query(value);
>>>      }
>>>  }
>>> I haven't set up a Flink environment to try the above yet though.
>>> Does anyone know of a less verbose way?
>>> I imagine this could get quite verbose with multiple injected fields.
>>>
>>> Thanks,
>>> Xiaochuan Yu
>>>
>>>
>>
>


Re: Dependency Injection and Flink

2018-03-15 Thread Stephan Ewen
Would it help to be able to register "initializers", meaning some
classes/methods that will be called at every process entry point, to set up
something like this?
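
To illustrate the idea, a purely hypothetical shape of such a hook (nothing
like this exists in Flink today; ProcessInitializer, GuiceInitializer, and
MyAppModule are all invented names):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.flink.configuration.Configuration;

// Callback invoked once at every process entry point (job manager and
// task manager) before any user code runs.
interface ProcessInitializer {
    void initialize(Configuration flinkConfig) throws Exception;
}

// A Guice user could then bootstrap the injector once per JVM:
class GuiceInitializer implements ProcessInitializer {
    static volatile Injector injector;

    @Override
    public void initialize(Configuration flinkConfig) {
        injector = Guice.createInjector(new MyAppModule());
    }
}

// Placeholder for the application's actual bindings.
class MyAppModule extends AbstractModule {
    @Override
    protected void configure() {
        // bind services here, e.g. bind(DbClient.class).to(...)
    }
}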


On Tue, Mar 13, 2018 at 7:56 PM, Steven Wu  wrote:

> Xiaochuan,
>
> We are doing exactly as you described. We keep the injector as a global
> static var.
>
> But we extend FlinkJobManager and FlinkTaskManager to override the main
> method and initialize the injector (and other things) during JVM startup,
> which does cause tight code coupling. It is a little painful to upgrade
> Flink because internal code structure changes in FlinkJobManager and
> FlinkTaskManager can sometimes break our extended classes.
>
> Thanks,
> Steven
>
>
> On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu 
> wrote:
>
>> Hi,
>>
>> I'm evaluating Flink with the intent to integrate it into a Java project
>> that uses a lot of dependency injection via Guice. What would be the best
>> way to work with DI/Guice given that injected fields aren't Serializable?
>> I looked at this StackOverflow answer so far. To my understanding, the
>> strategy is as follows, but I'm not sure about step 3:
>>
>>    1. Use a RichFunction any time injection is required.
>>    2. Do not use @Inject; instead mark each injected field as transient.
>>3. Implement open() / close() and manually assign values to injected
>>fields using Injector.getInstance(SomeClass.class)? But where do I
>>get the injector? Create one on the spot each time? Keep one as a static
>>var somewhere and use everywhere?
>>
>> Example:
>>  public class MyFilter extends RichFilterFunction<String> {
>>      private transient DbClient dbClient;
>>      //@Inject DbClient dbClient; //typical Guice field injection
>>
>>      public void open(Configuration parameters) {
>>          // where am I supposed to get the injector?
>>          // keep it as a static variable somewhere and init it in Main?
>>          this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
>>      }
>>
>>      public boolean filter(String value) {
>>          return this.dbClient.query(value);
>>      }
>>  }
>> I haven't set up a Flink environment to try the above yet though.
>> Does anyone know of a less verbose way?
>> I imagine this could get quite verbose with multiple injected fields.
>>
>> Thanks,
>> Xiaochuan Yu
>>
>>
>


Re: Dependency Injection and Flink

2018-03-13 Thread Steven Wu
Xiaochuan,

We are doing exactly as you described. We keep the injector as a global
static var.

But we extend FlinkJobManager and FlinkTaskManager to override the main
method and initialize the injector (and other things) during JVM startup,
which does cause tight code coupling. It is a little painful to upgrade
Flink because internal code structure changes in FlinkJobManager and
FlinkTaskManager can sometimes break our extended classes.
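
A minimal sketch of that global holder pattern (names are invented; this is
not our actual code):

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

// Initialized once per JVM from the overridden main methods described
// above; RichFunction#open on the task managers can then pull from it.
public final class InjectorHolder {
    private static volatile Injector injector;

    private InjectorHolder() {}

    public static void initialize(Module... modules) {
        injector = Guice.createInjector(modules);
    }

    public static Injector get() {
        if (injector == null) {
            throw new IllegalStateException("Injector not initialized in this JVM");
        }
        return injector;
    }
}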

Thanks,
Steven


On Tue, Mar 13, 2018 at 11:30 AM, XiaoChuan Yu  wrote:

> Hi,
>
> I'm evaluating Flink with the intent to integrate it into a Java project
> that uses a lot of dependency injection via Guice. What would be the best
> way to work with DI/Guice given that injected fields aren't Serializable?
> I looked at this StackOverflow answer so far. To my understanding, the
> strategy is as follows, but I'm not sure about step 3:
>
>    1. Use a RichFunction any time injection is required.
>    2. Do not use @Inject; instead mark each injected field as transient.
>3. Implement open() / close() and manually assign values to injected
>fields using Injector.getInstance(SomeClass.class)? But where do I get
>the injector? Create one on the spot each time? Keep one as a static var
>somewhere and use everywhere?
>
> Example:
>  public class MyFilter extends RichFilterFunction<String> {
>      private transient DbClient dbClient;
>      //@Inject DbClient dbClient; //typical Guice field injection
>
>      public void open(Configuration parameters) {
>          // where am I supposed to get the injector?
>          // keep it as a static variable somewhere and init it in Main?
>          this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
>      }
>
>      public boolean filter(String value) {
>          return this.dbClient.query(value);
>      }
>  }
> I haven't set up a Flink environment to try the above yet though.
> Does anyone know of a less verbose way?
> I imagine this could get quite verbose with multiple injected fields.
>
> Thanks,
> Xiaochuan Yu
>
>


Dependency Injection and Flink

2018-03-13 Thread XiaoChuan Yu
Hi,

I'm evaluating Flink with the intent to integrate it into a Java project
that uses a lot of dependency injection via Guice. What would be the best
way to work with DI/Guice given that injected fields aren't Serializable?
I looked at this StackOverflow answer so far. To my understanding, the
strategy is as follows, but I'm not sure about step 3:

   1. Use a RichFunction any time injection is required.
   2. Do not use @Inject; instead mark each injected field as transient.
   3. Implement open() / close() and manually assign values to injected
   fields using Injector.getInstance(SomeClass.class)? But where do I get the
   injector? Create one on the spot each time? Keep one as a static var
   somewhere and use everywhere?

Example:
 public class MyFilter extends RichFilterFunction<String> {
     private transient DbClient dbClient;
     //@Inject DbClient dbClient; //typical Guice field injection

     public void open(Configuration parameters) {
         // where am I supposed to get the injector?
         // keep it as a static variable somewhere and init it in Main?
         this.dbClient = MyInjectorHolder.injector().getInstance(DbClient.class);
     }

     public boolean filter(String value) {
         return this.dbClient.query(value);
     }
 }
I haven't set up a Flink environment to try the above yet though.
Does anyone know of a less verbose way?
I imagine this could get quite verbose with multiple injected fields.
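
One possibly less verbose variant (a sketch only, assuming a static holder
like the MyInjectorHolder referenced above): keep the @Inject annotations
and let Guice populate all annotated fields in a single call with
Injector#injectMembers.

import com.google.inject.Inject;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class MyFilter extends RichFilterFunction<String> {
    @Inject
    private transient DbClient dbClient; // stays out of serialization

    @Override
    public void open(Configuration parameters) {
        // one call fills in every @Inject field, however many there are
        MyInjectorHolder.injector().injectMembers(this);
    }

    @Override
    public boolean filter(String value) {
        return dbClient.query(value);
    }
}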

Thanks,
Xiaochuan Yu