This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Wed Nov 6 16:24:08 2019 +0100

    Treat legacy TM heap size as total process memory, not flink memory
---
 .../flink/configuration/TaskManagerOptions.java    |  2 +-
 .../TaskExecutorResourceUtils.java                 | 27 +++++++++++-----------
 .../TaskExecutorResourceUtilsTest.java             | 16 ++++++-------
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7d1492c..d717ad5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -253,6 +253,7 @@ public class TaskManagerOptions {
        public static final ConfigOption<String> TOTAL_PROCESS_MEMORY =
                key("taskmanager.memory.total-process.size")
                        .noDefaultValue()
+                       .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
                        .withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a"
                                + " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On"
                                + " containerized setups, this should be set to the container memory.");
@@ -264,7 +265,6 @@ public class TaskManagerOptions {
        public static final ConfigOption<String> TOTAL_FLINK_MEMORY =
                key("taskmanager.memory.total-flink.size")
                .noDefaultValue()
-               .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key())
                .withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a"
                        + " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory,"
                        + " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 83d2d7d..f2a275f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils {
 
        private static MemorySize getTotalFlinkMemorySize(final Configuration config) {
                checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config));
-               if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) {
-                       return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+               return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY));
+       }
+
+       private static MemorySize getTotalProcessMemorySize(final Configuration config) {
+               checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
+               if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+                       return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
                } else {
                        @SuppressWarnings("deprecation")
                        final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
@@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils {
                }
        }
 
-       private static MemorySize getTotalProcessMemorySize(final Configuration config) {
-               checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config));
-               return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
-       }
-
        private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) {
                return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
        }
@@ -454,15 +454,16 @@ public class TaskExecutorResourceUtils {
        }
 
        private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) {
-               // backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly
-               // configured by the user
-               @SuppressWarnings("deprecation")
-               final boolean legacyConfigured = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
-               return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY) || legacyConfigured;
+               return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY);
        }
 
        private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) {
-               return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
+               // backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB
+               // only when they are explicitly configured by the user
+               @SuppressWarnings("deprecation")
+               final boolean legacyConfigured =
+                       config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+               return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured;
        }
 
        private static void sanityCheckTotalFlinkMemory(final Configuration config, final FlinkInternalMemory flinkInternalMemory) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 432c1af..79ea059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -457,17 +457,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
        }
 
        @Test
-       public void testConfigTotalFlinkMemoryLegacyMB() {
-               final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+       public void testConfigTotalProcessMemoryLegacyMB() {
+               final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
                @SuppressWarnings("deprecation")
                final ConfigOption<Integer> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB;
 
                Configuration conf = new Configuration();
-               conf.setInteger(legacyOption, totalFlinkMemorySize.getMebiBytes());
+               conf.setInteger(legacyOption, totalProcessMemorySize.getMebiBytes());
 
                TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-               assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+               assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
        }
 
        @Test
@@ -495,17 +495,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
        }
 
        @Test
-       public void testConfigTotalFlinkMemoryLegacySize() {
-               final MemorySize totalFlinkMemorySize = MemorySize.parse("1g");
+       public void testConfigTotalProcessMemoryLegacySize() {
+               final MemorySize totalProcessMemorySize = MemorySize.parse("1g");
 
                @SuppressWarnings("deprecation")
                final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY;
 
                Configuration conf = new Configuration();
-               conf.setString(legacyOption, totalFlinkMemorySize.getMebiBytes() + "m");
+               conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m");
 
                TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf);
-               assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize));
+               assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize));
        }
 
        @Test

Reply via email to