This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f24495c0df56473c850605d9e34baf2f7464be1
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Wed Nov 6 09:59:32 2019 +0100

    Fix yarn cut off
---
 .../client/deployment/ClusterSpecification.java    |   6 +-
 .../clusterframework/TaskExecutorResourceSpec.java |   4 +
 .../TaskExecutorResourceUtils.java                 |   4 +-
 .../ActiveResourceManagerFactory.java              |  23 +---
 .../ActiveResourceManagerFactoryTest.java          |  97 --------------
 .../flink/yarn/CliFrontendRunWithYarnTest.java     |   3 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  28 +---
 .../flink/yarn/YarnClusterClientFactory.java       |   6 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  18 +--
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |   2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++++++--------------
 11 files changed, 73 insertions(+), 260 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 72975d8..0d8d105 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 
 /**
  * Description of the cluster to start by the {@link ClusterDescriptor}.
@@ -68,7 +69,10 @@ public final class ClusterSpecification {
                int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
                int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-               int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+               int taskManagerMemoryMb = TaskExecutorResourceUtils
+                       .resourceSpecFromConfig(configuration)
+                       .getTotalProcessMemorySize()
+                       .getMebiBytes();
 
                return new ClusterSpecificationBuilder()
                        .setMasterMemoryMB(jobManagerMemoryMb)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index d6cbe5b..d73e7b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable {
                return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
        }
 
+       public MemorySize getHeapSize() {
+               return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize);
+       }
+
        @Override
        public String toString() {
                return "TaskExecutorResourceSpec {"
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 4b649e4..9c69b62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils {
        // ------------------------------------------------------------------------
 
        public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) {
-               final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize()
-                       .add(taskExecutorResourceSpec.getTaskHeapSize())
-                       .add(taskExecutorResourceSpec.getOnHeapManagedMemorySize());
+               final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize();
                final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize()
                        .add(taskExecutorResourceSpec.getShuffleMemSize());
                final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
index d292e5a..7444e23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -30,17 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import javax.annotation.Nullable;
 
 /**
  * Resource manager factory which creates active {@link ResourceManager} implementations.
- *
- * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration}
- * to create a new configuration which is configured with active resource manager relevant
- * configuration options.
- *
  * @param <T> type of the {@link ResourceIDRetrievable}
  */
 public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> {
@@ -57,7 +48,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
                        @Nullable String webInterfaceUrl,
                        ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
                return createActiveResourceManager(
-                       createActiveResourceManagerConfiguration(configuration),
+                       configuration,
                        resourceId,
                        rpcService,
                        highAvailabilityServices,
@@ -68,18 +59,6 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab
                        resourceManagerMetricGroup);
        }
 
-       public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
-               final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes();
-               final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB);
-               final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes
-               final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes);
-
-               final Configuration resourceManagerConfig = new Configuration(originalConfiguration);
-               resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b");
-
-               return resourceManagerConfig;
-       }
-
-
        protected abstract ResourceManager<T> createActiveResourceManager(
                Configuration configuration,
                ResourceID resourceId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
deleted file mode 100644
index ff61e65..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests for the {@link ActiveResourceManagerFactory}.
- */
-public class ActiveResourceManagerFactoryTest extends TestLogger {
-
-       /**
-        * Test which ensures that the {@link ActiveResourceManagerFactory} 
sets the correct managed
-        * memory when creating a resource manager.
-        */
-       @Test
-       public void 
createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws 
Exception {
-               final Configuration configuration = new Configuration();
-
-               final TestingActiveResourceManagerFactory 
resourceManagerFactory = new TestingActiveResourceManagerFactory();
-
-               final TestingRpcService rpcService = new TestingRpcService();
-
-               try {
-                       final ResourceManager<ResourceID> ignored = 
resourceManagerFactory.createResourceManager(
-                               configuration,
-                               ResourceID.generate(),
-                               rpcService,
-                               new TestingHighAvailabilityServices(),
-                               new TestingHeartbeatServices(),
-                               new TestingFatalErrorHandler(),
-                               new ClusterInformation("foobar", 1234),
-                               null,
-                               
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
-               } finally {
-                       RpcUtils.terminateRpcService(rpcService, 
Time.seconds(10L));
-               }
-       }
-
-       private static final class TestingActiveResourceManagerFactory extends 
ActiveResourceManagerFactory<ResourceID> {
-
-               @Override
-               protected ResourceManager<ResourceID> 
createActiveResourceManager(
-                               Configuration configuration,
-                               ResourceID resourceId,
-                               RpcService rpcService,
-                               HighAvailabilityServices 
highAvailabilityServices,
-                               HeartbeatServices heartbeatServices,
-                               FatalErrorHandler fatalErrorHandler,
-                               ClusterInformation clusterInformation,
-                               @Nullable String webInterfaceUrl,
-                               ResourceManagerMetricGroup 
resourceManagerMetricGroup) {
-                       
assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE),
 is(true));
-
-                       return null;
-               }
-       }
-}
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
index 4e7ece3..4c83115 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
 
