This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfef1a4c1649a860acae6463a726caad7aeb4db3
Author: wangyang0918 <danrtsey...@alibaba-inc.com>
AuthorDate: Tue Dec 17 11:31:13 2019 +0800

    [hotfix][yarn] Move yarn Utils#calculateHeapSize to BootstrapTools
    
    The method BootstrapTools#calculateHeapSize will be used by YARN and
    Kubernetes to calculate the heap size of the JobManager. The resulting
    heap size is passed to the JVM as -Xms and -Xmx.
---
 .../runtime/clusterframework/BootstrapTools.java   | 28 +++++++++++
 .../clusterframework/BootstrapToolsTest.java       | 54 ++++++++++++++++++++
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 58 ----------------------
 .../src/main/java/org/apache/flink/yarn/Utils.java | 27 ----------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  2 +-
 5 files changed, 83 insertions(+), 86 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index e7e1fd9..30f70e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
 import org.apache.flink.util.NetUtils;
@@ -640,4 +641,31 @@ public class BootstrapTools {
                        .toArray(String[]::new);
                return String.join(" ", newAddedConfigs);
        }
+
+       /**
+        * Calculate heap size after cut-off. The heap size after cut-off will 
be used to set -Xms and -Xmx for jobmanager
+        * start command.
+        */
+       public static int calculateHeapSize(int memory, Configuration conf) {
+
+               final float memoryCutoffRatio = 
conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+               final int minCutoff = 
conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
+
+               if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
+                       throw new IllegalArgumentException("The configuration 
value '"
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
+                               + "' must be between 0 and 1. Value given=" + 
memoryCutoffRatio);
+               }
+               if (minCutoff > memory) {
+                       throw new IllegalArgumentException("The configuration 
value '"
+                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
+                               + "' is higher (" + minCutoff + ") than the 
requested amount of memory " + memory);
+               }
+
+               int heapLimit = (int) ((float) memory * memoryCutoffRatio);
+               if (heapLimit < minCutoff) {
+                       heapLimit = minCutoff;
+               }
+               return memory - heapLimit;
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 47501e4..02f7c81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -31,6 +32,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import akka.actor.ActorSystem;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,4 +425,56 @@ public class BootstrapToolsTest extends TestLogger {
                String dynamicProperties = 
BootstrapTools.getDynamicProperties(baseConfig, targetConfig);
                assertEquals("-Dkey.b=b2 -Dkey.c=c", dynamicProperties);
        }
+
+       @Test
+       public void testHeapCutoff() {
+               Configuration conf = new Configuration();
+               
conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
+               
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
+
+               Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, 
conf));
+               Assert.assertEquals(8500, 
BootstrapTools.calculateHeapSize(10000, conf));
+
+               // test different configuration
+               Assert.assertEquals(3400, 
BootstrapTools.calculateHeapSize(4000, conf));
+
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), 
"1000");
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.1");
+               Assert.assertEquals(3000, 
BootstrapTools.calculateHeapSize(4000, conf));
+
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.5");
+               Assert.assertEquals(2000, 
BootstrapTools.calculateHeapSize(4000, conf));
+
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1");
+               Assert.assertEquals(0, BootstrapTools.calculateHeapSize(4000, 
conf));
+
+               // test also deprecated keys
+               conf = new Configuration();
+               conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+               conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+               Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, 
conf));
+               Assert.assertEquals(8500, 
BootstrapTools.calculateHeapSize(10000, conf));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void illegalArgument() {
+               final Configuration conf = new Configuration();
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1.1");
+               BootstrapTools.calculateHeapSize(4000, conf);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void illegalArgumentNegative() {
+               final Configuration conf = new Configuration();
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"-0.01");
+               BootstrapTools.calculateHeapSize(4000, conf);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void tooMuchCutoff() {
+               final Configuration conf = new Configuration();
+               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"6000");
+               BootstrapTools.calculateHeapSize(4000, conf);
+       }
 }
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 7e45b99..ae16cd6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
@@ -82,62 +80,6 @@ public class UtilsTest extends TestLogger {
                Assert.assertTrue(files.contains("conf"));
        }
 
-       /**
-        * Remove 15% of the heap, at least 384MB.
-        *
-        */
-       @Test
-       public void testHeapCutoff() {
-               Configuration conf = new Configuration();
-               
conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
-               
conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
-
-               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
-               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
-
-               // test different configuration
-               Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
-
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), 
"1000");
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.1");
-               Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
-
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"0.5");
-               Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
-
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-
-               // test also deprecated keys
-               conf = new Configuration();
-               conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
-               conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
-
-               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
-               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void illegalArgument() {
-               Configuration conf = new Configuration();
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"1.1");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void illegalArgumentNegative() {
-               Configuration conf = new Configuration();
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"-0.01");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void tooMuchCutoff() {
-               Configuration conf = new Configuration();
-               
conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), 
"6000");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
        @Test
        public void testGetEnvironmentVariables() {
                Configuration testConf = new Configuration();
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index e82159b..d2c40e2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.util.HadoopUtils;
@@ -87,32 +86,6 @@ public final class Utils {
        /** Time to wait in milliseconds between each remote resources fetch in 
case of FileNotFoundException. */
        public static final int REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI = 100;
 
-       /**
-        * See documentation.
-        */
-       public static int calculateHeapSize(int memory, 
org.apache.flink.configuration.Configuration conf) {
-
-               float memoryCutoffRatio = 
conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-               int minCutoff = 
conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
-               if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
-                       throw new IllegalArgumentException("The configuration 
value '"
-                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
-                               + "' must be between 0 and 1. Value given=" + 
memoryCutoffRatio);
-               }
-               if (minCutoff > memory) {
-                       throw new IllegalArgumentException("The configuration 
value '"
-                               + 
ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
-                               + "' is higher (" + minCutoff + ") than the 
requested amount of memory " + memory);
-               }
-
-               int heapLimit = (int) ((float) memory * memoryCutoffRatio);
-               if (heapLimit < minCutoff) {
-                       heapLimit = minCutoff;
-               }
-               return memory - heapLimit;
-       }
-
        public static void setupYarnClassPath(Configuration conf, Map<String, 
String> appMasterEnv) {
                addToEnvironment(
                        appMasterEnv,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index a65a48f..1a6f31e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -1647,7 +1647,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                final  Map<String, String> startCommandValues = new HashMap<>();
                startCommandValues.put("java", "$JAVA_HOME/bin/java");
 
-               int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, 
flinkConfiguration);
+               int heapSize = 
BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
                String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, 
heapSize);
                startCommandValues.put("jvmmem", jvmHeapMem);
 

Reply via email to