This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac84e0c84dee39da7339722aad5ed2dcf26e2d27 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Jun 5 15:03:01 2019 +0200 [FLINK-12101] Deduplicate code by introducing ExecutionEnvironment#resolveFactory ExecutionEnvironment#resolveFactory selects between the thread local and the global factory. This method is used by the ExecutionEnvironment as well as the StreamExecutionEnvironment. This closes #8543. --- .../apache/flink/api/java/ExecutionEnvironment.java | 18 ++++++++---------- .../main/java/org/apache/flink/api/java/Utils.java | 21 +++++++++++++++++++++ .../api/environment/StreamExecutionEnvironment.java | 20 ++++++++++---------- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index f9fc1ef..be927c5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -102,10 +102,10 @@ public abstract class ExecutionEnvironment { protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class); /** The environment of the context (local by default, cluster if invoked through command line). */ - private static ExecutionEnvironmentFactory contextEnvironmentFactory; + private static ExecutionEnvironmentFactory contextEnvironmentFactory = null; /** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */ - private static ThreadLocal<ExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal = new ThreadLocal<>(); + private static final ThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>(); /** The default parallelism used by local environments. */ private static int defaultLocalDop = Runtime.getRuntime().availableProcessors(); @@ -1064,11 +1064,9 @@ public abstract class ExecutionEnvironment { * @return The execution environment of the context in which the program is executed. */ public static ExecutionEnvironment getExecutionEnvironment() { - - return contextEnvironmentFactoryThreadLocal.get() == null ? - (contextEnvironmentFactory == null ? - createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment()) : - contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment(); + return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) + .map(ExecutionEnvironmentFactory::createExecutionEnvironment) + .orElseGet(ExecutionEnvironment::createLocalEnvironment); } /** @@ -1259,7 +1257,7 @@ public abstract class ExecutionEnvironment { */ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = Preconditions.checkNotNull(ctx); - contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); } /** @@ -1269,7 +1267,7 @@ public abstract class ExecutionEnvironment { */ protected static void resetContextEnvironment() { contextEnvironmentFactory = null; - contextEnvironmentFactoryThreadLocal.remove(); + threadLocalContextEnvironmentFactory.remove(); } /** @@ -1281,6 +1279,6 @@ public abstract class ExecutionEnvironment { */ @Internal public static boolean areExplicitEnvironmentsAllowed() { - return contextEnvironmentFactory == null && contextEnvironmentFactoryThreadLocal.get() == null; + return contextEnvironmentFactory == null && threadLocalContextEnvironmentFactory.get() == null; } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index ed86f7d..c514b33 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -31,9 +31,12 @@ import org.apache.flink.configuration.Configuration; import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Optional; import java.util.Random; /** @@ -296,6 +299,24 @@ public final class Utils { return ret; } + // -------------------------------------------------------------------------------------------- + + /** + * Resolves the given factories. The thread local factory has preference over the static factory. + * If none is set, the method returns {@link Optional#empty()}. + * + * @param threadLocalFactory containing the thread local factory + * @param staticFactory containing the global factory + * @param <T> type of factory + * @return Optional containing the resolved factory if it exists, otherwise it's empty + */ + public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, @Nullable T staticFactory) { + final T localFactory = threadLocalFactory.get(); + final T factory = localFactory == null ? staticFactory : localFactory; + + return Optional.ofNullable(factory); + } + /** * Private constructor to prevent instantiation. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f93fd4c..ffa2d47 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -115,10 +116,10 @@ public abstract class StreamExecutionEnvironment { /** * The environment of the context (local by default, cluster if invoked through command line). */ - private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; + private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null; /** The ThreadLocal used to store {@link StreamExecutionEnvironmentFactory}. */ - private static ThreadLocal<StreamExecutionEnvironmentFactory> contextEnvironmentFactoryThreadLocal = new ThreadLocal<>(); + private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>(); /** The default parallelism used when creating a local environment. */ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); @@ -1571,13 +1572,12 @@ public abstract class StreamExecutionEnvironment { * executed. */ public static StreamExecutionEnvironment getExecutionEnvironment() { - if (contextEnvironmentFactoryThreadLocal.get() != null) { - return contextEnvironmentFactoryThreadLocal.get().createExecutionEnvironment(); - } - if (contextEnvironmentFactory != null) { - return contextEnvironmentFactory.createExecutionEnvironment(); - } + return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory) + .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment) + .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment); + } + private static StreamExecutionEnvironment createStreamExecutionEnvironment() { // because the streaming project depends on "flink-clients" (and not the other way around) // we currently need to intercept the data set environment and create a dependent stream env. // this should be fixed once we rework the project dependencies @@ -1772,12 +1772,12 @@ public abstract class StreamExecutionEnvironment { protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = ctx; - contextEnvironmentFactoryThreadLocal.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); } protected static void resetContextEnvironment() { contextEnvironmentFactory = null; - contextEnvironmentFactoryThreadLocal.remove(); + threadLocalContextEnvironmentFactory.remove(); } /**