Thanks for the response.

It is a bit surprising that it is always a new instance, given the various
API signatures that take in a Configuration instance. The best practices
docs (
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program
)
also sort of mention it, but I just noticed most of those seem like they
are for the DataSet API rather than the DataStream API (I don't know if
there is a big difference between the programming APIs there). I'm still
new to these things, so I could be making invalid assumptions, too.

I think I have a simple idea for how to get dependency-injection-style
behavior working anyway:

   - Pass in a Serializable "injector"/proxy object in the constructor
   - In the "open" (or body of the function) get the things/initialize
   stuff I want that may or may not be Serializable, e.g. an HTTP client or
   database connection from that object
   - Don't use the Configuration instance since it doesn't do anything
   anyways

I haven't thought through any possible security holes or considerations
with this approach yet.
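
As a rough sketch of the steps listed above, here is the idea in plain Java
with no Flink dependency. All of the names (Client, ClientFactory,
FixedFactorFactory, ScalingFunction) are made up for illustration. In an
actual job I assume the function would extend RichMapFunction and do the
setup in open(Configuration) instead of a bare open(). The round trip through
Java serialization only approximates what happens when the operator is
shipped for deployment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InjectorSketch {

    /** Hypothetical stand-in for a non-serializable dependency, e.g. an HTTP client. */
    interface Client {
        int fetchFactor();
    }

    /** Hypothetical serializable "injector": travels with the function, builds the dependency later. */
    interface ClientFactory extends Serializable {
        Client create();
    }

    /** Test double: hands out a client that always returns a fixed factor. */
    static class FixedFactorFactory implements ClientFactory {
        private static final long serialVersionUID = 1L;
        private final int factor;

        FixedFactorFactory(int factor) {
            this.factor = factor;
        }

        @Override
        public Client create() {
            return () -> factor;
        }
    }

    /** Mimics the shape of a rich function: constructor injection plus an open() hook. */
    static class ScalingFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        private final ClientFactory factory; // serialized together with the function
        private transient Client client;     // never serialized, built in open()

        ScalingFunction(ClientFactory factory) {
            this.factory = factory;
        }

        void open() {
            client = factory.create();
        }

        int map(int value) {
            if (client == null) {
                throw new IllegalStateException("open() was not called");
            }
            return value * client.fetchFactor();
        }
    }

    /** Serializes and deserializes an object, wrapping checked exceptions. */
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ScalingFunction shipped = roundTrip(new ScalingFunction(new FixedFactorFactory(10)));
        shipped.open();
        System.out.println(shipped.map(1)); // 10
        System.out.println(shipped.map(2)); // 20
    }
}
```

For a local test I would pass in a factory like FixedFactorFactory, and for a
real run one that builds the actual HTTP client or database connection.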

Thanks for the response, that clears up my confusion - now just to explore
and find some better ways to test this stuff!

On Fri, Sep 22, 2017 at 11:51 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> The passing in of a Configuration instance in the open method is actually
> a leftover artifact of the DataStream API that remains only due to API
> backwards compatibility reasons.
> There’s actually no way to modify what configuration is retrieved there
> (and it is actually always a new empty Configuration).
>
> Normally, to inject dependencies into your operators, you would simply do
> that by supplying them through the constructor of the operator, and store
> them as class fields for future use in the operator work methods.
> Make sure that they are serializable, as the operator will need to be
> serialized when deploying the job. I’m assuming that this should be
> possible for you anyway, since you were trying to write that information
> into the Configuration.
>
> Hope this helps!
>
> Cheers,
> Gordon
>
>
> On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com)
> wrote:
>
> I'm new to Flink and in the process of trying to write a few operators and
> tests for them. One of the issues I've run into is "how do I properly set
> up the dependencies for an operator". I've discovered the serialization
> constraints and learned about some of the execution model as I've started to
> progress through it, but I'm still struggling to find an analog for
> dependency injection in Flink.
>
> I was experimenting with different ways to supply configuration for the
> *Rich* functions to basically set themselves up and tear themselves down
> with their dependencies on open/close. I wanted to basically "inject" a
> dependency, such as an HTTP client that caches, and then mock that
> dependency for a local test instead of actually making HTTP calls. It
> seemed like it could be done by setting and getting the correct
> implementation types from the config using some custom injector type
> (analogous to Spring or Guice dependency injection). I know I have to deal
> with serialization of the operators, which is why I was thinking I could do this
> in open/close and have the magical injector be serializable (and possibly
> be part of the config). This may or may not be a bad idea already, but bear
> with me (and any feedback is very appreciated).
>
> I was doing some local testing using StreamExecutionEnvironment, but
> wasn't able to actually pass in configuration options to the local stream
> execution.
>
> I tried it these ways:
>
>    1. Create with a config
>    - StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>    2. Configure the created LocalStreamEnvironment
>    by env.getConfig().setGlobalJobParameters(configuration)
>    3. Configure the DataStreamSource<Integer>
>    by source.getExecutionConfig().setGlobalJobParameters(configuration)
>    4. Configure the SingleOutputStreamOperator
>    by mapped.getExecutionConfig().setGlobalJobParameters(configuration)
>
> All 4 of those failed, so I feel like I am doing something wrong here, and
> wanted to reach out.
>
> Here is the example code where all of those tests fail:
>
> import static org.assertj.core.api.Assertions.assertThat;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.DataStreamUtils;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.junit.Test;
>
> import java.util.Iterator;
>
> public class FlinkInspection {
>
>     @Test
>     public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         env.getConfig().setGlobalJobParameters(configuration);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>         source.getExecutionConfig().setGlobalJobParameters(configuration);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     @Test
>     public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
>         DataStreamSource<Integer> source = env.fromElements(1, 2);
>         Configuration configuration = new Configuration();
>         configuration.setInteger("key", 10);
>
>         SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
>         mapped.getExecutionConfig().setGlobalJobParameters(configuration);
>
>         Iterator<Integer> collection = DataStreamUtils.collect(mapped);
>         env.execute();
>
>         assertThat(collection).containsExactlyInAnyOrder(10, 20);
>     }
>
>     static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {
>
>         private int factor = -1;
>
>         @Override
>         public Integer map(final Integer value) throws Exception {
>             return value * factor;
>         }
>
>         @Override
>         public void open(final Configuration parameters) throws Exception {
>             factor = parameters.getInteger("key", 0);
>         }
>     }
> }
>
>
>    1. Any suggestions on how I should think about the dependency
>    injection?
>    2. Are there ways to customize the Configuration that is passed into
>    Rich functions?
>    3. Is this an issue with LocalStreamEnvironment, or am I doing
>    something completely wrong?
>
>
