This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git
commit 30ce8c0059a03b494ad7cae93487f9b8c8117749 Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Thu Sep 26 19:36:20 2019 +0800 [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to call java codes for generating TM resource dynamic configurations and JVM parameters. --- .../flink/configuration/ConfigurationUtils.java | 58 ++++++++++++++++ flink-dist/src/main/flink-bin/bin/config.sh | 14 ++++ flink-dist/src/test/bin/runBashJavaUtilsCmd.sh | 38 +++++++++++ .../org/apache/flink/dist/BashJavaUtilsTest.java | 54 +++++++++++++++ .../runtime/taskexecutor/TaskManagerRunner.java | 4 +- .../apache/flink/runtime/util/BashJavaUtils.java | 77 ++++++++++++++++++++++ .../TaskExecutorResourceUtilsTest.java | 44 +++---------- 7 files changed, 250 insertions(+), 39 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index b94865c..b6d817c 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import javax.annotation.Nonnull; @@ -31,6 +32,7 @@ import java.util.Set; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS; import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL; +import static org.apache.flink.util.Preconditions.checkArgument; /** * Utility class for {@link Configuration} related helper functions. @@ -174,6 +176,62 @@ public class ConfigurationUtils { return separatedPaths.length() > 0 ? 
separatedPaths.split(",|" + File.pathSeparator) : EMPTY; } + @VisibleForTesting + public static Map<String, String> parseTmResourceDynamicConfigs(String dynamicConfigsStr) { + Map<String, String> configs = new HashMap<>(); + String[] configStrs = dynamicConfigsStr.split(" "); + + checkArgument(configStrs.length % 2 == 0); + for (int i = 0; i < configStrs.length; ++i) { + String configStr = configStrs[i]; + if (i % 2 == 0) { + checkArgument(configStr.equals("-D")); + } else { + String[] configKV = configStr.split("="); + checkArgument(configKV.length == 2); + configs.put(configKV[0], configKV[1]); + } + } + + checkArgument(configs.containsKey(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())); + checkArgument(configs.containsKey(TaskManagerOptions.TASK_HEAP_MEMORY.key())); + checkArgument(configs.containsKey(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())); + checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())); + checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())); + checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())); + checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())); + + return configs; + } + + @VisibleForTesting + public static Map<String, String> parseTmResourceJvmParams(String jvmParamsStr) { + final String xmx = "-Xmx"; + final String xms = "-Xms"; + final String maxDirect = "-XX:MaxDirectMemorySize="; + final String maxMetadata = "-XX:MaxMetaspaceSize="; + + Map<String, String> configs = new HashMap<>(); + for (String paramStr : jvmParamsStr.split(" ")) { + if (paramStr.startsWith(xmx)) { + configs.put(xmx, paramStr.substring(xmx.length())); + } else if (paramStr.startsWith(xms)) { + configs.put(xms, paramStr.substring(xms.length())); + } else if (paramStr.startsWith(maxDirect)) { + configs.put(maxDirect, paramStr.substring(maxDirect.length())); + } else if (paramStr.startsWith(maxMetadata)) { + configs.put(maxMetadata, 
paramStr.substring(maxMetadata.length())); + } + } + + checkArgument(configs.containsKey(xmx)); + checkArgument(configs.containsKey(xms)); + checkArgument(configs.containsKey(maxDirect)); + checkArgument(configs.containsKey(maxMetadata)); + + return configs; + } + // Make sure that we cannot instantiate this class private ConfigurationUtils() { } diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index b799214..090c1fb 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -776,3 +776,17 @@ calculateTaskManagerHeapSizeMB() { echo ${tm_heap_size_mb} } + +runBashJavaUtilsCmd() { + local cmd=$1 + local class_path=$2 + local conf_dir=$3 + + local output="`${JAVA_RUN} -classpath ${class_path} org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> /dev/null`" + if [[ $? -ne 0 ]]; then + echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." + exit 1 + fi + + echo ${output} +} diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh new file mode 100755 index 0000000..e623760 --- /dev/null +++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Wrapper script to run a BashJavaUtils command through runBashJavaUtilsCmd from config.sh +USAGE="Usage: runBashJavaUtilsCmd.sh <command>" + +COMMAND=$1 + +if [[ -z "${COMMAND}" ]]; then + echo "$USAGE" + exit 1 +fi + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +FLINK_CLASSPATH=`find . -name 'flink-dist*.jar' | grep lib` +FLINK_CONF_DIR=${bin}/../../main/resources + +. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null + +runBashJavaUtilsCmd ${COMMAND} ${FLINK_CLASSPATH} ${FLINK_CONF_DIR} diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java new file mode 100644 index 0000000..6b6a216 --- /dev/null +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.dist; + +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.runtime.util.BashJavaUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +/** + * Tests for BashJavaUtils. + */ +public class BashJavaUtilsTest extends JavaBashTestBase { + + private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = "src/test/bin/runBashJavaUtilsCmd.sh"; + + @Test + public void testGetTmResourceDynamicConfigs() throws Exception { + String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()}; + String result = executeScript(command); + + assertNotNull(result); + ConfigurationUtils.parseTmResourceDynamicConfigs(result); + } + + @Test + public void testGetTmResourceJvmParams() throws Exception { + String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()}; + String result = executeScript(command); + + assertNotNull(result); + ConfigurationUtils.parseTmResourceJvmParams(result); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 6493dec..924ed0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -305,8 +304,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync } } - @VisibleForTesting - 
static Configuration loadConfiguration(String[] args) throws FlinkParseException { + public static Configuration loadConfiguration(String[] args) throws FlinkParseException { final CommandLineParser<ClusterConfiguration> commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); final ClusterConfiguration clusterConfiguration; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java new file mode 100644 index 0000000..70618b1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Utility class for using java utilities in bash scripts. 
+ */ +public class BashJavaUtils { + + public static void main(String[] args) throws Exception { + checkArgument(args.length > 0, "Command not specified."); + + switch (Command.valueOf(args[0])) { + case GET_TM_RESOURCE_DYNAMIC_CONFIGS: + getTmResourceDynamicConfigs(args); + break; + case GET_TM_RESOURCE_JVM_PARAMS: + getTmResourceJvmParams(args); + break; + default: + // unexpected, Command#valueOf should fail if an unknown command is passed in + throw new RuntimeException("Unexpected, something is wrong."); + } + } + + private static void getTmResourceDynamicConfigs(String[] args) throws Exception { + Configuration configuration = TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length)); + TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); + System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec)); + } + + private static void getTmResourceJvmParams(String[] args) throws Exception { + Configuration configuration = TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length)); + TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); + System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec)); + } + + /** + * Commands that BashJavaUtils supports. + */ + public enum Command { + /** + * Get dynamic configs of task executor resources. + */ + GET_TM_RESOURCE_DYNAMIC_CONFIGS, + + /** + * Get JVM parameters of task executor resources. 
+ */ + GET_TM_RESOURCE_JVM_PARAMS + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java index 8577d8f..432c1af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.TaskManagerOptions; @@ -27,7 +28,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; @@ -60,19 +60,7 @@ public class TaskExecutorResourceUtilsTest extends TestLogger { @Test public void testGenerateDynamicConfigurations() { String dynamicConfigsStr = TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC); - Map<String, String> configs = new HashMap<>(); - String[] configStrs = dynamicConfigsStr.split(" "); - assertThat(configStrs.length % 2, is(0)); - for (int i = 0; i < configStrs.length; ++i) { - String configStr = configStrs[i]; - if (i % 2 == 0) { - assertThat(configStr, is("-D")); - } else { - String[] configKV = configStr.split("="); - assertThat(configKV.length, is(2)); - configs.put(configKV[0], configKV[1]); - } - } + Map<String, String> configs = ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigsStr); assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getFrameworkHeapSize())); 
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getTaskHeapSize())); @@ -84,30 +72,14 @@ public class TaskExecutorResourceUtilsTest extends TestLogger { } @Test - public void testGenerateJvmParameters() throws Exception { + public void testGenerateJvmParameters() { String jvmParamsStr = TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC); - MemorySize heapSizeMax = null; - MemorySize heapSizeMin = null; - MemorySize directSize = null; - MemorySize metaspaceSize = null; - for (String paramStr : jvmParamsStr.split(" ")) { - if (paramStr.startsWith("-Xmx")) { - heapSizeMax = MemorySize.parse(paramStr.substring("-Xmx".length())); - } else if (paramStr.startsWith("-Xms")) { - heapSizeMin = MemorySize.parse(paramStr.substring("-Xms".length())); - } else if (paramStr.startsWith("-XX:MaxDirectMemorySize=")) { - directSize = MemorySize.parse(paramStr.substring("-XX:MaxDirectMemorySize=".length())); - } else if (paramStr.startsWith("-XX:MaxMetaspaceSize=")) { - metaspaceSize = MemorySize.parse(paramStr.substring("-XX:MaxMetaspaceSize=".length())); - } else { - throw new Exception("Unknown JVM parameter: " + paramStr); - } - } + Map<String, String> configs = ConfigurationUtils.parseTmResourceJvmParams(jvmParamsStr); - assertThat(heapSizeMax, is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize()))); - assertThat(heapSizeMin, is(heapSizeMax)); - assertThat(directSize, is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize()))); - assertThat(metaspaceSize, is(TM_RESOURCE_SPEC.getJvmMetaspaceSize())); + assertThat(MemorySize.parse(configs.get("-Xmx")), is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize()))); + assertThat(MemorySize.parse(configs.get("-Xms")), 
is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize()))); + assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize()))); + assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), is(TM_RESOURCE_SPEC.getJvmMetaspaceSize())); } @Test public void testConfigFrameworkHeapMemory() {