This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4b482af4b93ee9fc98de08b7b427514df41d61b
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Mon Oct 14 16:09:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
---
 .../flink/runtime/taskexecutor/TaskManagerServices.java | 15 +++++++++++++++
 .../taskexecutor/TaskManagerServicesConfiguration.java | 10 ++++++++++
 2 files changed, 25 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e570799..91f611a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,7 +51,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -330,6 +332,19 @@ public class TaskManagerServices {
 	 */
 	private static MemoryManager createMemoryManager(
 			TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
+		if (taskManagerServicesConfiguration.getOnHeapManagedMemorySize() != null &&
+			taskManagerServicesConfiguration.getOffHeapManagedMemorySize() != null) {
+			// flip49 enabled
+
+			final Map<MemoryType, Long> memorySizeByType = new HashMap<>();
+			memorySizeByType.put(MemoryType.HEAP, taskManagerServicesConfiguration.getOnHeapManagedMemorySize().getBytes());
+			memorySizeByType.put(MemoryType.OFF_HEAP, taskManagerServicesConfiguration.getOffHeapManagedMemorySize().getBytes());
+
+			return new MemoryManager(memorySizeByType,
+				taskManagerServicesConfiguration.getNumberOfSlots(),
+				taskManagerServicesConfiguration.getPageSize());
+		}
+
 		// computing the amount of memory to use depends on how much memory is available
 		// it strictly needs to happen AFTER the network stack has been initialized
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c2133c9..bd84107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -222,6 +222,16 @@ public class TaskManagerServicesConfiguration {
 		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
 	}
 
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getOnHeapManagedMemorySize() {
+		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOnHeapManagedMemorySize();
+	}
+
+	@Nullable // should only be null when flip49 is disabled
+	public MemorySize getOffHeapManagedMemorySize() {
+		return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOffHeapManagedMemorySize();
+	}
+
 	long getTimerServiceShutdownTimeout() {
 		return timerServiceShutdownTimeout;
 	}