This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd73130c0bf036bf4d04c9b2102e2b6fd1f908ce Author: Andrey Zagrebin <azagre...@apache.org> AuthorDate: Wed Nov 6 16:24:08 2019 +0100 Treat legacy TM heap size as total process memory, not flink memory --- .../flink/configuration/TaskManagerOptions.java | 2 +- .../TaskExecutorResourceUtils.java | 27 +++++++++++----------- .../TaskExecutorResourceUtilsTest.java | 16 ++++++------- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 7d1492c..d717ad5 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -253,6 +253,7 @@ public class TaskManagerOptions { public static final ConfigOption<String> TOTAL_PROCESS_MEMORY = key("taskmanager.memory.total-process.size") .noDefaultValue() + .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key()) .withDescription("Total Process Memory size for the TaskExecutors. This includes all the memory that a" + " TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On" + " containerized setups, this should be set to the container memory."); @@ -264,7 +265,6 @@ public class TaskManagerOptions { public static final ConfigOption<String> TOTAL_FLINK_MEMORY = key("taskmanager.memory.total-flink.size") .noDefaultValue() - .withDeprecatedKeys(TASK_MANAGER_HEAP_MEMORY.key()) .withDescription("Total Flink Memory size for the TaskExecutors. This includes all the memory that a" + " TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. 
It consists of Framework Heap Memory," + " Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Shuffle Memory."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java index 83d2d7d..f2a275f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java @@ -408,8 +408,13 @@ public class TaskExecutorResourceUtils { private static MemorySize getTotalFlinkMemorySize(final Configuration config) { checkArgument(isTotalFlinkMemorySizeExplicitlyConfigured(config)); - if (config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY)) { - return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY)); + return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_FLINK_MEMORY)); + } + + private static MemorySize getTotalProcessMemorySize(final Configuration config) { + checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config)); + if (config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) { + return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY)); } else { @SuppressWarnings("deprecation") final long legacyHeapMemoryMB = config.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB); @@ -417,11 +422,6 @@ public class TaskExecutorResourceUtils { } } - private static MemorySize getTotalProcessMemorySize(final Configuration config) { - checkArgument(isTotalProcessMemorySizeExplicitlyConfigured(config)); - return MemorySize.parse(config.getString(TaskManagerOptions.TOTAL_PROCESS_MEMORY)); - } - private static boolean isTaskHeapMemorySizeExplicitlyConfigured(final Configuration config) { return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY); } @@ -454,15 +454,16 @@ public class 
TaskExecutorResourceUtils { } private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(final Configuration config) { - // backward compatible with the deprecated config option TASK_MANAGER_HEAP_MEMORY_MB only when it's explicitly - // configured by the user - @SuppressWarnings("deprecation") - final boolean legacyConfigured = config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB); - return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY) || legacyConfigured; + return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY); } private static boolean isTotalProcessMemorySizeExplicitlyConfigured(final Configuration config) { - return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY); + // backward compatible with the deprecated config options TASK_MANAGER_HEAP_MEMORY and TASK_MANAGER_HEAP_MEMORY_MB + // only when they are explicitly configured by the user + @SuppressWarnings("deprecation") + final boolean legacyConfigured = + config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) || config.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY) || legacyConfigured; } private static void sanityCheckTotalFlinkMemory(final Configuration config, final FlinkInternalMemory flinkInternalMemory) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java index 432c1af..79ea059 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java @@ -457,17 +457,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger { } @Test - public void testConfigTotalFlinkMemoryLegacyMB() { - final MemorySize totalFlinkMemorySize = MemorySize.parse("1g"); 
+ public void testConfigTotalProcessMemoryLegacyMB() { + final MemorySize totalProcessMemorySize = MemorySize.parse("1g"); @SuppressWarnings("deprecation") final ConfigOption<Integer> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB; Configuration conf = new Configuration(); - conf.setInteger(legacyOption, totalFlinkMemorySize.getMebiBytes()); + conf.setInteger(legacyOption, totalProcessMemorySize.getMebiBytes()); TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf); - assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize)); + assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize)); } @Test @@ -495,17 +495,17 @@ public class TaskExecutorResourceUtilsTest extends TestLogger { } @Test - public void testConfigTotalFlinkMemoryLegacySize() { - final MemorySize totalFlinkMemorySize = MemorySize.parse("1g"); + public void testConfigTotalProcessMemoryLegacySize() { + final MemorySize totalProcessMemorySize = MemorySize.parse("1g"); @SuppressWarnings("deprecation") final ConfigOption<String> legacyOption = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY; Configuration conf = new Configuration(); - conf.setString(legacyOption, totalFlinkMemorySize.getMebiBytes() + "m"); + conf.setString(legacyOption, totalProcessMemorySize.getMebiBytes() + "m"); TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(conf); - assertThat(taskExecutorResourceSpec.getTotalFlinkMemorySize(), is(totalFlinkMemorySize)); + assertThat(taskExecutorResourceSpec.getTotalProcessMemorySize(), is(totalProcessMemorySize)); } @Test