I'm new to Flink and in the process of trying to write a few operators and tests for them. One of the issues I've run into is "how do I properly set up the dependencies for an operator?". I've discovered the serialization constraints and learned about the execution model as I've progressed, but I'm still struggling to find an analog for dependency injection in Flink.
I was experimenting with different ways to supply configuration to the *Rich* functions so they can set themselves up and tear themselves down, together with their dependencies, in open()/close(). Basically I wanted to "inject" a dependency, say an HTTP client that caches, and then mock that dependency in a local test instead of actually making HTTP calls. It seemed like this could be done by reading the correct implementation types from the config using some custom injector type (analogous to Spring or Guice dependency injection). I know I have to deal with serialization of the operators, which is why I was thinking I could do this in open()/close() and have the magical injector be serializable (and possibly be part of the config). This may or may not be a bad idea already, but bear with me (any feedback is very appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't able to actually pass configuration options to the local stream execution. I tried it these ways:

1. Create with a config: StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
2. Configure the created LocalStreamEnvironment: env.getConfig().setGlobalJobParameters(configuration)
3. Configure the DataStreamSource<Integer>: source.getExecutionConfig().setGlobalJobParameters(configuration)
4. Configure the SingleOutputStreamOperator: mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All four of those failed, so I feel like I'm doing something wrong here, and wanted to reach out.
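To make the injection idea concrete, here is a rough, Flink-free sketch of the pattern I had in mind. SerializableSupplier, Client, and MapperSketch are made-up stand-ins; in real code the operator would extend RichMapFunction and do this in open()/close():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class InjectionSketch {

    // A Supplier that is also Serializable, so it can ship with the operator.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Stand-in for the cached HTTP client dependency.
    interface Client { String fetch(String key); }

    // Sketch of the operator: the factory is serialized with it, the actual
    // client is transient and rebuilt on each task instance in open().
    static class MapperSketch implements Serializable {
        private final SerializableSupplier<Client> clientFactory; // shipped with the operator
        private transient Client client;                          // rebuilt per task instance

        MapperSketch(SerializableSupplier<Client> clientFactory) {
            this.clientFactory = clientFactory;
        }

        void open()  { client = clientFactory.get(); } // analog of RichFunction#open
        void close() { client = null; }                // analog of RichFunction#close

        String map(String key) { return client.fetch(key); }
    }

    public static void main(String[] args) throws Exception {
        // In a test, hand in a mock factory instead of a real HTTP client.
        MapperSketch mapper = new MapperSketch(() -> key -> "mock:" + key);

        // Simulate Flink shipping the operator to a task: serialize, deserialize.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(mapper);
        }
        MapperSketch shipped;
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            shipped = (MapperSketch) ois.readObject();
        }

        shipped.open();
        System.out.println(shipped.map("a")); // prints mock:a
        shipped.close();
    }
}
```

The part I'm unsure about is how to choose which factory gets constructed from the Configuration, which is what the failing tests below are poking at.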
Here is the example code where all of those tests fail:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

    @Test
    public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();
        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();
        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        source.getExecutionConfig().setGlobalJobParameters(configuration);
        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();
        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        mapped.getExecutionConfig().setGlobalJobParameters(configuration);
        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();
        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {

        private int factor = -1;

        @Override
        public Integer map(final Integer value) throws Exception {
            return value * factor;
        }

        @Override
        public void open(final Configuration parameters) throws Exception {
            factor = parameters.getInteger("key", 0);
        }
    }
}
```

1. Any suggestions on how I should think about the dependency injection?
2. Are there ways to customize the Configuration that is passed into Rich functions?
3. Is this an issue with LocalStreamEnvironment, or am I doing something completely wrong?