[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration 
options

This closes #4075.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63d704e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63d704e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63d704e

Branch: refs/heads/master
Commit: d63d704efb2bc28dd7a33ee9027f4d447acbd209
Parents: 6539180
Author: zjureel <zjur...@gmail.com>
Authored: Thu Jun 8 11:38:56 2017 +0800
Committer: zentol <ches...@apache.org>
Committed: Thu Aug 10 11:36:30 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  89 ++++++++++++++--
 .../configuration/ResourceManagerOptions.java   |  39 +++++++
 .../flink/mesos/configuration/MesosOptions.java | 106 +++++++++++++++++++
 .../MesosApplicationMasterRunner.java           |  31 +++---
 .../MesosFlinkResourceManager.java              |  10 +-
 .../flink/mesos/util/MesosArtifactServer.java   |   5 +-
 .../MesosFlinkResourceManagerTest.java          |   6 +-
 .../ContaineredTaskManagerParameters.java       |  14 ++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   8 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  19 ++--
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  10 +-
 .../main/java/org/apache/flink/yarn/Utils.java  |  17 +--
 .../flink/yarn/YarnApplicationMasterRunner.java |  17 +--
 .../flink/yarn/YarnFlinkResourceManager.java    |  10 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   3 +-
 .../yarn/configuration/YarnConfigOptions.java   |  65 ++++++++++++
 .../yarn/entrypoint/YarnEntrypointUtils.java    |  13 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |   5 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |   3 +-
 22 files changed, 375 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f817344..4c6c62a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -135,7 +135,9 @@ public final class ConfigConstants {
        /**
         * The config parameter defining the network port to connect to
         * for communication with the resource manager.
+        * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
         */
+       @Deprecated
        public static final String RESOURCE_MANAGER_IPC_PORT_KEY = 
"resourcemanager.rpc.port";
 
        /**
@@ -349,12 +351,16 @@ public final class ConfigConstants {
        /**
         * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
         * for other JVM memory usage.
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
         */
+       @Deprecated
        public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = 
"containerized.heap-cutoff-ratio";
 
        /**
         * Minimum amount of heap memory to remove in containers, as a safety 
margin.
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
         */
+       @Deprecated
        public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = 
"containerized.heap-cutoff-min";
 
        /**
@@ -362,13 +368,17 @@ public final class ConfigConstants {
         * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
         * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
         * in the flink-conf.yaml.
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
         */
+       @Deprecated
        public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
"containerized.master.env.";
 
        /**
         * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this 
configuration prefix allows
         * setting custom environment variables for the workers (TaskManagers)
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
         */
+       @Deprecated
        public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = 
"containerized.taskmanager.env.";
 
        
@@ -376,7 +386,9 @@ public final class ConfigConstants {
 
        /**
         * The vcores exposed by YARN.
+        * @deprecated in favor of {@code YarnConfigOptions#VCORES}.
         */
+       @Deprecated
        public static final String YARN_VCORES = "yarn.containers.vcores";
 
        /**
@@ -406,7 +418,9 @@ public final class ConfigConstants {
         * the YARN session / job on YARN.
         *
         * By default, we take the number of of initially requested containers.
+        * @deprecated in favor of {@code 
YarnConfigOptions#MAX_FAILED_CONTAINERS}.
         */
+       @Deprecated
        public static final String YARN_MAX_FAILED_CONTAINERS = 
"yarn.maximum-failed-containers";
 
        /**
@@ -414,14 +428,18 @@ public final class ConfigConstants {
         * availability mode. This value is usually limited by YARN.
         *
         * By default, it's 1 in the standalone case and 2 in the high 
availability case.
+        * @deprecated in favor of {@code 
YarnConfigOptions#APPLICATION_ATTEMPTS}.
         */
+       @Deprecated
        public static final String YARN_APPLICATION_ATTEMPTS = 
"yarn.application-attempts";
 
        /**
         * The heartbeat interval between the Application Master and the YARN 
Resource Manager.
         *
         * The default value is 5 (seconds).
+        * @deprecated in favor of {@code 
YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
         */
+       @Deprecated
        public static final String YARN_HEARTBEAT_DELAY_SECONDS = 
"yarn.heartbeat-delay";
 
        /**
@@ -429,8 +447,10 @@ public final class ConfigConstants {
         * processing slots is written into a properties file, so that the 
Flink client is able
         * to pick those details up.
         * This configuration parameter allows changing the default location of 
that file (for example
-        * for environments sharing a Flink installation between users)
+        * for environments sharing a Flink installation between users).
+        * @deprecated in favor of {@code 
YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
         */
+       @Deprecated
        public static final String YARN_PROPERTIES_FILE_LOCATION = 
"yarn.properties-file.location";
 
        /**
@@ -474,12 +494,16 @@ public final class ConfigConstants {
         * or a list of ranges and or points: "50100-50200,50300-50400,51234"
         *
         * Setting the port to 0 will let the OS choose an available port.
+        * @deprecated in favor of {@code 
YarnConfigOptions#APPLICATION_MASTER_PORT}.
         */
+       @Deprecated
        public static final String YARN_APPLICATION_MASTER_PORT = 
"yarn.application-master.port";
 
        /**
         * A comma-separated list of strings to use as YARN application tags.
+        * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}.
         */
+       @Deprecated
        public static final String YARN_APPLICATION_TAGS = "yarn.tags";
 
 
@@ -487,7 +511,9 @@ public final class ConfigConstants {
 
        /**
         * The initial number of Mesos tasks to allocate.
+        * @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}.
         */
+       @Deprecated
        public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
 
        /**
@@ -495,7 +521,9 @@ public final class ConfigConstants {
         * the Mesos session / job on Mesos.
         *
         * By default, we take the number of of initially requested tasks.
+        * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
         */
+       @Deprecated
        public static final String MESOS_MAX_FAILED_TASKS = 
"mesos.maximum-failed-tasks";
 
        /**
@@ -510,36 +538,53 @@ public final class ConfigConstants {
         *     file:///path/to/file (where file contains one of the above)
         * }
         * </pre>
-        *
+        * @deprecated in favor of {@code MesosOptions#MASTER_URL}.
         */
+       @Deprecated
        public static final String MESOS_MASTER_URL = "mesos.master";
 
        /**
         * The failover timeout for the Mesos scheduler, after which running 
tasks are automatically shut down.
         *
         * The default value is 600 (seconds).
+        * @deprecated in favor of {@code 
MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
         */
+       @Deprecated
        public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = 
"mesos.failover-timeout";
 
        /**
         * The config parameter defining the Mesos artifact server port to use.
         * Setting the port to 0 will let the OS choose an available port.
+        * @deprecated in favor of {@code 
MesosOptions#ARTIFACT_SERVER_PORT}.
         */
+       @Deprecated
        public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = 
"mesos.resourcemanager.artifactserver.port";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */
+       @Deprecated
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = 
"mesos.resourcemanager.framework.name";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+       @Deprecated
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = 
"mesos.resourcemanager.framework.role";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */
+       @Deprecated
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = 
"mesos.resourcemanager.framework.principal";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */
+       @Deprecated
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = 
"mesos.resourcemanager.framework.secret";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+       @Deprecated
        public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = 
"mesos.resourcemanager.framework.user";
 
        /**
         * Config parameter to override SSL support for the Artifact Server
+        * @deprecated in favor of {@code 
MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
         */
+       @Deprecated
        public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = 
"mesos.resourcemanager.artifactserver.ssl.enabled";
 
        // ------------------------ Hadoop Configuration 
------------------------
@@ -1218,7 +1263,9 @@ public final class ConfigConstants {
 
        /**
         * The default network port of the resource manager.
+        * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
         */
+       @Deprecated
        public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
        /**
@@ -1378,13 +1425,17 @@ public final class ConfigConstants {
        /**
         * Minimum amount of memory to subtract from the process memory to get 
the TaskManager
         * heap size. We came up with these values experimentally.
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
         */
+       @Deprecated
        public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;
 
        /**
         * Relative amount of memory to subtract from Java process memory to 
get the TaskManager
-        * heap size
+        * heap size.
+        * @deprecated Use {@link 
ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
         */
+       @Deprecated
        public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 
        /**
@@ -1395,31 +1446,49 @@ public final class ConfigConstants {
 
        /**
         * Default port for the application master is 0, which means
-        * the operating system assigns an ephemeral port
+        * the operating system assigns an ephemeral port.
+        * @deprecated in favor of {@code 
YarnConfigOptions#APPLICATION_MASTER_PORT}.
         */
+       @Deprecated
        public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
        // ------ Mesos-Specific Configuration ------
        // For more configuration entries please see {@code 
MesosTaskManagerParameters}.
 
-       /** The default failover timeout provided to Mesos (10 mins) */
+       /**
+        * The default failover timeout provided to Mesos (10 mins).
+        * @deprecated in favor of {@code 
MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
+        */
+       @Deprecated
        public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
 
        /**
         * The default network port to listen on for the Mesos artifact server.
+        * @deprecated in favor of {@code 
MesosOptions#ARTIFACT_SERVER_PORT}.
         */
+       @Deprecated
        public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
 
        /**
         * The default Mesos framework name for the ResourceManager to use.
+        * @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}.
         */
+       @Deprecated
        public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME 
= "Flink";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+       @Deprecated
        public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE 
= "*";
 
+       /** @deprecated in favor of {@code 
MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+       @Deprecated
        public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER 
= "";
 
-       /** Default value to override SSL support for the Artifact Server */
+       /**
+        * Default value to override SSL support for the Artifact Server.
+        * @deprecated in favor of {@code 
MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
+        */
+       @Deprecated
        public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = 
true;
 
        // ------------------------ File System Behavior 
------------------------
@@ -1659,8 +1728,16 @@ public final class ConfigConstants {
 
        public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
 
+       /**
+        * @deprecated Use {@link 
ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+        */
+       @Deprecated
        public static final String LOCAL_NUMBER_RESOURCE_MANAGER = 
"local.number-resourcemanager";
 
+       /**
+        * @deprecated Use {@link 
ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+        */
+       @Deprecated
        public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;
 
        public static final String LOCAL_START_WEBSERVER = 
"local.start-webserver";

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 6a09f19..e2d96bb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -33,6 +33,45 @@ public class ResourceManagerOptions {
                .key("resourcemanager.job.timeout")
                .defaultValue("5 minutes");
 
+       public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER 
= ConfigOptions
+               .key("local.number-resourcemanager")
+               .defaultValue(1);
+
+       public static final ConfigOption<Integer> IPC_PORT = ConfigOptions
+               .key("resourcemanager.rpc.port")
+               .defaultValue(0);
+
+       /**
+        * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
+        * for other JVM memory usage.
+        */
+       public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO 
= ConfigOptions
+               .key("containerized.heap-cutoff-ratio")
+               .defaultValue(0.25f)
+               .withDeprecatedKeys("yarn.heap-cutoff-ratio");
+
+       /**
+        * Minimum amount of heap memory to remove in containers, as a safety 
margin.
+        */
+       public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN 
= ConfigOptions
+               .key("containerized.heap-cutoff-min")
+               .defaultValue(600)
+               .withDeprecatedKeys("yarn.heap-cutoff-min");
+
+       /**
+        * Prefix for passing custom environment variables to Flink's master 
process.
+        * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
+        * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+        * in the flink-conf.yaml.
+        */
+       public static final String CONTAINERIZED_MASTER_ENV_PREFIX = 
"containerized.master.env.";
+
+       /**
+        * Similar to {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this 
configuration prefix allows
+        * setting custom environment variables for the workers (TaskManagers).
+        */
+       public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = 
"containerized.taskmanager.env.";
+       
        // 
---------------------------------------------------------------------------------------------
 
        /** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
new file mode 100644
index 0000000..8616cad
--- /dev/null
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Mesos settings.
+ */
+public class MesosOptions {
+
+       /**
+        * The initial number of Mesos tasks to allocate.
+        */
+       public static final ConfigOption<Integer> INITIAL_TASKS =
+               key("mesos.initial-tasks")
+                       .defaultValue(0);
+
+       /**
+        * The maximum number of failed Mesos tasks before entirely stopping
+        * the Mesos session / job on Mesos.
+        *
+        * <p>By default, we take the number of initially requested tasks.
+        */
+       public static final ConfigOption<Integer> MAX_FAILED_TASKS =
+               key("mesos.maximum-failed-tasks")
+                       .defaultValue(-1);
+
+       /**
+        * The Mesos master URL.
+        *
+        * <p>The value should be in one of the following forms:
+        * <pre>
+        * {@code
+        *     host:port
+        *     zk://host1:port1,host2:port2,.../path
+        *     zk://username:password@host1:port1,host2:port2,.../path
+        *     file:///path/to/file (where file contains one of the above)
+        * }
+        * </pre>
+        */
+       public static final ConfigOption<String> MASTER_URL =
+               key("mesos.master")
+                       .noDefaultValue();
+
+       /**
+        * The failover timeout for the Mesos scheduler, after which running 
tasks are automatically shut down.
+        */
+       public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS =
+               key("mesos.failover-timeout")
+                       .defaultValue(600);
+
+       /**
+        * The config parameter defining the Mesos artifact server port to use.
+        * Setting the port to 0 will let the OS choose an available port.
+        */
+       public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT =
+               key("mesos.resourcemanager.artifactserver.port")
+                       .defaultValue(0);
+
+       public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME 
=
+               key("mesos.resourcemanager.framework.name")
+                       .defaultValue("Flink");
+
+       public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE 
=
+               key("mesos.resourcemanager.framework.role")
+                       .defaultValue("*");
+
+       public static final ConfigOption<String> 
RESOURCEMANAGER_FRAMEWORK_PRINCIPAL =
+               key("mesos.resourcemanager.framework.principal")
+                       .noDefaultValue();
+
+       public static final ConfigOption<String> 
RESOURCEMANAGER_FRAMEWORK_SECRET =
+               key("mesos.resourcemanager.framework.secret")
+                       .noDefaultValue();
+
+       public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER 
=
+               key("mesos.resourcemanager.framework.user")
+                       .defaultValue("");
+
+       /**
+        * Config parameter to override SSL support for the Artifact Server.
+        */
+       public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED =
+               key("mesos.resourcemanager.artifactserver.ssl.enabled")
+                       .defaultValue(true);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index d4e2f0d..260b7f3 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner {
 
                        // try to start the artifact server
                        LOG.debug("Starting Artifact Server");
-                       final int artifactServerPort = 
config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
-                               
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+                       final int artifactServerPort = 
config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
                        final String artifactServerPrefix = 
UUID.randomUUID().toString();
                        artifactServer = new 
MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, 
config);
 
@@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner {
                        .setHostname(hostname);
                Protos.Credential.Builder credential = null;
 
-               if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) 
{
-                       throw new 
IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be 
configured.");
+               if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+                       throw new 
IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be 
configured.");
                }
-               String masterUrl = 
flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+               String masterUrl = 
flinkConfig.getString(MesosOptions.MASTER_URL);
 
                Duration failoverTimeout = FiniteDuration.apply(
                        flinkConfig.getInteger(
-                               ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
-                               
ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
+                               MesosOptions.FAILOVER_TIMEOUT_SECONDS),
                        TimeUnit.SECONDS);
                frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
 
                frameworkInfo.setName(flinkConfig.getString(
-                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
-                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
 
                frameworkInfo.setRole(flinkConfig.getString(
-                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
-                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
 
                frameworkInfo.setUser(flinkConfig.getString(
-                       ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
-                       
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+                       MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
 
-               if 
(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL))
 {
+               if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
                        frameworkInfo.setPrincipal(flinkConfig.getString(
-                               
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+                               
MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
 
                        credential = Protos.Credential.newBuilder();
                        credential.setPrincipal(frameworkInfo.getPrincipal());
 
                        // some environments use a side-channel to communicate 
the secret to Mesos,
                        // and thus don't set the 'secret' configuration setting
-                       if 
(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET))
 {
+                       if 
(flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
                                credential.setSecret(flinkConfig.getString(
-                                       
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+                                       
MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index d6b5c9d..05d7e1f 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                                String msg = "Stopping Mesos session because 
the number of failed tasks ("
                                        + failedTasksSoFar + ") exceeded the 
maximum failed tasks ("
                                        + maxFailedTasks + "). This number is 
controlled by the '"
-                                       + 
ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+                                       + MesosOptions.MAX_FAILED_TASKS.key() + 
"' configuration setting. "
                                        + "By default its the number of 
requested tasks.";
 
                                LOG.error(msg);
@@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMe
                        Logger log) {
 
                final int numInitialTaskManagers = flinkConfig.getInteger(
-                       ConfigConstants.MESOS_INITIAL_TASKS, 0);
+                       MesosOptions.INITIAL_TASKS);
                if (numInitialTaskManagers >= 0) {
                        log.info("Mesos framework to allocate {} initial tasks",
                                numInitialTaskManagers);
                }
                else {
                        throw new IllegalConfigurationException("Invalid value 
for " +
-                               ConfigConstants.MESOS_INITIAL_TASKS + ", which 
must be at least zero.");
+                               MesosOptions.INITIAL_TASKS.key() + ", which 
must be at least zero.");
                }
 
                final int maxFailedTasks = flinkConfig.getInteger(
-                       ConfigConstants.MESOS_MAX_FAILED_TASKS, 
numInitialTaskManagers);
+                       MesosOptions.MAX_FAILED_TASKS.key(), 
numInitialTaskManagers);
                if (maxFailedTasks >= 0) {
                        log.info("Mesos framework tolerates {} failed tasks 
before giving up",
                                maxFailedTasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 2627d25..3a6f77a 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.util;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
                // Config to enable https access to the artifact server
                boolean enableSSL = config.getBoolean(
-                               
ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
-                               
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+                               MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
                                SSLUtils.getSSLEnabled(config);
 
                if (enableSSL) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index af3f7ef..8bfb4d1 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
                private static final long serialVersionUID = 
-952579203067648838L;
 
                {
-                       setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
-                       setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+                       setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
+                       setInteger(MesosOptions.INITIAL_TASKS, 0);
        }};
 
        @BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 9d679cf..7e9891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import java.util.HashMap;
@@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
                // (1) compute how much memory we subtract from the total 
memory, to get the Java memory
 
                final float memoryCutoffRatio = config.getFloat(
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+                       ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 
                final int minCutoff = config.getInteger(
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+                       ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
                if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
                        throw new IllegalArgumentException("The configuration 
value '"
-                               + 
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. 
Value given="
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be 
between 0 and 1. Value given="
                                + memoryCutoffRatio);
                }
 
                if (minCutoff >= containerMemoryMB) {
                        throw new IllegalArgumentException("The configuration 
value '"
-                               + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN 
+ "'='" + minCutoff
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
                                + "' is larger than the total container memory 
" + containerMemoryMB);
                }
 
@@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
                // (3) obtain the additional environment variables from the 
configuration
                final HashMap<String, String> envVars = new HashMap<>();
-               final String prefix = 
ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
+               final String prefix = 
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
                
                for (String key : config.keySet()) {
                        if (key.startsWith(prefix) && key.length() > 
prefix.length()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c5c87ac..6f13b9f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration, JobManagerOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -168,8 +168,7 @@ abstract class FlinkMiniCluster(
 
   def getNumberOfResourceManagers: Int = {
     originalConfiguration.getInteger(
-      ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
-      ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+      ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER
     )
   }
 
@@ -226,8 +225,8 @@ abstract class FlinkMiniCluster(
     if (useSingleActorSystem) {
       AkkaUtils.getAkkaConfig(originalConfiguration, None)
     } else {
-      val port = 
originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-                                                  
ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(
+        ResourceManagerOptions.IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27a8ee1..0ae00a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, 
TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -183,11 +183,11 @@ class LocalFlinkMiniCluster(
     val resourceManagerName = getResourceManagerName(index)
 
     val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      ResourceManagerOptions.IPC_PORT)
 
     if(resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 
resourceManagerPort + index)
+      config.setInteger(ResourceManagerOptions.IPC_PORT,
+        resourceManagerPort + index)
     }
 
     val resourceManagerProps = getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 82a656c..275bcc9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
@@ -62,8 +63,8 @@ public class UtilsTest {
        @Test
        public void testHeapCutoff() {
                Configuration conf = new Configuration();
-               conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
0.15);
-               conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 
384);
+               
conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
+               
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
 
                Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
                Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
@@ -71,14 +72,14 @@ public class UtilsTest {
                // test different configuration
                Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 
"1000");
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"0.1");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), 
"1000");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.1");
                Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
 
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"0.5");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.5");
                Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
 
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"1");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1");
                Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 
                // test also deprecated keys
@@ -93,21 +94,21 @@ public class UtilsTest {
        @Test(expected = IllegalArgumentException.class)
        public void illegalArgument() {
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"1.1");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1.1");
                Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
        }
 
        @Test(expected = IllegalArgumentException.class)
        public void illegalArgumentNegative() {
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"-0.01");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"-0.01");
                Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
        }
 
        @Test(expected = IllegalArgumentException.class)
        public void tooMuchCutoff() {
                Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 
"6000");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"6000");
                Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 2e88836..d85aa97 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                                "-nm", "customName",
                                "-Dfancy-configuration-value=veryFancy",
                                "-Dyarn.maximum-failed-containers=3",
-                               "-D" + ConfigConstants.YARN_VCORES + "=2"},
+                               "-D" + YarnConfigOptions.VCORES.key() + "=2"},
                        "Number of connected TaskManagers changed to 1. Slots 
available: 3",
                        RunTypes.YARN_SESSION);
 
@@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
                        Assert.assertEquals("veryFancy", 
parsedConfig.get("fancy-configuration-value"));
                        Assert.assertEquals("3", 
parsedConfig.get("yarn.maximum-failed-containers"));
-                       Assert.assertEquals("2", 
parsedConfig.get(ConfigConstants.YARN_VCORES));
+                       Assert.assertEquals("2", 
parsedConfig.get(YarnConfigOptions.VCORES.key()));
 
                        // -------------- FLINK-1902: check if jobmanager 
hostname/port are shown in web interface
                        // first, get the hostname/port

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index dfc8b6a..55dc47f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                // The number of cores can be configured in the config.
                // If not configured, it is set to the number of task slots
                int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, 
YarnConfiguration.DEFAULT_NM_VCORES);
-               int configuredVcores = 
flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, 
clusterSpecification.getSlotsPerTaskManager());
+               int configuredVcores = 
flinkConfiguration.getInteger(YarnConfigOptions.VCORES, 
clusterSpecification.getSlotsPerTaskManager());
                // don't configure more than the maximum configured number of 
vcores
                if (configuredVcores > numYarnVcores) {
                        throw new IllegalConfigurationException(
@@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                                                " but Yarn only has %d virtual 
cores available. Please note that the number" +
                                                " of virtual cores is set to 
the number of task slots by default unless configured" +
                                                " in the Flink config with 
'%s.'",
-                                       configuredVcores, numYarnVcores, 
ConfigConstants.YARN_VCORES));
+                                       configuredVcores, numYarnVcores, 
YarnConfigOptions.VCORES.key()));
                }
 
                // check if required Hadoop environment variables are set. If 
not, warn user
@@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                        // activate re-execution of failed applications
                        appContext.setMaxAppAttempts(
                                flinkConfiguration.getInteger(
-                                       
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+                                       
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                                        
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
 
                        activateHighAvailabilitySupport(appContext);
@@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                        // set number of application retries to 1 in the 
default case
                        appContext.setMaxAppAttempts(
                                flinkConfiguration.getInteger(
-                                       
ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+                                       
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                                        1));
                }
 
@@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                IllegalAccessException {
 
                final ApplicationSubmissionContextReflector reflector = 
ApplicationSubmissionContextReflector.getInstance();
-               final String tagsString = 
flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+               final String tagsString = 
flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
 
                final Set<String> applicationTags = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 662617f..98d27ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -82,24 +82,17 @@ public final class Utils {
         */
        public static int calculateHeapSize(int memory, 
org.apache.flink.configuration.Configuration conf) {
 
-               BootstrapTools.substituteDeprecatedConfigKey(conf,
-                       ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-               BootstrapTools.substituteDeprecatedConfigKey(conf,
-                       ConfigConstants.YARN_HEAP_CUTOFF_MIN, 
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
-               float memoryCutoffRatio = 
conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-               int minCutoff = 
conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+               float memoryCutoffRatio = 
conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+               int minCutoff = 
conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
                if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
                        throw new IllegalArgumentException("The configuration 
value '"
-                               + 
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
                                + "' must be between 0 and 1. Value given=" + 
memoryCutoffRatio);
                }
                if (minCutoff > memory) {
                        throw new IllegalArgumentException("The configuration 
value '"
-                               + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
                                + "' is higher (" + minCutoff + ") than the 
requested amount of memory " + memory);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index dccbb71..e951df4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner {
                        // try to start the actor system, JobManager and 
JobManager actor system
                        // using the port range definition from the config.
                        final String amPortRange = config.getString(
-                                       
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
-                                       
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+                                       
YarnConfigOptions.APPLICATION_MASTER_PORT);
 
                        actorSystem = BootstrapTools.startActorSystem(config, 
appMasterHostname, amPortRange, LOG);
 
@@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner {
                // corresponding generic config keys instead. that way, later 
code needs not
                // deal with deprecated config keys
 
-               BootstrapTools.substituteDeprecatedConfigKey(configuration,
-                       ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-               BootstrapTools.substituteDeprecatedConfigKey(configuration,
-                       ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
                BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
                        ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-                       ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+                       ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
                BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
                        ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-                       ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+                       
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
                return configuration;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 66e44a6..4d8142f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
@@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
                        // Resource requirements for worker containers
                        int taskManagerSlots = taskManagerParameters.numSlots();
-                       int vcores = 
config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+                       int vcores = 
config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
                        Resource capability = 
Resource.newInstance(containerMemorySizeMB, vcores);
 
                        resourceManagerClient.addContainerRequest(
@@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                                        String msg = "Stopping YARN session 
because the number of failed containers ("
                                                + failedContainersSoFar + ") 
exceeded the maximum failed containers ("
                                                + maxFailedContainers + "). 
This number is controlled by the '"
-                                               + 
ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+                                               + 
YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. "
                                                + "By default its the number of 
requested containers.";
 
                                        LOG.error(msg);
@@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                        Logger log) {
 
                final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-                       ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 
DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
                final long yarnExpiryIntervalMS = yarnConfig.getLong(
                        YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
@@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                }
 
                final int maxFailedContainers = flinkConfig.getInteger(
-                       ConfigConstants.YARN_MAX_FAILED_CONTAINERS, 
numInitialTaskManagers);
+                       YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), 
numInitialTaskManagers);
                if (maxFailedContainers >= 0) {
                        log.info("YARN application tolerates {} failed 
TaskManager containers before giving up",
                                maxFailedContainers);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 8327b6a..fb1a1c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -134,7 +135,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                this.yarnConfig = new YarnConfiguration();
                this.env = env;
                final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-                               ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 
DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+                               YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 
1000;
 
                final long yarnExpiryIntervalMS = yarnConfig.getLong(
                                YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5d8abac..f2968b1 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
                String currentUser = System.getProperty("user.name");
                String propertiesFileLocation =
-                       
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
+                       
conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
 
                return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 28ef2ab..3773352 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -52,6 +52,71 @@ public class YarnConfigOptions {
                key("yarn.per-job-cluster.include-user-jar")
                        .defaultValue("ORDER");
 
+       /**
+        * The vcores exposed by YARN.
+        */
+       public static final ConfigOption<Integer> VCORES =
+               key("yarn.containers.vcores")
+               .defaultValue(-1);
+
+       /**
+        * The maximum number of failed YARN containers before entirely stopping
+        * the YARN session / job on YARN.
+        * By default, we take the number of initially requested containers.
+        *
+        * <p>Note: This option returns a String since Integer options must 
have a static default value.
+        */
+       public static final ConfigOption<String> MAX_FAILED_CONTAINERS =
+               key("yarn.maximum-failed-containers")
+               .noDefaultValue();
+
+       /**
+        * Set the number of retries for failed YARN 
ApplicationMasters/JobManagers in high
+        * availability mode. This value is usually limited by YARN.
+        * By default, it's 1 in the standalone case and 2 in the high 
availability case.
+        *
+        * <p>Note: This option returns a String since Integer options must 
have a static default value.
+        */
+       public static final ConfigOption<String> APPLICATION_ATTEMPTS =
+               key("yarn.application-attempts")
+               .noDefaultValue();
+
+       /**
+        * The heartbeat interval between the Application Master and the YARN 
Resource Manager.
+        */
+       public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
+               key("yarn.heartbeat-delay")
+               .defaultValue(5);
+
+       /**
+        * When a Flink job is submitted to YARN, the JobManager's host and the 
number of available
+        * processing slots is written into a properties file, so that the 
Flink client is able
+        * to pick those details up.
+        * This configuration parameter allows changing the default location of 
that file (for example
+        * for environments sharing a Flink installation between users).
+        */
+       public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
+               key("yarn.properties-file.location")
+               .noDefaultValue();
+
+       /**
+        * The config parameter defining the Akka actor system port for the 
ApplicationMaster and
+        * JobManager.
+        * The port can either be a port, such as "9123",
+        * a range of ports: "50100-50200"
+        * or a list of ranges and/or points: "50100-50200,50300-50400,51234".
+        * Setting the port to 0 will let the OS choose an available port.
+        */
+       public static final ConfigOption<String> APPLICATION_MASTER_PORT =
+               key("yarn.application-master.port")
+               .defaultValue("0");
+
+       /**
+        * A comma-separated list of strings to use as YARN application tags.
+        */
+       public static final ConfigOption<String> APPLICATION_TAGS =
+               key("yarn.tags")
+               .defaultValue("");
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 9ead775..e8fccac 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -112,21 +113,13 @@ public class YarnEntrypointUtils {
                // corresponding generic config keys instead. that way, later 
code needs not
                // deal with deprecated config keys
 
-               BootstrapTools.substituteDeprecatedConfigKey(configuration,
-                       ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-               BootstrapTools.substituteDeprecatedConfigKey(configuration,
-                       ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
                BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
                        ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-                       ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+                       ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
                BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
                        ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-                       ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+                       
ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
                final String keytabPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d78b390..a2d1668 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
-import org.apache.flink.configuration.{ConfigConstants, Configuration => 
FlinkConfiguration}
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -88,7 +89,7 @@ class YarnJobManager(
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =
     FiniteDuration(
-      
flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
+      flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS),
       TimeUnit.SECONDS)
 
   val yarnFilesPath: Option[String] = 
Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index bcb8559..19d1af5 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
        public void testConfigOverwrite() {
                Configuration configuration = new Configuration();
                // overwrite vcores in config
-               configuration.setInteger(ConfigConstants.YARN_VCORES, 
Integer.MAX_VALUE);
+               configuration.setInteger(YarnConfigOptions.VCORES, 
Integer.MAX_VALUE);
 
                YarnClusterDescriptor clusterDescriptor = new 
YarnClusterDescriptor(
                        configuration,

Reply via email to