This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b946ecc6683 [FLINK-33084][runtime] Migrate globalJobParameter to configuration. b946ecc6683 is described below commit b946ecc668342d48c2c0193ad4eff1897c75b68f Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Wed Sep 13 18:51:27 2023 +0800 [FLINK-33084][runtime] Migrate globalJobParameter to configuration. This closes #23409. --- .../org/apache/flink/api/common/ExecutionConfig.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index f3fae490cf7..22aee331513 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -154,8 +154,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut // ------------------------------- User code values -------------------------------------------- - private GlobalJobParameters globalJobParameters = new GlobalJobParameters(); - // Serializers and types registered with Kryo and the PojoSerializer // we store them in linked maps/sets to ensure they are registered in order in all kryo // instances. 
@@ -762,7 +760,10 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } public GlobalJobParameters getGlobalJobParameters() { - return globalJobParameters; + return configuration + .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS) + .map(MapBasedJobParameters::new) + .orElse(new MapBasedJobParameters(Collections.emptyMap())); } /** @@ -772,7 +773,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) { Preconditions.checkNotNull(globalJobParameters, "globalJobParameters shouldn't be null"); - this.globalJobParameters = globalJobParameters; + setGlobalJobParameters(globalJobParameters.toMap()); + } + + private void setGlobalJobParameters(Map<String, String> parameters) { + configuration.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, parameters); } // -------------------------------------------------------------------------------------------- @@ -985,7 +990,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut || (null != restartStrategyConfiguration && restartStrategyConfiguration.equals( other.restartStrategyConfiguration))) - && Objects.equals(globalJobParameters, other.globalJobParameters) && registeredTypesWithKryoSerializerClasses.equals( other.registeredTypesWithKryoSerializerClasses) && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) @@ -1002,7 +1006,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut return Objects.hash( configuration, restartStrategyConfiguration, - globalJobParameters, registeredTypesWithKryoSerializerClasses, defaultKryoSerializerClasses, registeredKryoTypes, @@ -1018,8 +1021,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut + executionRetryDelay + ", restartStrategyConfiguration=" + restartStrategyConfiguration - + ", globalJobParameters=" - + globalJobParameters + ", 
registeredTypesWithKryoSerializers=" + registeredTypesWithKryoSerializers + ", registeredTypesWithKryoSerializerClasses=" @@ -1150,7 +1151,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo); configuration .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS) - .<GlobalJobParameters>map(MapBasedJobParameters::new) .ifPresent(this::setGlobalJobParameters); configuration @@ -1202,7 +1202,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut /** * @return A copy of internal {@link #configuration}. Note it is missing all options that are * stored as plain java fields in {@link ExecutionConfig}, for example {@link - * #registeredKryoTypes} or {@link #globalJobParameters}. + * #registeredKryoTypes}. */ @Internal public Configuration toConfiguration() {