Hi,

The Configuration instance passed into the open method is actually a leftover
artifact of the DataSet API that remains only for API backwards compatibility
reasons.
There’s no way to modify what configuration is retrieved there (in the
DataStream API it is always a new, empty Configuration).

Normally, to inject dependencies into your operators, you would simply supply
them through the constructor of the operator and store them as class fields
for later use in the operator’s work methods.
Make sure that they are serializable, as the operator will need to be 
serialized when deploying the job. I’m assuming that this should be possible 
for you anyway, since you were trying to write that information into the 
Configuration.
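
A minimal, Flink-free sketch of that constructor-injection approach, assuming
the dependency is something like the caching HTTP client from the question.
All names here (HttpLookup, LookupFactory, EnrichingMapper, FakeLookup,
FakeLookupFactory, Deployment) are hypothetical. Because a real HTTP client is
often not serializable, the sketch injects a serializable factory and builds the
client in open(), keeping it in a transient field. Deployment.roundTrip only
imitates the Java serialization that happens when the job is deployed; it is
not Flink's actual deployment code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical dependency, e.g. a caching HTTP client. It does NOT need to be Serializable.
interface HttpLookup {
    int lookup(int key);
    void close();
}

// Hypothetical serializable factory: injected via the constructor, shipped with the function.
interface LookupFactory extends Serializable {
    HttpLookup create();
}

// Stands in for a RichMapFunction<Integer, Integer>. Flink functions are Serializable,
// so every non-transient field must be Serializable as well.
class EnrichingMapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private final LookupFactory factory; // injected via the constructor, serialized with the function
    private transient HttpLookup client; // built in open(), skipped by serialization

    EnrichingMapper(LookupFactory factory) {
        this.factory = factory;
    }

    void open() {
        client = factory.create();
    }

    Integer map(Integer value) {
        return client.lookup(value);
    }

    void close() {
        if (client != null) {
            client.close();
        }
    }
}

// Test double: answers from memory instead of making HTTP calls.
class FakeLookup implements HttpLookup {
    private final int factor;

    FakeLookup(int factor) {
        this.factor = factor;
    }

    @Override
    public int lookup(int key) {
        return key * factor;
    }

    @Override
    public void close() {
        // nothing to release in the fake
    }
}

class FakeLookupFactory implements LookupFactory {
    private static final long serialVersionUID = 1L;
    private final int factor;

    FakeLookupFactory(int factor) {
        this.factor = factor;
    }

    @Override
    public HttpLookup create() {
        return new FakeLookup(factor);
    }
}

// Imitates shipping a function to a task: serialize it, then deserialize a fresh copy.
final class Deployment {
    private Deployment() {
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not serialize/deserialize the function", e);
        }
    }
}
```

In an actual job, the same shape would be a class extending
RichMapFunction<Integer, Integer> that overrides open(Configuration) and close()
(ignoring the Configuration argument). A test would pass a FakeLookupFactory to
the constructor, while production code would pass a factory for the real client.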

Hope this helps!

Cheers,
Gordon

On 20 September 2017 at 11:25:41 PM, Michael Kobit (mko...@gmail.com) wrote:

I'm new to Flink and in the process of trying to write a few operators and
tests for them. One of the issues I've run into is "how do I properly set up
the dependencies for an operator?". I've discovered the serialization
constraints and learned some about the execution model as I've progressed
through it, but I'm still struggling to find an analog for dependency
injection in Flink.

I was experimenting with different ways to supply configuration for the *Rich*
functions so that they can set themselves up and tear themselves down with
their dependencies on open/close. I wanted to "inject" a dependency, say an
HTTP client that caches, and then mock that dependency in a local test instead
of actually making HTTP calls. It seemed like it could be done by setting and
getting the correct implementation types in the config using some custom
injector type (analogous to Spring or Guice dependency injection). I know I
have to deal with serialization of the operators, which is why I was thinking
I could do this in open/close and have the magical injector be serializable
(and possibly be part of the config). This may or may not be a bad idea
already, but bear with me (and any feedback is very appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't 
able to actually pass in configuration options to the local stream execution.

I tried it these ways:
1. Create with a config: StreamExecutionEnvironment.createLocalEnvironment(1,
   configuration);
2. Configure the created LocalStreamEnvironment with
   env.getConfig().setGlobalJobParameters(configuration)
3. Configure the DataStreamSource<Integer> with
   source.getExecutionConfig().setGlobalJobParameters(configuration)
4. Configure the SingleOutputStreamOperator with
   mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All 4 of those failed, so I feel like I am doing something wrong here and
wanted to reach out.

Here is the example code, in which all of those tests fail:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

    @Test
    public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        source.getExecutionConfig().setGlobalJobParameters(configuration);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        mapped.getExecutionConfig().setGlobalJobParameters(configuration);

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {

        private int factor = -1;

        @Override
        public Integer map(final Integer value) throws Exception {
            return value * factor;
        }

        @Override
        public void open(final Configuration parameters) throws Exception {
            factor = parameters.getInteger("key", 0);
        }
    }
}

Any suggestions on how I should think about dependency injection?
Are there ways to customize the Configuration that is passed into Rich 
functions?
Is this an issue with LocalStreamEnvironment, or am I doing something 
completely wrong?
