This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b946ecc6683 [FLINK-33084][runtime] Migrate globalJobParameter to configuration.
b946ecc6683 is described below

commit b946ecc668342d48c2c0193ad4eff1897c75b68f
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Wed Sep 13 18:51:27 2023 +0800

    [FLINK-33084][runtime] Migrate globalJobParameter to configuration.
    
    This closes #23409.
---
 .../org/apache/flink/api/common/ExecutionConfig.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index f3fae490cf7..22aee331513 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -154,8 +154,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
 
     // ------------------------------- User code values 
--------------------------------------------
 
-    private GlobalJobParameters globalJobParameters = new 
GlobalJobParameters();
-
     // Serializers and types registered with Kryo and the PojoSerializer
     // we store them in linked maps/sets to ensure they are registered in 
order in all kryo
     // instances.
@@ -762,7 +760,10 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
     }
 
     public GlobalJobParameters getGlobalJobParameters() {
-        return globalJobParameters;
+        return configuration
+                .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
+                .map(MapBasedJobParameters::new)
+                .orElse(new MapBasedJobParameters(Collections.emptyMap()));
     }
 
     /**
@@ -772,7 +773,11 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
      */
     public void setGlobalJobParameters(GlobalJobParameters 
globalJobParameters) {
         Preconditions.checkNotNull(globalJobParameters, "globalJobParameters 
shouldn't be null");
-        this.globalJobParameters = globalJobParameters;
+        setGlobalJobParameters(globalJobParameters.toMap());
+    }
+
+    private void setGlobalJobParameters(Map<String, String> parameters) {
+        configuration.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, parameters);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -985,7 +990,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                             || (null != restartStrategyConfiguration
                                     && restartStrategyConfiguration.equals(
                                             
other.restartStrategyConfiguration)))
-                    && Objects.equals(globalJobParameters, 
other.globalJobParameters)
                     && registeredTypesWithKryoSerializerClasses.equals(
                             other.registeredTypesWithKryoSerializerClasses)
                     && 
defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
@@ -1002,7 +1006,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         return Objects.hash(
                 configuration,
                 restartStrategyConfiguration,
-                globalJobParameters,
                 registeredTypesWithKryoSerializerClasses,
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
@@ -1018,8 +1021,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                 + executionRetryDelay
                 + ", restartStrategyConfiguration="
                 + restartStrategyConfiguration
-                + ", globalJobParameters="
-                + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
                 + registeredTypesWithKryoSerializers
                 + ", registeredTypesWithKryoSerializerClasses="
@@ -1150,7 +1151,6 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         
configuration.getOptional(PipelineOptions.FORCE_KRYO).ifPresent(this::setForceKryo);
         configuration
                 .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
-                .<GlobalJobParameters>map(MapBasedJobParameters::new)
                 .ifPresent(this::setGlobalJobParameters);
 
         configuration
@@ -1202,7 +1202,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
     /**
      * @return A copy of internal {@link #configuration}. Note it is missing 
all options that are
      *     stored as plain java fields in {@link ExecutionConfig}, for example 
{@link
-     *     #registeredKryoTypes} or {@link #globalJobParameters}.
+     *     #registeredKryoTypes}.
      */
     @Internal
     public Configuration toConfiguration() {

Reply via email to