This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30ce8c0059a03b494ad7cae93487f9b8c8117749
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Thu Sep 26 19:36:20 2019 +0800

    [FLINK-13983][runtime] Introduce 'BashJavaUtils' allowing bash scripts to 
call java codes for generating TM resource dynamic configurations and JVM 
parameters.
---
 .../flink/configuration/ConfigurationUtils.java    | 58 ++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh        | 14 ++++
 flink-dist/src/test/bin/runBashJavaUtilsCmd.sh     | 38 +++++++++++
 .../org/apache/flink/dist/BashJavaUtilsTest.java   | 54 +++++++++++++++
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../apache/flink/runtime/util/BashJavaUtils.java   | 77 ++++++++++++++++++++++
 .../TaskExecutorResourceUtilsTest.java             | 44 +++----------
 7 files changed, 250 insertions(+), 39 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index b94865c..b6d817c 100755
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 
 import javax.annotation.Nonnull;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
 import static 
org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Utility class for {@link Configuration} related helper functions.
@@ -174,6 +176,62 @@ public class ConfigurationUtils {
                return separatedPaths.length() > 0 ? separatedPaths.split(",|" 
+ File.pathSeparator) : EMPTY;
        }
 
+       @VisibleForTesting
+       public static Map<String, String> parseTmResourceDynamicConfigs(String 
dynamicConfigsStr) {
+               Map<String, String> configs = new HashMap<>();
+               String[] configStrs = dynamicConfigsStr.split(" ");
+
+               checkArgument(configStrs.length % 2 == 0);
+               for (int i = 0; i < configStrs.length; ++i) {
+                       String configStr = configStrs[i];
+                       if (i % 2 == 0) {
+                               checkArgument(configStr.equals("-D"));
+                       } else {
+                               String[] configKV = configStr.split("=");
+                               checkArgument(configKV.length == 2);
+                               configs.put(configKV[0], configKV[1]);
+                       }
+               }
+
+               
checkArgument(configs.containsKey(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
+               
checkArgument(configs.containsKey(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key()));
+
+               return configs;
+       }
+
+       @VisibleForTesting
+       public static Map<String, String> parseTmResourceJvmParams(String 
jvmParamsStr) {
+               final String xmx = "-Xmx";
+               final String xms = "-Xms";
+               final String maxDirect = "-XX:MaxDirectMemorySize=";
+               final String maxMetadata = "-XX:MaxMetaspaceSize=";
+
+               Map<String, String> configs = new HashMap<>();
+               for (String paramStr : jvmParamsStr.split(" ")) {
+                       if (paramStr.startsWith(xmx)) {
+                               configs.put(xmx, 
paramStr.substring(xmx.length()));
+                       } else if (paramStr.startsWith(xms)) {
+                               configs.put(xms, 
paramStr.substring(xms.length()));
+                       } else if (paramStr.startsWith(maxDirect)) {
+                               configs.put(maxDirect, 
paramStr.substring(maxDirect.length()));
+                       } else if (paramStr.startsWith(maxMetadata)) {
+                               configs.put(maxMetadata, 
paramStr.substring(maxMetadata.length()));
+                       }
+               }
+
+               checkArgument(configs.containsKey(xmx));
+               checkArgument(configs.containsKey(xms));
+               checkArgument(configs.containsKey(maxDirect));
+               checkArgument(configs.containsKey(maxMetadata));
+
+               return configs;
+       }
+
        // Make sure that we cannot instantiate this class
        private ConfigurationUtils() {
        }
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index b799214..090c1fb 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -776,3 +776,17 @@ calculateTaskManagerHeapSizeMB() {
 
     echo ${tm_heap_size_mb}
 }
+
+runBashJavaUtilsCmd() {
+    local cmd=$1
+    local class_path=$2
+    local conf_dir=$3
+
+    local output="`${JAVA_RUN} -classpath ${class_path} 
org.apache.flink.runtime.util.BashJavaUtils ${cmd} --configDir ${conf_dir} 2> 
/dev/null`"
+    if [[ $? -ne 0 ]]; then
+        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}."
+        exit 1
+    fi
+
+    echo ${output}
+}
diff --git a/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh 
b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
new file mode 100755
index 0000000..e623760
--- /dev/null
+++ b/flink-dist/src/test/bin/runBashJavaUtilsCmd.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Wrapper script that sources config.sh and runs the given BashJavaUtils command
+USAGE="Usage: runBashJavaUtilsCmd.sh <command>"
+
+COMMAND=$1
+
+if [[ -z "${COMMAND}" ]]; then
+  echo "$USAGE"
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_CLASSPATH=`find . -name 'flink-dist*.jar' | grep lib`
+FLINK_CONF_DIR=${bin}/../../main/resources
+
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+
+runBashJavaUtilsCmd ${COMMAND} ${FLINK_CLASSPATH} ${FLINK_CONF_DIR}
diff --git 
a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java 
b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java
new file mode 100644
index 0000000..6b6a216
--- /dev/null
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.util.BashJavaUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for BashJavaUtils.
+ */
+public class BashJavaUtilsTest extends JavaBashTestBase {
+
+       private static final String RUN_BASH_JAVA_UTILS_CMD_SCRIPT = 
"src/test/bin/runBashJavaUtilsCmd.sh";
+
+       @Test
+       public void testGetTmResourceDynamicConfigs() throws Exception {
+               String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+                       
BashJavaUtils.Command.GET_TM_RESOURCE_DYNAMIC_CONFIGS.toString()};
+               String result = executeScript(command);
+
+               assertNotNull(result);
+               ConfigurationUtils.parseTmResourceDynamicConfigs(result);
+       }
+
+       @Test
+       public void testGetTmResourceJvmParams() throws Exception {
+               String[] command = {RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+                       
BashJavaUtils.Command.GET_TM_RESOURCE_JVM_PARAMS.toString()};
+               String result = executeScript(command);
+
+               assertNotNull(result);
+               ConfigurationUtils.parseTmResourceJvmParams(result);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 6493dec..924ed0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -305,8 +304,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                }
        }
 
-       @VisibleForTesting
-       static Configuration loadConfiguration(String[] args) throws 
FlinkParseException {
+       public static Configuration loadConfiguration(String[] args) throws 
FlinkParseException {
                final CommandLineParser<ClusterConfiguration> commandLineParser 
= new CommandLineParser<>(new ClusterConfigurationParserFactory());
 
                final ClusterConfiguration clusterConfiguration;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
new file mode 100644
index 0000000..70618b1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for using Java utilities in bash scripts.
+ */
+public class BashJavaUtils {
+
+       public static void main(String[] args) throws Exception {
+               checkArgument(args.length > 0, "Command not specified.");
+
+               switch (Command.valueOf(args[0])) {
+                       case GET_TM_RESOURCE_DYNAMIC_CONFIGS:
+                               getTmResourceDynamicConfigs(args);
+                               break;
+                       case GET_TM_RESOURCE_JVM_PARAMS:
+                               getTmResourceJvmParams(args);
+                               break;
+                       default:
+                               // unexpected, Command#valueOf should fail if an 
unknown command is passed in
+                               throw new RuntimeException("Unexpected, 
something is wrong.");
+               }
+       }
+
+       private static void getTmResourceDynamicConfigs(String[] args) throws 
Exception {
+               Configuration configuration = 
TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length));
+               TaskExecutorResourceSpec taskExecutorResourceSpec = 
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+               
System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
+       }
+
+       private static void getTmResourceJvmParams(String[] args) throws 
Exception {
+               Configuration configuration = 
TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length));
+               TaskExecutorResourceSpec taskExecutorResourceSpec = 
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+               
System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
+       }
+
+       /**
+        * Commands that BashJavaUtils supports.
+        */
+       public enum Command {
+               /**
+                * Get dynamic configs of task executor resources.
+                */
+               GET_TM_RESOURCE_DYNAMIC_CONFIGS,
+
+               /**
+                * Get JVM parameters of task executor resources.
+                */
+               GET_TM_RESOURCE_JVM_PARAMS
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 8577d8f..432c1af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -27,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -60,19 +60,7 @@ public class TaskExecutorResourceUtilsTest extends 
TestLogger {
        @Test
        public void testGenerateDynamicConfigurations() {
                String dynamicConfigsStr = 
TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC);
-               Map<String, String> configs = new HashMap<>();
-               String[] configStrs = dynamicConfigsStr.split(" ");
-               assertThat(configStrs.length % 2, is(0));
-               for (int i = 0; i < configStrs.length; ++i) {
-                       String configStr = configStrs[i];
-                       if (i % 2 == 0) {
-                               assertThat(configStr, is("-D"));
-                       } else {
-                               String[] configKV = configStr.split("=");
-                               assertThat(configKV.length, is(2));
-                               configs.put(configKV[0], configKV[1]);
-                       }
-               }
+               Map<String, String> configs = 
ConfigurationUtils.parseTmResourceDynamicConfigs(dynamicConfigsStr);
 
                
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
                
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
 is(TM_RESOURCE_SPEC.getTaskHeapSize()));
@@ -84,30 +72,14 @@ public class TaskExecutorResourceUtilsTest extends 
TestLogger {
        }
 
        @Test
-       public void testGenerateJvmParameters() throws Exception {
+       public void testGenerateJvmParameters() {
                String jvmParamsStr = 
TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
-               MemorySize heapSizeMax = null;
-               MemorySize heapSizeMin = null;
-               MemorySize directSize = null;
-               MemorySize metaspaceSize = null;
-               for (String paramStr : jvmParamsStr.split(" ")) {
-                       if (paramStr.startsWith("-Xmx")) {
-                               heapSizeMax = 
MemorySize.parse(paramStr.substring("-Xmx".length()));
-                       } else if (paramStr.startsWith("-Xms")) {
-                               heapSizeMin = 
MemorySize.parse(paramStr.substring("-Xms".length()));
-                       } else if 
(paramStr.startsWith("-XX:MaxDirectMemorySize=")) {
-                               directSize = 
MemorySize.parse(paramStr.substring("-XX:MaxDirectMemorySize=".length()));
-                       } else if 
(paramStr.startsWith("-XX:MaxMetaspaceSize=")) {
-                               metaspaceSize = 
MemorySize.parse(paramStr.substring("-XX:MaxMetaspaceSize=".length()));
-                       } else {
-                               throw new Exception("Unknown JVM parameter: " + 
paramStr);
-                       }
-               }
+               Map<String, String> configs = 
ConfigurationUtils.parseTmResourceJvmParams(jvmParamsStr);
 
-               assertThat(heapSizeMax, 
is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
-               assertThat(heapSizeMin, is(heapSizeMax));
-               assertThat(directSize, 
is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize())));
-               assertThat(metaspaceSize, 
is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
+               assertThat(MemorySize.parse(configs.get("-Xmx")), 
is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
+               assertThat(MemorySize.parse(configs.get("-Xms")), 
is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
+               
assertThat(MemorySize.parse(configs.get("-XX:MaxDirectMemorySize=")), 
is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize())));
+               
assertThat(MemorySize.parse(configs.get("-XX:MaxMetaspaceSize=")), 
is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
        }
 
        @Test public void testConfigFrameworkHeapMemory() {

Reply via email to