This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4b482af4b93ee9fc98de08b7b427514df41d61b
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Mon Oct 14 16:09:23 2019 +0800

    [FLINK-13983][runtime] Use flip49 config options to decide memory size of MemoryManager.
---
 .../flink/runtime/taskexecutor/TaskManagerServices.java   | 15 +++++++++++++++
 .../taskexecutor/TaskManagerServicesConfiguration.java    | 10 ++++++++++
 2 files changed, 25 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e570799..91f611a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -51,7 +51,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -330,6 +332,19 @@ public class TaskManagerServices {
         */
        private static MemoryManager createMemoryManager(
                        TaskManagerServicesConfiguration taskManagerServicesConfiguration) {
+               if (taskManagerServicesConfiguration.getOnHeapManagedMemorySize() != null &&
+                       taskManagerServicesConfiguration.getOffHeapManagedMemorySize() != null) {
+                       // flip49 enabled
+
+                       final Map<MemoryType, Long> memorySizeByType = new HashMap<>();
+                       memorySizeByType.put(MemoryType.HEAP, taskManagerServicesConfiguration.getOnHeapManagedMemorySize().getBytes());
+                       memorySizeByType.put(MemoryType.OFF_HEAP, taskManagerServicesConfiguration.getOffHeapManagedMemorySize().getBytes());
+
+                       return new MemoryManager(memorySizeByType,
+                               taskManagerServicesConfiguration.getNumberOfSlots(),
+                               taskManagerServicesConfiguration.getPageSize());
+               }
+
                // computing the amount of memory to use depends on how much memory is available
                // it strictly needs to happen AFTER the network stack has been initialized
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c2133c9..bd84107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -222,6 +222,16 @@ public class TaskManagerServicesConfiguration {
                return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getShuffleMemSize();
        }
 
+       @Nullable // should only be null when flip49 is disabled
+       public MemorySize getOnHeapManagedMemorySize() {
+               return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOnHeapManagedMemorySize();
+       }
+
+       @Nullable // should only be null when flip49 is disabled
+       public MemorySize getOffHeapManagedMemorySize() {
+               return taskExecutorResourceSpec == null ? null : taskExecutorResourceSpec.getOffHeapManagedMemorySize();
+       }
+
        long getTimerServiceShutdownTimeout() {
                return timerServiceShutdownTimeout;
        }

Reply via email to