[FLINK-2641] integrate off-heap memory configuration
- add the off-heap configuration parameter `taskmanager.memory.off-heap`
- remove the off-heap ratio parameter and reuse the memory fraction parameter instead
- set the JVM `-XX:MaxDirectMemorySize` parameter correctly

This closes #1129


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76a40d59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76a40d59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76a40d59

Branch: refs/heads/master
Commit: 76a40d59e6623cfbc6e265d26da4e739e5e7ed18
Parents: a3150a3
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Sep 14 15:07:23 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Sep 16 16:16:03 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 10 ---
 flink-dist/src/main/flink-bin/bin/config.sh     | 51 +++++++++++++--
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 10 +--
 .../src/main/flink-bin/bin/taskmanager.sh       | 38 ++++++++++--
 .../io/network/buffer/NetworkBufferPool.java    |  9 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 65 +++++++++++++-------
 6 files changed, 132 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index bbaf71a..cd7fd76 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -125,11 +125,6 @@ public final class ConfigConstants {
        public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = 
"taskmanager.memory.fraction";
 
        /**
-        * The fraction of off-heap memory relative to the heap size.
-        */
-       public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = 
"taskmanager.memory.off-heap-ratio";
-       
-       /**
         * The config parameter defining the memory allocation method (JVM heap 
or off-heap).
        */
        public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = 
"taskmanager.memory.off-heap";
@@ -546,11 +541,6 @@ public final class ConfigConstants {
         * The default fraction of the free memory allocated by the task 
manager's memory manager.
         */
        public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
-
-       /**
-        * The default ratio of heap to off-heap memory, when the TaskManager 
is started with off-heap memory.
-        */
-       public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO 
= 3.0f;
        
        /**
         * Default number of buffers used in the network stack.

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 2aa9c78..f4f58f2 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -87,8 +87,17 @@ DEFAULT_ENV_SSH_OPTS=""                             # 
Optional SSH parameters ru
 # CONFIG KEYS: The default values can be overwritten by the following keys in 
conf/flink-conf.yaml
 
########################################################################################################################
 
-KEY_JOBM_HEAP_MB="jobmanager.heap.mb"
-KEY_TASKM_HEAP_MB="taskmanager.heap.mb"
+KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
+KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
+KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
+KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
+KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers"
+# BEGIN:deprecated
+KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes"
+# END:deprecated
+KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size"
+KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
+
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_JAVA_HOME="env.java.home"
@@ -132,7 +141,8 @@ FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"`
 if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; 
fi
 FLINK_BIN_DIR=$FLINK_ROOT_DIR_MANGLED/bin
 FLINK_LOG_DIR=$FLINK_ROOT_DIR_MANGLED/log
-YAML_CONF=${FLINK_CONF_DIR}/flink-conf.yaml
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
 
 
########################################################################################################################
 # ENVIRONMENT VARIABLES
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 
"${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+    BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+    if [ "${BUFFER_SIZE}" -eq "0" ]; then
+        BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} 
"$((32 * 1024))" "${YAML_CONF}")
+    fi
+    NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" 
"${YAML_CONF}")
+    FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1))
+fi
+
+# Define FLINK_TM_OFFHEAP if it is not already set
+if [ -z "${FLINK_TM_OFFHEAP}" ]; then
+    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}")
 fi
 
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
@@ -211,7 +246,7 @@ fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.
 # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
-# KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB for that!
+# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
 if [ -z "${JVM_ARGS}" ]; then
     JVM_ARGS=""
 fi
@@ -308,3 +343,7 @@ readSlaves() {
         fi
     done < "$SLAVES_FILE"
 }
+
+useOffHeapMemory() {
+    [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh 
b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index c18a909..45b8e79 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -43,21 +43,21 @@ if [[ $STARTSTOP == "start" ]]; then
         STREAMINGMODE="batch"
     fi
 
-    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
-        echo "[ERROR] Configured JobManager JVM heap size is not a number. 
Please set '$KEY_JOBM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP}" -lt "0" 
]]; then
+        echo "[ERROR] Configured JobManager memory size is not a valid value. 
Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
     if [ "$EXECUTIONMODE" = "local" ]; then
-        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
-            echo "[ERROR] Configured JobManager JVM heap size is not a number. 
Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_TM_HEAP}" -lt 
"0" ]]; then
+            echo "[ERROR] Configured TaskManager memory size is not a valid 
value. Please set ${KEY_TASKM_MEM_SIZE} in ${FLINK_CONF_FILE}."
             exit 1
         fi
 
         FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
     fi
 
-    if [ "$FLINK_JM_HEAP" -gt 0 ]; then
+    if [ "${FLINK_JM_HEAP}" -gt "0" ]; then
         export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
     fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh 
b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index c41270d..f5aecc6 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -51,13 +51,43 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi
 
-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt 
"0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initialization
+        NETTY_BUFFERS=1
+
+        if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
+            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+                # We split up the total memory in heap and off-heap memory
+                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" 
]]; then
+                    echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                    exit 1
+                fi
+                TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+            else
+                # We calculate the memory using a fraction of the total memory
+                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != 
"0" ]] || [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} <= 0.0"` != "0" ]]; 
then
+                    echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
+                    exit 1
+                fi
+                # recalculate the JVM heap memory by taking the off-heap ratio 
into account
+                TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} 
* ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+            fi
+        fi
+
+        TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - 
NETTY_BUFFERS))
+        echo export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M 
-Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + 
FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
+        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M 
-XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + 
NETTY_BUFFERS))M"
+
     fi
 
     # Startup parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 209d925..641d13e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -196,10 +196,11 @@ public class NetworkBufferPool implements 
BufferPoolFactory {
                                throw new 
IOException(String.format("Insufficient number of network buffers: " +
                                                                "required %d, 
but only %d available. The total number of network " +
                                                                "buffers is 
currently set to %d. You can increase this " +
-                                                               "number by 
setting the configuration key '" +
-                                                               
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY +  "'.",
-                                               numRequiredBuffers, 
totalNumberOfMemorySegments - numTotalRequiredBuffers,
-                                               totalNumberOfMemorySegments));
+                                                               "number by 
setting the configuration key '%s'.",
+                                               numRequiredBuffers,
+                                               totalNumberOfMemorySegments - 
numTotalRequiredBuffers,
+                                               totalNumberOfMemorySegments,
+                                               
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY));
                        }
 
                        this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f11b933..1563a7a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1578,7 +1578,7 @@ object TaskManager {
       LOG.info(s"Using $configuredMemory MB for Flink managed memory.")
       configuredMemory << 20 // megabytes to bytes
     }
-    else if (memType == MemoryType.HEAP) {
+    else {
       val fraction = configuration.getFloat(
         ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
@@ -1586,32 +1586,53 @@ object TaskManager {
                            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
                            "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0")
 
-      val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-        fraction).toLong
+      if (memType == MemoryType.HEAP) {
+        val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+          fraction).toLong
 
-      LOG.info(s"Using $fraction of the currently free heap space for Flink 
managed " +
-        s" heap memory (${relativeMemSize >> 20} MB).")
+        LOG.info(s"Using $fraction of the currently free heap space for Flink 
managed " +
+          s" heap memory (${relativeMemSize >> 20} MB).")
 
-      relativeMemSize
-    }
-    else {
-      val ratio = configuration.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-      
-      checkConfigParameter(ratio > 0.0f,
-        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-        "MemoryManager ratio (off-heap memory / heap size) must be larger than 
zero")
-      
-      val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-      val relativeMemSize = (maxHeapSize * ratio).toLong
+        relativeMemSize
+      }
+      else if (memType == MemoryType.OFF_HEAP) {
+
+        val networkBufferSizeNew = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+
+        val networkBufferSizeOld = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+        val networkBufferSize =
+          if (networkBufferSizeNew != -1) {
+            networkBufferSizeNew
+          } else if (networkBufferSizeOld == -1) {
+            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
+          } else {
+            networkBufferSizeOld
+          }
+
+        val numNetworkBuffers = configuration.getLong(
+          ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
+
+        // direct memory for Netty's off-heap buffers
+        val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20)
 
-      LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for 
Flink " +
-        s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+        // The maximum heap memory has been adjusted according to the fraction
+        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + 
networkMemory
+        val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
 
-      relativeMemSize
+        LOG.info(s"Using $fraction of the maximum memory size for " +
+          s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+        directMemorySize
+      }
+      else {
+        throw new RuntimeException("No supported memory type detected.")
+      }
     }
-    
+
     val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
 
     // now start the memory manager

Reply via email to