This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bfef1a4c1649a860acae6463a726caad7aeb4db3 Author: wangyang0918 <danrtsey...@alibaba-inc.com> AuthorDate: Tue Dec 17 11:31:13 2019 +0800 [hotfix][yarn] Move yarn Utils#calculateHeapSize to BootstrapTools The method BootstrapTools#calculateHeapSize will be used by Yarn and Kubernetes to calculate the heapsize of jobmanager. The heapsize will be set to JVM -Xmx and -Xms. --- .../runtime/clusterframework/BootstrapTools.java | 28 +++++++++++ .../clusterframework/BootstrapToolsTest.java | 54 ++++++++++++++++++++ .../test/java/org/apache/flink/yarn/UtilsTest.java | 58 ---------------------- .../src/main/java/org/apache/flink/yarn/Utils.java | 27 ---------- .../apache/flink/yarn/YarnClusterDescriptor.java | 2 +- 5 files changed, 83 insertions(+), 86 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index e7e1fd9..30f70e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions; import org.apache.flink.util.NetUtils; @@ -640,4 +641,31 @@ public class BootstrapTools { .toArray(String[]::new); return String.join(" ", newAddedConfigs); } + + /** + * Calculate heap size after cut-off. The heap size after cut-off will be used to set -Xms and -Xmx for jobmanager + * start command. + */ + public static int calculateHeapSize(int memory, Configuration conf) { + + final float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO); + final int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); + + if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { + throw new IllegalArgumentException("The configuration value '" + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + + "' must be between 0 and 1. Value given=" + memoryCutoffRatio); + } + if (minCutoff > memory) { + throw new IllegalArgumentException("The configuration value '" + + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); + } + + int heapLimit = (int) ((float) memory * memoryCutoffRatio); + if (heapLimit < minCutoff) { + heapLimit = minCutoff; + } + return memory - heapLimit; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java index 47501e4..02f7c81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.ExceptionUtils; @@ -31,6 +32,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.CheckedSupplier; import akka.actor.ActorSystem; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -423,4 +425,56 @@ public class BootstrapToolsTest extends TestLogger { String dynamicProperties = BootstrapTools.getDynamicProperties(baseConfig, targetConfig); assertEquals("-Dkey.b=b2 -Dkey.c=c", dynamicProperties); } + + @Test + public void testHeapCutoff() { + Configuration conf = new Configuration(); + conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F); + conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); + + Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, conf)); + Assert.assertEquals(8500, BootstrapTools.calculateHeapSize(10000, conf)); + + // test different configuration + Assert.assertEquals(3400, BootstrapTools.calculateHeapSize(4000, conf)); + + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000"); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1"); + Assert.assertEquals(3000, BootstrapTools.calculateHeapSize(4000, conf)); + + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5"); + Assert.assertEquals(2000, BootstrapTools.calculateHeapSize(4000, conf)); + + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1"); + Assert.assertEquals(0, BootstrapTools.calculateHeapSize(4000, conf)); + + // test also deprecated keys + conf = new Configuration(); + conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15); + conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384); + + Assert.assertEquals(616, BootstrapTools.calculateHeapSize(1000, conf)); + Assert.assertEquals(8500, BootstrapTools.calculateHeapSize(10000, conf)); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgument() { + final Configuration conf = new Configuration(); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1"); + BootstrapTools.calculateHeapSize(4000, conf); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalArgumentNegative() { + final Configuration conf = new Configuration(); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01"); + BootstrapTools.calculateHeapSize(4000, conf); + } + + @Test(expected = IllegalArgumentException.class) + public void tooMuchCutoff() { + final Configuration conf = new Configuration(); + conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000"); + BootstrapTools.calculateHeapSize(4000, conf); + } } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 7e45b99..ae16cd6 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -18,10 +18,8 @@ package org.apache.flink.yarn; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; @@ -82,62 +80,6 @@ public class UtilsTest extends TestLogger { Assert.assertTrue(files.contains("conf")); } - /** - * Remove 15% of the heap, at least 384MB. - * - */ - @Test - public void testHeapCutoff() { - Configuration conf = new Configuration(); - conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F); - conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384); - - Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf)); - Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf)); - - // test different configuration - Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000"); - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1"); - Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5"); - Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf)); - - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - - // test also deprecated keys - conf = new Configuration(); - conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15); - conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384); - - Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf)); - Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf)); - } - - @Test(expected = IllegalArgumentException.class) - public void illegalArgument() { - Configuration conf = new Configuration(); - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - - @Test(expected = IllegalArgumentException.class) - public void illegalArgumentNegative() { - Configuration conf = new Configuration(); - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - - @Test(expected = IllegalArgumentException.class) - public void tooMuchCutoff() { - Configuration conf = new Configuration(); - conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000"); - Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf)); - } - @Test public void testGetEnvironmentVariables() { Configuration testConf = new Configuration(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index e82159b..d2c40e2 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -19,7 +19,6 @@ package org.apache.flink.yarn; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.util.HadoopUtils; @@ -87,32 +86,6 @@ public final class Utils { /** Time to wait in milliseconds between each remote resources fetch in case of FileNotFoundException. */ public static final int REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI = 100; - /** - * See documentation. - */ - public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) { - - float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO); - int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN); - - if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) { - throw new IllegalArgumentException("The configuration value '" - + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() - + "' must be between 0 and 1. Value given=" + memoryCutoffRatio); - } - if (minCutoff > memory) { - throw new IllegalArgumentException("The configuration value '" - + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() - + "' is higher (" + minCutoff + ") than the requested amount of memory " + memory); - } - - int heapLimit = (int) ((float) memory * memoryCutoffRatio); - if (heapLimit < minCutoff) { - heapLimit = minCutoff; - } - return memory - heapLimit; - } - public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) { addToEnvironment( appMasterEnv, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index a65a48f..1a6f31e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -1647,7 +1647,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { final Map<String, String> startCommandValues = new HashMap<>(); startCommandValues.put("java", "$JAVA_HOME/bin/java"); - int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration); + int heapSize = BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration); String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize); startCommandValues.put("jvmmem", jvmHeapMem);