@@ -66,7 +67,7 @@ public class CliFrontendRunWithYarnTest extends 
CliFrontendTestBase {
        public void testRun() throws Exception {
                String testJarPath = 
getTestJarPath("BatchWordCount.jar").getAbsolutePath();
 
-               Configuration configuration = new Configuration();
+               Configuration configuration = 
adjustMemoryConfigurationForLocalExecution(new Configuration());
                configuration.setString(JobManagerOptions.ADDRESS, "localhost");
                configuration.setInteger(JobManagerOptions.PORT, 8081);
 
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index dbad597..36f90e4 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -24,13 +24,11 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.RestClientConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -84,8 +82,11 @@ public class YarnConfigurationITCase extends YarnTestBase {
                        final YarnClient yarnClient = getYarnClient();
                        final Configuration configuration = new 
Configuration(flinkConfiguration);
 
+                       final TaskExecutorResourceSpec spec = 
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
                        final int masterMemory = 64;
-                       final int taskManagerMemory = 512;
+                       final int taskManagerMemory = 
spec.getTotalProcessMemorySize().getMebiBytes();
+                       final long expectedHeapBytes = 
spec.getHeapSize().getBytes();
+                       final int expectedManagedMemoryMB = 
spec.getManagedMemorySize().getMebiBytes();
                        final int slotsPerTaskManager = 3;
 
                        // disable heap cutoff min
@@ -175,23 +176,13 @@ public class YarnConfigurationITCase extends YarnTestBase 
{
 
                                        
assertThat(taskManagerInfo.getNumberSlots(), is(slotsPerTaskManager));
 
-                                       final ContaineredTaskManagerParameters 
containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
-                                               configuration,
-                                               null,
-                                               taskManagerMemory,
-                                               slotsPerTaskManager);
-
-                                       final long expectedHeadSize = 
containeredTaskManagerParameters.taskManagerHeapSizeMB() << 20L;
-
                                        // We compare here physical memory 
assigned to a container with the heap memory that we should pass to
                                        // jvm as Xmx parameter. Those value 
might differ significantly due to system page size or jvm
                                        // implementation therefore we use 15% 
threshold here.
                                        assertThat(
-                                               (double) 
taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) 
expectedHeadSize,
+                                               (double) 
taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / expectedHeapBytes,
                                                is(closeTo(1.0, 0.15)));
 
-                                       final int expectedManagedMemoryMB = 
calculateManagedMemorySizeMB(configuration);
-
                                        assertThat((int) 
(taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), 
is(expectedManagedMemoryMB));
                                } finally {
                                        restClient.shutdown(TIMEOUT);
@@ -214,9 +205,4 @@ public class YarnConfigurationITCase extends YarnTestBase {
                        return taskManagerInfo.getNumberSlots() > 0;
                }
        }
-
-       private static int calculateManagedMemorySizeMB(Configuration 
configuration) {
-               Configuration resourceManagerConfig = 
ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration);
-               return 
MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes();
-       }
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..6ea7d63 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -70,7 +71,10 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
                final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
 
                // Task Managers memory
-               final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
+               final int taskManagerMemoryMB = TaskExecutorResourceUtils
+                       .resourceSpecFromConfig(configuration)
+                       .getTotalProcessMemorySize()
+                       .getMebiBytes();
 
                int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 74207ff..14005f4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -40,11 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginConfig;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -458,16 +457,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
         * @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification}
         */
        private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
+               flinkConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m");
                try {
-                       final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
-                       // We do the validation by calling the calculation methods here
-                       // Internally these methods will check whether the cluster can be started with the provided
-                       // ClusterSpecification and the configured memory requirements
-                       final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
-                       TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
+                       TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfiguration);
                } catch (IllegalArgumentException iae) {
-                       throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
-                                       "cluster specification. Please increase the memory of the cluster.", iae);
+                       throw new FlinkException("Inconsistent cluster specification.", iae);
                }
        }
 
@@ -834,10 +828,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                                TaskManagerOptions.NUM_TASK_SLOTS,
                                clusterSpecification.getSlotsPerTaskManager());
 
-               configuration.setString(
-                               TaskManagerOptions.TOTAL_PROCESS_MEMORY,
-                               clusterSpecification.getTaskManagerMemoryMB() + "m");
-
                // Upload the flink configuration
                // write out configuration file
                File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 0efa610..0e7df5b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -374,7 +374,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
                        if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
                                tmMemoryVal += "m";
                        }
-                       effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
+                       effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal);
                }
 
                if (commandLine.hasOption(slots.getOpt())) {
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a357126..9376533 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -107,11 +107,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                String[] params =
                        new String[] {"-ys", "3"};
 
-               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               FlinkYarnSessionCli yarnCLI = 
createFlinkYarnSessionCliWithTmTotalMemory(1024);
 
                final CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
 
@@ -129,11 +125,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                String[] params =
                        new String[] {"-yd"};
 
-               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
                final CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
 
@@ -151,11 +143,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
                String[] params = new String[] {"-yz", zkNamespaceCliInput};
 
-               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
                CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
 
@@ -172,11 +160,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
                String[] params = new String[] {"-ynl", nodeLabelCliInput };
 
-               FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli();
 
                CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
 
@@ -198,11 +182,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final Configuration configuration = new Configuration();
                
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
 
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli(configuration);
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true);
 
