Repository: flink
Updated Branches:
  refs/heads/release-1.3 670b23e97 -> 471263cfe


[FLINK-6461] Deprecate web config defaults in ConfigConstants

This closes #3831.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9708550a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9708550a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9708550a

Branch: refs/heads/release-1.3
Commit: 9708550a40127f833728bcc85847035c3b5fbad8
Parents: 60deaef
Author: zentol <ches...@apache.org>
Authored: Fri May 5 12:39:55 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Sun May 14 14:06:28 2017 +0200

----------------------------------------------------------------------
 .../client/program/StandaloneClusterClient.java |  5 +-
 .../flink/configuration/ConfigConstants.java    | 65 +++++++++++++++++---
 .../flink/configuration/JobManagerOptions.java  |  7 +++
 .../flink/api/java/ExecutionEnvironment.java    |  4 --
 .../runtime/webmonitor/WebMonitorConfig.java    | 36 +----------
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 32 +++-------
 .../webmonitor/WebRuntimeMonitorITCase.java     | 13 ++--
 .../executiongraph/ExecutionGraphBuilder.java   |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |  6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../environment/StreamExecutionEnvironment.java |  4 --
 .../apache/flink/test/util/TestBaseUtils.java   |  5 +-
 .../flink/test/web/WebFrontendITCase.java       |  5 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  3 +-
 14 files changed, 98 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index fd179c0..7517504 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -18,8 +18,8 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,8 +53,7 @@ public class StandaloneClusterClient extends ClusterClient {
        @Override
        public String getWebInterfaceURL() {
                String host = this.getJobManagerAddress().getHostString();
-               int port = 
getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-                       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+               int port = 
getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
                return "http://"; +  host + ":" + port;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 92e6b5d..c3704be 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1338,36 +1338,81 @@ public final class ConfigConstants {
                key("jobmanager.web.address")
                        .noDefaultValue();
 
-       /** The config key for the port of the JobManager web frontend.
-        * Setting this value to {@code -1} disables the web frontend. */
+       /**
+        * The config key for the port of the JobManager web frontend.
+        * Setting this value to {@code -1} disables the web frontend.
+        *
+        * @deprecated use {@link JobManagerOptions#WEB_PORT} instead
+        */
+       @Deprecated
        public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
 
-       /** Default value to override SSL support for the JobManager web UI */
+       /**
+        * Default value to override SSL support for the JobManager web UI
+        *
+        * @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead
+        */
+       @Deprecated
        public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
 
-       /** The default number of archived jobs for the jobmanager */
+       /**
+        * The default number of archived jobs for the jobmanager
+        *
+        * @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead
+        */
+       @Deprecated
        public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
 
-       /** By default, submitting jobs from the web-frontend is allowed. */
+       /**
+        * By default, submitting jobs from the web-frontend is allowed.
+        *
+        * @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead
+        */
+       @Deprecated
        public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = 
true;
 
        /** @deprecated Config key has been deprecated. Therefore, no default 
value required. */
        @Deprecated
        public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE 
= false;
 
-       /** Default number of checkpoints to remember for recent history. */
+       /**
+        * Default number of checkpoints to remember for recent history.
+        *
+        * @deprecated use {@link 
JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead
+        */
+       @Deprecated
        public static final int 
DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
 
-       /** Time after which cached stats are cleaned up. */
+       /**
+        * Time after which cached stats are cleaned up.
+        *
+        * @@deprecated use {@link 
JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
+        */
+       @Deprecated
        public static final int 
DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
 
-       /** Time after which available stats are deprecated and need to be 
refreshed (by resampling). */
+       /**
+        * Time after which available stats are deprecated and need to be 
refreshed (by resampling).
+        *
+        * @deprecated use {@link 
JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead
+        */
+       @Deprecated
        public static final int 
DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
 
-       /** Number of samples to take to determine back pressure. */
+       /**
+        * Number of samples to take to determine back pressure.
+        *
+        * @deprecated use {@link 
JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead
+        */
+       @Deprecated
        public static final int 
DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
 
-       /** Delay between samples to determine back pressure. */
+       /**
+        * Delay between samples to determine back pressure.
+        *
+        * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} 
instead
+        */
+       @Deprecated
        public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 
50;
 
        // ------------------------------ Akka Values 
------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index b924e8e..76b6bed 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -116,6 +116,13 @@ public class JobManagerOptions {
                        .defaultValue("*");
 
        /**
+        * The config parameter defining the refresh interval for the 
web-frontend.
+        */
+       public static final ConfigOption<Long> WEB_REFRESH_INTERVAL =
+               key("jobmanager.web.refresh-interval")
+                       .defaultValue(3000L);
+       
+       /**
         * Config parameter to override SSL support for the JobManager Web UI
         */
        public static final ConfigOption<Boolean> WEB_SSL_ENABLED =

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 709ef09..3d8a384 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1235,10 +1235,6 @@ public abstract class ExecutionEnvironment {
        public static ExecutionEnvironment 
createLocalEnvironmentWithWebUI(Configuration conf) {
                checkNotNull(conf, "conf");
 
-               if 
(!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-                       int port = 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-                       
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
-               }
                conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
                LocalEnvironment localEnv = new LocalEnvironment(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index dba2145..77537a2 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -18,39 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 
 public class WebMonitorConfig {
 
-       // 
------------------------------------------------------------------------
-       //  Config Keys
-       // 
------------------------------------------------------------------------
-
-       /** The port for the runtime monitor web-frontend server. */
-       public static final String JOB_MANAGER_WEB_PORT_KEY = 
ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
-
-       /** The initial refresh interval for the web dashboard */
-       public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = 
"jobmanager.web.refresh-interval";
-
-
-       // 
------------------------------------------------------------------------
-       //  Default values
-       // 
------------------------------------------------------------------------
-
-       /** Default port for the web dashboard (= 8081) */
-       public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-
-       /** Default refresh interval for the web dashboard (= 3000 msecs) */
-       public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 
3000;
-
-
-       // 
------------------------------------------------------------------------
-       //  Config
-       // 
------------------------------------------------------------------------
-
        /** The configuration queried by this config object */
        private final Configuration config;
 
@@ -67,17 +39,15 @@ public class WebMonitorConfig {
        }
 
        public int getWebFrontendPort() {
-               return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, 
DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+               return config.getInteger(JobManagerOptions.WEB_PORT);
        }
 
        public long getRefreshInterval() {
-               return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, 
DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
+               return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL);
        }
        
        public boolean isProgramSubmitEnabled() {
-               return config.getBoolean(
-                       ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
-                       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
+               return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE);
        }
 
        public String getAllowOrigin() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f83fa27..03b53ad 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.router.Router;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -191,21 +192,13 @@ public class WebRuntimeMonitor implements WebMonitor {
                stackTraceSamples = new 
StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
 
                // Back pressure stats tracker config
-               int cleanUpInterval = config.getInteger(
-                               
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
+               int cleanUpInterval = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL);
 
-               int refreshInterval = config.getInteger(
-                               
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
+               int refreshInterval = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL);
 
-               int numSamples = config.getInteger(
-                               
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
+               int numSamples = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES);
 
-               int delay = config.getInteger(
-                               
ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
+               int delay = 
config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY);
 
                Time delayBetweenSamples = Time.milliseconds(delay);
 
@@ -219,10 +212,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
 
                // Config to enable https access to the web-ui
-               boolean enableSSL = config.getBoolean(
-                               ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) &&
-                       SSLUtils.getSSLEnabled(config);
+               boolean enableSSL = 
config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) &&     
SSLUtils.getSSLEnabled(config);
 
                if (enableSSL) {
                        LOG.info("Enabling ssl for the web frontend");
@@ -310,9 +300,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                // DELETE is the preferred way of stopping a job (Rest-conform)
                DELETE(router, new JobStoppingHandler());
 
-               int maxCachedEntries = config.getInteger(
-                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+               int maxCachedEntries = 
config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
                CheckpointStatsCache cache = new 
CheckpointStatsCache(maxCachedEntries);
 
                // Register the checkpoint stats handlers
@@ -525,14 +513,14 @@ public class WebRuntimeMonitor implements WebMonitor {
        }
 
        private String getBaseDirStr(Configuration configuration) {
-               return 
configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, 
System.getProperty("java.io.tmpdir"));
+               return configuration.getString(JobManagerOptions.WEB_TMP_DIR);
        }
 
        private File getUploadDir(Configuration configuration) {
-               File baseDir = new 
File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY,
+               File baseDir = new 
File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR,
                        getBaseDirStr(configuration)));
 
-               boolean uploadDirSpecified = 
configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY);
+               boolean uploadDirSpecified = 
configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR);
                return uploadDirSpecified ? baseDir : new File(baseDir, 
"flink-web-" + UUID.randomUUID());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5ccfe90..a51a234 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -137,8 +138,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        Path logFile = Files.createFile(new File(logDir, 
"jobmanager.log").toPath());
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
-                       
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
+                       config.setInteger(JobManagerOptions.WEB_PORT, 0);
+                       config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
 
                        highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                                config,
@@ -286,8 +287,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        Files.createFile(new File(logDir, 
"jobmanager.out").toPath());
 
                        final Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-                       
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
+                       config.setInteger(JobManagerOptions.WEB_PORT, 0);
+                       config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
                        
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeper.getConnectString());
 
@@ -463,8 +464,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
                // Web frontend on random port
                Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
+               config.setInteger(JobManagerOptions.WEB_PORT, 0);
+               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
 
                WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
                        config,

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index de0d9d0..0e76cfb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -205,9 +206,7 @@ public class ExecutionGraphBuilder {
                        }
 
                        // Maximum number of remembered checkpoints
-                       int historySize = jobManagerConfig.getInteger(
-                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+                       int historySize = 
jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
 
                        CheckpointStatsTracker checkpointStatsTracker = new 
CheckpointStatsTracker(
                                        historySize,

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2baadb5..dd9527e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import java.net.URI;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -80,14 +80,14 @@ public final class WebMonitorUtils {
                        
                        if (logFilePath == null) {
                                LOG.warn("Log file environment variable '{}' is 
not set.", logEnv);
-                               logFilePath = 
config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+                               logFilePath = 
config.getString(JobManagerOptions.WEB_LOG_PATH);
                        }
                        
                        // not configured, cannot serve log files
                        if (logFilePath == null || logFilePath.length() < 4) {
                                LOG.warn("JobManager log files are unavailable 
in the web dashboard. " +
                                        "Log file location not found in 
environment variable '{}' or configuration key '{}'.",
-                                       logEnv, 
ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+                                       logEnv, 
JobManagerOptions.WEB_LOG_PATH.key());
                                return new LogFileLocation(null, null);
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5092643..57a6415 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2532,8 +2532,7 @@ object JobManager {
 
     val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
-    val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
-      ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
+    val archiveCount = 
configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT)
 
     val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index aad3a4b..97117d2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1661,10 +1661,6 @@ public abstract class StreamExecutionEnvironment {
        public static StreamExecutionEnvironment 
createLocalEnvironmentWithWebUI(Configuration conf) {
                checkNotNull(conf, "conf");
 
-               if 
(!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-                       int port = 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-                       
conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
-               }
                conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
                LocalStreamEnvironment localEnv = new 
LocalStreamEnvironment(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index f96ab3d..437dd5f 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
@@ -148,8 +149,8 @@ public class TestBaseUtils extends TestLogger {
                config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, 
DEFAULT_AKKA_ASK_TIMEOUT + "s");
                config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
DEFAULT_AKKA_STARTUP_TIMEOUT);
 
-               config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
8081);
-               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.toString());
+               config.setInteger(JobManagerOptions.WEB_PORT, 8081);
+               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
 
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 003eb0c..538ac98 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,12 +24,15 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
+import 
org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -84,7 +87,7 @@ public class WebFrontendITCase extends TestLogger {
                Files.createFile(logFile.toPath());
                Files.createFile(outFile.toPath());
                
-               config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.getAbsolutePath());
+               config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.getAbsolutePath());
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
                cluster = new LocalFlinkMiniCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/9708550a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index b62f957..64417f6 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -372,8 +372,7 @@ public class YarnApplicationMasterRunner {
                                LOG);
 
                        String protocol = "http://";;
-                       if 
(config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
-                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
+                       if 
(config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && 
SSLUtils.getSSLEnabled(config)) {
                                protocol = "https://";;
                        }
                        final String webMonitorURL = webMonitor == null ? null :

Reply via email to