[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options
This closes #4075. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63d704e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63d704e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63d704e Branch: refs/heads/master Commit: d63d704efb2bc28dd7a33ee9027f4d447acbd209 Parents: 6539180 Author: zjureel <zjur...@gmail.com> Authored: Thu Jun 8 11:38:56 2017 +0800 Committer: zentol <ches...@apache.org> Committed: Thu Aug 10 11:36:30 2017 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 89 ++++++++++++++-- .../configuration/ResourceManagerOptions.java | 39 +++++++ .../flink/mesos/configuration/MesosOptions.java | 106 +++++++++++++++++++ .../MesosApplicationMasterRunner.java | 31 +++--- .../MesosFlinkResourceManager.java | 10 +- .../flink/mesos/util/MesosArtifactServer.java | 5 +- .../MesosFlinkResourceManagerTest.java | 6 +- .../ContaineredTaskManagerParameters.java | 14 ++- .../runtime/minicluster/FlinkMiniCluster.scala | 9 +- .../minicluster/LocalFlinkMiniCluster.scala | 8 +- .../java/org/apache/flink/yarn/UtilsTest.java | 19 ++-- .../YARNSessionCapacitySchedulerITCase.java | 6 +- .../yarn/AbstractYarnClusterDescriptor.java | 10 +- .../main/java/org/apache/flink/yarn/Utils.java | 17 +-- .../flink/yarn/YarnApplicationMasterRunner.java | 17 +-- .../flink/yarn/YarnFlinkResourceManager.java | 10 +- .../apache/flink/yarn/YarnResourceManager.java | 3 +- .../flink/yarn/cli/FlinkYarnSessionCli.java | 3 +- .../yarn/configuration/YarnConfigOptions.java | 65 ++++++++++++ .../yarn/entrypoint/YarnEntrypointUtils.java | 13 +-- .../org/apache/flink/yarn/YarnJobManager.scala | 5 +- .../flink/yarn/YarnClusterDescriptorTest.java | 3 +- 22 files changed, 375 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index f817344..4c6c62a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -135,7 +135,9 @@ public final class ConfigConstants { /** * The config parameter defining the network port to connect to * for communication with the resource manager. + * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead. */ + @Deprecated public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port"; /** @@ -349,12 +351,16 @@ public final class ConfigConstants { /** * Percentage of heap space to remove from containers (YARN / Mesos), to compensate * for other JVM memory usage. + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead. */ + @Deprecated public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio"; /** * Minimum amount of heap memory to remove in containers, as a safety margin. + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead. */ + @Deprecated public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min"; /** @@ -362,13 +368,17 @@ public final class ConfigConstants { * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" * in the flink-conf.yaml. + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead. 
*/ + @Deprecated public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; /** * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows * setting custom environment variables for the workers (TaskManagers) + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead. */ + @Deprecated public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; @@ -376,7 +386,9 @@ public final class ConfigConstants { /** * The vcores exposed by YARN. + * @deprecated in favor of {@code YarnConfigOptions#VCORES}. */ + @Deprecated public static final String YARN_VCORES = "yarn.containers.vcores"; /** @@ -406,7 +418,9 @@ public final class ConfigConstants { * the YARN session / job on YARN. * * By default, we take the number of of initially requested containers. + * @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}. */ + @Deprecated public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers"; /** @@ -414,14 +428,18 @@ public final class ConfigConstants { * availability mode. This value is usually limited by YARN. * * By default, it's 1 in the standalone case and 2 in the high availability case. + * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}. */ + @Deprecated public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts"; /** * The heartbeat interval between the Application Master and the YARN Resource Manager. * * The default value is 5 (seconds). + * @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}. */ + @Deprecated public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay"; /** @@ -429,8 +447,10 @@ public final class ConfigConstants { * processing slots is written into a properties file, so that the Flink client is able * to pick those details up. 
* This configuration parameter allows changing the default location of that file (for example - * for environments sharing a Flink installation between users) + * for environments sharing a Flink installation between users). + * @deprecated in favor of {@code YarnConfigOptions#PROPERTIES_FILE_LOCATION}. */ + @Deprecated public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location"; /** @@ -474,12 +494,16 @@ public final class ConfigConstants { * or a list of ranges and or points: "50100-50200,50300-50400,51234" * * Setting the port to 0 will let the OS choose an available port. + * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}. */ + @Deprecated public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port"; /** * A comma-separated list of strings to use as YARN application tags. + * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}. */ + @Deprecated public static final String YARN_APPLICATION_TAGS = "yarn.tags"; @@ -487,7 +511,9 @@ public final class ConfigConstants { /** * The initial number of Mesos tasks to allocate. + * @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}. */ + @Deprecated public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks"; /** @@ -495,7 +521,9 @@ public final class ConfigConstants { * the Mesos session / job on Mesos. * * By default, we take the number of of initially requested tasks. + * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}. */ + @Deprecated public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks"; /** @@ -510,36 +538,53 @@ public final class ConfigConstants { * file:///path/to/file (where file contains one of the above) * } * </pre> - * + * @deprecated in favor of {@code MesosOptions#MASTER_URL}. 
*/ + @Deprecated public static final String MESOS_MASTER_URL = "mesos.master"; /** * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down. * * The default value is 600 (seconds). + * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}. */ + @Deprecated public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout"; /** * The config parameter defining the Mesos artifact server port to use. * Setting the port to 0 will let the OS choose an available port. + * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}. */ + @Deprecated public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */ + @Deprecated public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */ + @Deprecated public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */ + @Deprecated public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */ + @Deprecated public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */ + @Deprecated public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user"; /** * Config parameter to override SSL support for the Artifact Server + * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}. 
*/ + @Deprecated public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled"; // ------------------------ Hadoop Configuration ------------------------ @@ -1218,7 +1263,9 @@ public final class ConfigConstants { /** * The default network port of the resource manager. + * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead. */ + @Deprecated public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0; /** @@ -1378,13 +1425,17 @@ public final class ConfigConstants { /** * Minimum amount of memory to subtract from the process memory to get the TaskManager * heap size. We came up with these values experimentally. + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead. */ + @Deprecated public static final int DEFAULT_YARN_HEAP_CUTOFF = 600; /** * Relative amount of memory to subtract from Java process memory to get the TaskManager - * heap size + * heap size. + * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead. */ + @Deprecated public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f; /** @@ -1395,31 +1446,49 @@ public final class ConfigConstants { /** * Default port for the application master is 0, which means - * the operating system assigns an ephemeral port + * the operating system assigns an ephemeral port. + * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}. */ + @Deprecated public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0"; // ------ Mesos-Specific Configuration ------ // For more configuration entries please see {@code MesosTaskManagerParameters}. - /** The default failover timeout provided to Mesos (10 mins) */ + /** + * The default failover timeout provided to Mesos (10 mins) + * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}. 
+ */ + @Deprecated public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60; /** * The default network port to listen on for the Mesos artifact server. + * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}. */ + @Deprecated public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0; /** * The default Mesos framework name for the ResourceManager to use. + * @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */ + @Deprecated public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */ + @Deprecated public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*"; + /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */ + @Deprecated public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = ""; - /** Default value to override SSL support for the Artifact Server */ + /** + * Default value to override SSL support for the Artifact Server. + * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}. + */ + @Deprecated public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true; // ------------------------ File System Behavior ------------------------ @@ -1659,8 +1728,16 @@ public final class ConfigConstants { public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1; + /** + * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead. + */ + @Deprecated public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager"; + /** + * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead. 
+ */ + @Deprecated public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1; public static final String LOCAL_START_WEBSERVER = "local.start-webserver"; http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java index 6a09f19..e2d96bb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java @@ -33,6 +33,45 @@ public class ResourceManagerOptions { .key("resourcemanager.job.timeout") .defaultValue("5 minutes"); + public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER = ConfigOptions + .key("local.number-resourcemanager") + .defaultValue(1); + + public static final ConfigOption<Integer> IPC_PORT = ConfigOptions + .key("resourcemanager.rpc.port") + .defaultValue(0); + + /** + * Percentage of heap space to remove from containers (YARN / Mesos), to compensate + * for other JVM memory usage. + */ + public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions + .key("containerized.heap-cutoff-ratio") + .defaultValue(0.25f) + .withDeprecatedKeys("yarn.heap-cutoff-ratio"); + + /** + * Minimum amount of heap memory to remove in containers, as a safety margin. + */ + public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions + .key("containerized.heap-cutoff-min") + .defaultValue(600) + .withDeprecatedKeys("yarn.heap-cutoff-min"); + + /** + * Prefix for passing custom environment variables to Flink's master process. 
+ * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set: + * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native" + * in the flink-conf.yaml. + */ + public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env."; + + /** + * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows + * setting custom environment variables for the workers (TaskManagers) + */ + public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env."; + // --------------------------------------------------------------------------------------------- /** Not intended to be instantiated */ http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java new file mode 100644 index 0000000..8616cad --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.configuration; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to mesos settings. + */ +public class MesosOptions { + + /** + * The initial number of Mesos tasks to allocate. + */ + public static final ConfigOption<Integer> INITIAL_TASKS = + key("mesos.initial-tasks") + .defaultValue(0); + + /** + * The maximum number of failed Mesos tasks before entirely stopping + * the Mesos session / job on Mesos. + * + * <p>By default, we take the number of initially requested tasks. + */ + public static final ConfigOption<Integer> MAX_FAILED_TASKS = + key("mesos.maximum-failed-tasks") + .defaultValue(-1); + + /** + * The Mesos master URL. + * + * <p>The value should be in one of the following forms: + * <pre> + * {@code + * host:port + * zk://host1:port1,host2:port2,.../path + * zk://username:password@host1:port1,host2:port2,.../path + * file:///path/to/file (where file contains one of the above) + * } + * </pre> + */ + public static final ConfigOption<String> MASTER_URL = + key("mesos.master") + .noDefaultValue(); + + /** + * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down. + */ + public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS = + key("mesos.failover-timeout") + .defaultValue(600); + + /** + * The config parameter defining the Mesos artifact server port to use. + * Setting the port to 0 will let the OS choose an available port. 
+ */ + public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT = + key("mesos.resourcemanager.artifactserver.port") + .defaultValue(0); + + public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME = + key("mesos.resourcemanager.framework.name") + .defaultValue("Flink"); + + public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE = + key("mesos.resourcemanager.framework.role") + .defaultValue("*"); + + public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = + key("mesos.resourcemanager.framework.principal") + .noDefaultValue(); + + public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET = + key("mesos.resourcemanager.framework.secret") + .noDefaultValue(); + + public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER = + key("mesos.resourcemanager.framework.user") + .defaultValue(""); + + /** + * Config parameter to override SSL support for the Artifact Server. + */ + public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED = + key("mesos.resourcemanager.artifactserver.ssl.enabled") + .defaultValue(true); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index d4e2f0d..260b7f3 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -18,12 +18,12 @@ package org.apache.flink.mesos.runtime.clusterframework; -import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; @@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner { // try to start the artifact server LOG.debug("Starting Artifact Server"); - final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, - ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); + final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT); final String artifactServerPrefix = UUID.randomUUID().toString(); artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config); @@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner { .setHostname(hostname); Protos.Credential.Builder credential = null; - if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) { - throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured."); + if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { + throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); } - String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null); + String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); Duration failoverTimeout = FiniteDuration.apply( flinkConfig.getInteger( - ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS, - ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS), + MesosOptions.FAILOVER_TIMEOUT_SECONDS), TimeUnit.SECONDS); 
frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); frameworkInfo.setName(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME, - ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME)); + MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); frameworkInfo.setRole(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE, - ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE)); + MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); frameworkInfo.setUser(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER, - ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER)); + MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER)); - if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { + if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { frameworkInfo.setPrincipal(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null)); + MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)); credential = Protos.Credential.newBuilder(); credential.setPrincipal(frameworkInfo.getPrincipal()); // some environments use a side-channel to communicate the secret to Mesos, // and thus don't set the 'secret' configuration setting - if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { + if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) { credential.setSecret(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); + MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java index d6b5c9d..05d7e1f 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -19,9 +19,9 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; @@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe String msg = "Stopping Mesos session because the number of failed tasks (" + failedTasksSoFar + ") exceeded the maximum failed tasks (" + maxFailedTasks + "). This number is controlled by the '" - + ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. " + + MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. 
" + "By default its the number of requested tasks."; LOG.error(msg); @@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe Logger log) { final int numInitialTaskManagers = flinkConfig.getInteger( - ConfigConstants.MESOS_INITIAL_TASKS, 0); + MesosOptions.INITIAL_TASKS); if (numInitialTaskManagers >= 0) { log.info("Mesos framework to allocate {} initial tasks", numInitialTaskManagers); } else { throw new IllegalConfigurationException("Invalid value for " + - ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero."); + MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero."); } final int maxFailedTasks = flinkConfig.getInteger( - ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers); + MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers); if (maxFailedTasks >= 0) { log.info("Mesos framework tolerates {} failed tasks before giving up", maxFailedTasks); http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index 2627d25..3a6f77a 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -18,12 +18,12 @@ package org.apache.flink.mesos.util; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.runtime.net.SSLUtils; import 
org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; @@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver { // Config to enable https access to the artifact server boolean enableSSL = config.getBoolean( - ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED, - ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) && + MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) && SSLUtils.getSSLEnabled(config); if (enableSSL) { http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index af3f7ef..8bfb4d1 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -18,8 +18,8 @@ package org.apache.flink.mesos.runtime.clusterframework; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.configuration.MesosOptions; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; @@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger { private static final long serialVersionUID = -952579203067648838L; { - setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1); - setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0); + setInteger(MesosOptions.MAX_FAILED_TASKS, -1); + setInteger(MesosOptions.INITIAL_TASKS, 0); }}; 
@BeforeClass http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 9d679cf..7e9891f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.clusterframework; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import java.util.HashMap; @@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (1) compute how much memory we subtract from the total memory, to get the Java memory final float memoryCutoffRatio = config.getFloat( - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, - ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO); final int minCutoff = config.getInteger( - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, - ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given=" + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } if (minCutoff >= containerMemoryMB) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff + "' is larger than the total container memory " + containerMemoryMB); } @@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (3) obtain the additional environment variables from the configuration final HashMap<String, String> envVars = new HashMap<>(); - final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; + final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; for (String key : config.keySet()) { if (key.startsWith(prefix) && key.length() > prefix.length()) { http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index c5c87ac..6f13b9f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions} +import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions} import 
org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} @@ -168,8 +168,7 @@ abstract class FlinkMiniCluster( def getNumberOfResourceManagers: Int = { originalConfiguration.getInteger( - ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, - ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER + ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER ) } @@ -226,8 +225,8 @@ abstract class FlinkMiniCluster( if (useSingleActorSystem) { AkkaUtils.getAkkaConfig(originalConfiguration, None) } else { - val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) + val port = originalConfiguration.getInteger( + ResourceManagerOptions.IPC_PORT) val resolvedPort = if(port != 0) port + index else port http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 27a8ee1..0ae00a9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.{ActorRef, ActorSystem, Props} import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat -import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions} +import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.FlinkResourceManager @@ -183,11 +183,11 @@ class LocalFlinkMiniCluster( val resourceManagerName = getResourceManagerName(index) val resourceManagerPort = config.getInteger( - ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) + ResourceManagerOptions.IPC_PORT) if(resourceManagerPort > 0) { - config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index) + config.setInteger(ResourceManagerOptions.IPC_PORT, + resourceManagerPort + index) } val resourceManagerProps = getResourceManagerProps( http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 82a656c..275bcc9 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -20,6 +20,7 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; @@ -62,8 +63,8 @@ public class UtilsTest { @Test public void testHeapCutoff() { Configuration conf = new Configuration(); - conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); + conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F); + 
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf)); Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf)); @@ -71,14 +72,14 @@ public class UtilsTest { // test different configuration Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000"); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1"); Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5"); Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); // test also deprecated keys @@ -93,21 +94,21 @@ public class UtilsTest { @Test(expected = IllegalArgumentException.class) public void illegalArgument() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = IllegalArgumentException.class) public void illegalArgumentNegative() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } @Test(expected = 
IllegalArgumentException.class) public void tooMuchCutoff() { Configuration conf = new Configuration(); - conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000"); Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); } http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 2e88836..d85aa97 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.client.JobClient; @@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", - "-D" + ConfigConstants.YARN_VCORES + "=2"}, + "-D" + YarnConfigOptions.VCORES.key() + "=2"}, "Number of connected TaskManagers changed to 1. 
Slots available: 3", RunTypes.YARN_SESSION); @@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); - Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES)); + Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key())); // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface // first, get the hostname/port http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index dfc8b6a..55dc47f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // The number of cores can be configured in the config. 
// If not configured, it is set to the number of task slots int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); - int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager()); + int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager()); // don't configure more than the maximum configured number of vcores if (configuredVcores > numYarnVcores) { throw new IllegalConfigurationException( @@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor " but Yarn only has %d virtual cores available. Please note that the number" + " of virtual cores is set to the number of task slots by default unless configured" + " in the Flink config with '%s.'", - configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES)); + configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key())); } // check if required Hadoop environment variables are set. 
If not, warn user @@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // activate re-execution of failed applications appContext.setMaxAppAttempts( flinkConfiguration.getInteger( - ConfigConstants.YARN_APPLICATION_ATTEMPTS, + YarnConfigOptions.APPLICATION_ATTEMPTS.key(), YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); @@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // set number of application retries to 1 in the default case appContext.setMaxAppAttempts( flinkConfiguration.getInteger( - ConfigConstants.YARN_APPLICATION_ATTEMPTS, + YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); } @@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor IllegalAccessException { final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); - final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, ""); + final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS); final Set<String> applicationTags = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 662617f..98d27ab 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -18,7 +18,7 @@ package org.apache.flink.yarn; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.security.SecurityUtils; @@ -82,24 +82,17 @@ public final class Utils { */ public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { - BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); - BootstrapTools.substituteDeprecatedConfigKey(conf, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); - - float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, - ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, - ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); + float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO); + int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. 
Value given=" + memoryCutoffRatio); } if (minCutoff > memory) { throw new IllegalArgumentException("The configuration value '" - + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); } http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index dccbb71..e951df4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner { // try to start the actor system, JobManager and JobManager actor system // using the port range definition from the config. 
final String amPortRange = config.getString( - ConfigConstants.YARN_APPLICATION_MASTER_PORT, - ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT); + YarnConfigOptions.APPLICATION_MASTER_PORT); actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG); @@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner { // corresponding generic config keys instead. that way, later code needs not // deal with deprecated config keys - BootstrapTools.substituteDeprecatedConfigKey(configuration, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); - - BootstrapTools.substituteDeprecatedConfigKey(configuration, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); - BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, - ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX); + ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX); BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX, - ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); + ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); return configuration; } http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 66e44a6..4d8142f 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.messages.ContainersAllocated; import org.apache.flink.yarn.messages.ContainersComplete; @@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar // Resource requirements for worker containers int taskManagerSlots = taskManagerParameters.numSlots(); - int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1)); + int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1)); Resource capability = Resource.newInstance(containerMemorySizeMB, vcores); resourceManagerClient.addContainerRequest( @@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar String msg = "Stopping YARN session because the number of failed containers (" + failedContainersSoFar + ") exceeded the maximum failed containers (" + maxFailedContainers + "). This number is controlled by the '" - + ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. " + + YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. 
" + "By default its the number of requested containers."; LOG.error(msg); @@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar Logger log) { final int yarnHeartbeatIntervalMS = flinkConfig.getInteger( - ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000; + YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000; final long yarnExpiryIntervalMS = yarnConfig.getLong( YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, @@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar } final int maxFailedContainers = flinkConfig.getInteger( - ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numInitialTaskManagers); + YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers); if (maxFailedContainers >= 0) { log.info("YARN application tolerates {} failed TaskManager containers before giving up", maxFailedContainers); http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 8327b6a..fb1a1c3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.Container; @@ -134,7 +135,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> 
implements this.yarnConfig = new YarnConfiguration(); this.env = env; final int yarnHeartbeatIntervalMS = flinkConfig.getInteger( - ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000; + YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000; final long yarnExpiryIntervalMS = yarnConfig.getLong( YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 5d8abac..f2968b1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptorV2; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); String currentUser = System.getProperty("user.name"); String propertiesFileLocation = - conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 28ef2ab..3773352 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -52,6 +52,71 @@ public class YarnConfigOptions { key("yarn.per-job-cluster.include-user-jar") .defaultValue("ORDER"); + /** + * The vcores exposed by YARN. + */ + public static final ConfigOption<Integer> VCORES = + key("yarn.containers.vcores") + .defaultValue(-1); + + /** + * The maximum number of failed YARN containers before entirely stopping + * the YARN session / job on YARN. + * By default, we take the number of initially requested containers. + * + * <p>Note: This option returns a String since Integer options must have a static default value. + */ + public static final ConfigOption<String> MAX_FAILED_CONTAINERS = + key("yarn.maximum-failed-containers") + .noDefaultValue(); + + /** + * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high + * availability mode. This value is usually limited by YARN. + * By default, it's 1 in the standalone case and 2 in the high availability case. + * + * <p>Note: This option returns a String since Integer options must have a static default value. + */ + public static final ConfigOption<String> APPLICATION_ATTEMPTS = + key("yarn.application-attempts") + .noDefaultValue(); + + /** + * The heartbeat interval between the Application Master and the YARN Resource Manager. 
+ */ + public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS = + key("yarn.heartbeat-delay") + .defaultValue(5); + + /** + * When a Flink job is submitted to YARN, the JobManager's host and the number of available + * processing slots is written into a properties file, so that the Flink client is able + * to pick those details up. + * This configuration parameter allows changing the default location of that file (for example + * for environments sharing a Flink installation between users) + */ + public static final ConfigOption<String> PROPERTIES_FILE_LOCATION = + key("yarn.properties-file.location") + .noDefaultValue(); + + /** + * The config parameter defining the Akka actor system port for the ApplicationMaster and + * JobManager. + * The port can either be a port, such as "9123", + * a range of ports: "50100-50200" + * or a list of ranges and or points: "50100-50200,50300-50400,51234". + * Setting the port to 0 will let the OS choose an available port. + */ + public static final ConfigOption<String> APPLICATION_MASTER_PORT = + key("yarn.application-master.port") + .defaultValue("0"); + + /** + * A comma-separated list of strings to use as YARN application tags. 
+ */ + public static final ConfigOption<String> APPLICATION_TAGS = + key("yarn.tags") + .defaultValue(""); // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java index 9ead775..e8fccac 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.security.SecurityContext; @@ -112,21 +113,13 @@ public class YarnEntrypointUtils { // corresponding generic config keys instead. 
that way, later code needs not // deal with deprecated config keys - BootstrapTools.substituteDeprecatedConfigKey(configuration, - ConfigConstants.YARN_HEAP_CUTOFF_RATIO, - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO); - - BootstrapTools.substituteDeprecatedConfigKey(configuration, - ConfigConstants.YARN_HEAP_CUTOFF_MIN, - ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN); - BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, - ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX); + ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX); BootstrapTools.substituteDeprecatedConfigPrefix(configuration, ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX, - ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); + ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX); final String keytabPath; http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index d78b390..a2d1668 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -22,7 +22,7 @@ import java.io.IOException import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit} import akka.actor.ActorRef -import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration} +import org.apache.flink.configuration.{Configuration => FlinkConfiguration} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.ContaineredJobManager @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule import 
org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.yarn.configuration.YarnConfigOptions import scala.concurrent.duration._ import scala.language.postfixOps @@ -88,7 +89,7 @@ class YarnJobManager( val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds val YARN_HEARTBEAT_DELAY: FiniteDuration = FiniteDuration( - flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), + flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS), TimeUnit.SECONDS) val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES)) http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index bcb8559..19d1af5 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.util.TestLogger; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger { public void testConfigOverwrite() { Configuration configuration = new Configuration(); // overwrite vcores in config - configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE); + 
configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE); YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor( configuration,