@@ -225,21 +205,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final Configuration configuration = new Configuration();
                
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
 
-               new FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               createFlinkYarnSessionCli(configuration);
        }
 
        @Test
        public void testResumeFromYarnID() throws Exception {
-               final Configuration configuration = new Configuration();
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", 
TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -252,12 +223,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
        @Test
        public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
-               final Configuration configuration = new Configuration();
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", 
TEST_YARN_APPLICATION_ID.toString()}, true);
 
@@ -273,12 +239,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
        @Test
        public void testResumeFromYarnIDZookeeperNamespaceOverride() throws 
Exception {
-               final Configuration configuration = new Configuration();
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
                final String overrideZkNamespace = "my_cluster";
 
@@ -300,11 +261,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final Configuration configuration = new Configuration();
                
configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, 
directoryPath.getAbsolutePath());
 
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli(configuration);
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", 
TEST_YARN_APPLICATION_ID_2.toString() }, true);
 
                final Configuration executorConfig = 
flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -326,15 +283,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final int slotsPerTaskManager = 30;
 
                
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
jobManagerMemory + "m");
-               
configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 
taskManagerMemory + "m");
+               
configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
taskManagerMemory + "m");
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
                final String[] args = {"-yjm", String.valueOf(jobManagerMemory) 
+ "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", 
String.valueOf(slotsPerTaskManager)};
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli(configuration);
 
                CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -357,16 +310,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final int jobManagerMemory = 1337;
                
configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
jobManagerMemory + "m");
                final int taskManagerMemory = 7331;
-               
configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, 
taskManagerMemory + "m");
+               
configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
taskManagerMemory + "m");
                final int slotsPerTaskManager = 42;
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
                final String[] args = {};
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli(configuration);
 
                CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -385,11 +334,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        @Test
        public void testHeapMemoryPropertyWithoutUnit() throws Exception {
                final String[] args = new String[] { "-yjm", "1024", "-ytm", 
"2048" };
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -407,11 +352,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        @Test
        public void testHeapMemoryPropertyWithUnitMB() throws Exception {
                final String[] args = new String[] { "-yjm", "1024m", "-ytm", 
"2048m" };
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
                final Configuration executorConfig = 
flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -428,11 +369,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        @Test
        public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
                final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" 
};
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
                final Configuration executorConfig = 
flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine);
@@ -452,11 +389,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
                
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       configuration,
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli(configuration);
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -469,15 +402,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        /**
-        * Tests the specifying heap memory with config default value for job 
manager and task manager.
+        * Tests specifying the job manager heap memory with the config default 
value for job manager and task manager.
         */
        @Test
-       public void testHeapMemoryPropertyWithConfigDefaultValue() throws 
Exception {
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+       public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() 
throws Exception {
+               int totalMemomory = 1024;
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCliWithTmTotalMemory(totalMemomory);
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -485,18 +415,14 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final ClusterClientFactory<ApplicationId> clientFactory = 
getClusterClientFactory(executorConfig);
                final ClusterSpecification clusterSpecification = 
clientFactory.getClusterSpecification(executorConfig);
 
-               assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
-               assertThat(clusterSpecification.getTaskManagerMemoryMB(), 
is(1024));
+               assertThat(clusterSpecification.getMasterMemoryMB(), 
is(totalMemomory));
+               assertThat(clusterSpecification.getTaskManagerMemoryMB(), 
is(totalMemomory));
        }
 
        @Test
        public void testMultipleYarnShipOptions() throws Exception {
                final String[] args = new String[]{"run", "--yarnship", 
tmp.newFolder().getAbsolutePath(), "--yarnship", 
tmp.newFolder().getAbsolutePath()};
-               final FlinkYarnSessionCli flinkYarnSessionCli = new 
FlinkYarnSessionCli(
-                       new Configuration(),
-                       tmp.getRoot().getAbsolutePath(),
-                       "y",
-                       "yarn");
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
 
@@ -527,4 +453,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
                return tmpFolder.getAbsoluteFile();
        }
+
+       private FlinkYarnSessionCli createFlinkYarnSessionCli() throws 
FlinkException {
+               return createFlinkYarnSessionCli(new Configuration());
+       }
+
+       private FlinkYarnSessionCli 
createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws 
FlinkException {
+               Configuration configuration = new Configuration();
+               
configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemomory 
+ "m");
+               return createFlinkYarnSessionCli(configuration);
+       }
+
+       private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration 
configuration) throws FlinkException {
+               return new FlinkYarnSessionCli(
+                       configuration,
+                       tmp.getRoot().getAbsolutePath(),
+                       "y",
+                       "yarn");
+       }
 }

Reply via email to