This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b1fd1fcc80657215ad325a9191ae5e3e77fffda Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 18 14:20:46 2019 +0100 [hotfix] Make the DefaultExecutorServiceLoader a singleton --- .../flink/core/execution/DefaultExecutorServiceLoader.java | 12 +++++++++--- .../java/org/apache/flink/api/java/ExecutionEnvironment.java | 2 +- .../api/environment/StreamExecutionEnvironment.java | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java index 241feab..8bde967 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java @@ -45,6 +45,8 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class); + public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader(); + @Override public ExecutorFactory getExecutorFactory(final Configuration configuration) { checkNotNull(configuration); @@ -67,14 +69,18 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader { } if (compatibleFactories.size() > 1) { - final List<String> configStr = + final String configStr = configuration.toMap().entrySet().stream() .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.toList()); + .collect(Collectors.joining("\n")); - throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + "."); + throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + "."); } return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0); } + + private DefaultExecutorServiceLoader() { + // make sure nobody instantiates us explicitly. + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 5b07843..26632e6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -140,7 +140,7 @@ public class ExecutionEnvironment { } protected ExecutionEnvironment(final Configuration executorConfiguration) { - this(new DefaultExecutorServiceLoader(), executorConfiguration); + this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration); } protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 3870b52..ba702ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -163,7 +163,7 @@ public class StreamExecutionEnvironment { } public StreamExecutionEnvironment(final Configuration executorConfiguration) { - this(new DefaultExecutorServiceLoader(), executorConfiguration); + this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration); } public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {