[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-15 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r335057546
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -259,6 +271,30 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
 
final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration = 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+   if 
(configuration.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+   final TaskExecutorResourceSpec taskExecutorResourceSpec 
= TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
 
 Review comment:
   I understand the motivation of having as less as possible separated code 
paths, for exposing the new codes for testability early.
   
   My key point is that, this cannot be easily done.
   - The old fields cannot be mapped to the new fields. 
`onHeapManagedMemorySize` does not match `taskManagerHeapSizeMB` neither. The 
legacy filed `taskManagerHeapSizeMB` has a "heap" in its name, but is actually 
not describing java heap memory. I don't think any of the filed in 
`TaskExecutorResourceSpec` matches it.
   - FLIP-49 is designed to change the way of memory configuration. We can 
foresee that not all of the IT/E2E cases can work as it is. It doesn't make 
sense to try to expose the new configuration logics to the ETE tests at this 
moment.
   - My plan was to first get the new feature completed, then switching IT/ETE 
test cases to the new configuration (because we no longer need them to guard 
the old logic). That's why I put the work item "fix / update / remove test 
cases for legacy mode" in the last step "clean-up of legacy mode" of FLIP-49.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-15 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334831988
 
 

 ##
 File path: flink-dist/src/test/java/org/apache/flink/dist/JavaBashTestBase.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Test class for executing bash scripts.
+ */
+public class JavaBashTestBase extends TestLogger {
+   @BeforeClass
+   public static void checkOperatingSystem() {
+   Assume.assumeTrue("This test checks shell scripts which are not 
available on Windows.",
+   !OperatingSystem.isWindows());
+   }
+
+   /**
+* Executes the given shell script wrapper and returns its output.
+*
+* @param command  command to run
+*
+* @return raw script output
+*/
+   protected String executeScript(final String[] command) throws 
IOException {
+   ProcessBuilder pb = new ProcessBuilder(command);
+   pb.redirectErrorStream(true);
+   Process process = pb.start();
+   BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+   StringBuilder sb = new StringBuilder();
+   String s;
+   while ((s = reader.readLine()) != null) {
+   sb.append(s).append("\n");
 
 Review comment:
   Since we are separating the two commands (generating dynamic config / jvm 
parameters), we can simply remove this line breaker here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334736805
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -259,6 +271,30 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
 
final RetryingRegistrationConfiguration 
retryingRegistrationConfiguration = 
RetryingRegistrationConfiguration.fromConfiguration(configuration);
 
+   if 
(configuration.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+   final TaskExecutorResourceSpec taskExecutorResourceSpec 
= TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
 
 Review comment:
   I'm not sure I understand this comment completely.
   
   First of all, as mentioned above, the old parameters cannot be mapped to 
`TaskExecutorResourceSpec` directly.
   
   I'd like to keep the two configuration logics separately for the following 
reasons:
   - This PR means to introduce no behavior changes to the legacy mode, which 
is hard to guarantee if we try to map the old fields to the new ones. Since the 
legacy code path is quite complex and not friendly to maintainability (exactly 
one of the motivations for FLIP-49), and will be removed soon, I'd rather not 
to spend too much effort on deduplication of the two code paths with the risk 
of breaking the original behavior of the legacy mode. 
   - The code path of the new mode, in my opinion, is clear and self contained. 
I'm in favor of not messing it by inserting the legacy logics in between. That 
would also introduce more workload for later legacy clean-ups. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334732819
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##
 @@ -472,7 +472,9 @@ public static String getStartCommand(String template,
for (Map.Entry variable : startCommandValues
.entrySet()) {
template = template
-   .replace("%" + variable.getKey() + "%", 
variable.getValue());
+   .replace("%" + variable.getKey() + "%", 
variable.getValue())
 
 Review comment:
   I think for the test cases that verifies the generated shell command, the 
main purpose is to verified the command values and their order. How many spaces 
between tokens is rather an unconcerned implementation detail.
   Previously, we need to carefully put one or multiple sequential spaces 
between the command values when writing test cases. Compared to that, I think 
it would be easier to always have one space between each pair of adjacent 
command values.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334730673
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -48,13 +48,17 @@
/** Environment variables to add to the Java process. */
private final HashMap taskManagerEnv;
 
+   private final TaskExecutorResourceSpec taskExecutorResourceSpec;
 
 Review comment:
   I don't think the legacy fields can be mapped to the new ones.
   `taskManagerHeapSizeMB` does not map to `taskHeapSize` (I know the names 
sounds similar and thus confusing), it's similar but also not exactly the same 
as `totalFlinkMemory`.
   
   Mapping between the legacy and new fields could be complicated and against 
intuition. Therefore I'm in favor of keep the two whole sets of parameters for 
the legacy and new mode, so it would be easier for implementation and later 
removing legacy codes.
   
   I'll add the `@nullable` annotations.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334728535
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/taskmanager.sh
 ##
 @@ -44,32 +44,18 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == 
"start-foreground" ]]; then
 export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
 fi
 
-if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
-   echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace 
with key \`${KEY_TASKM_MEM_SIZE}\`"
-else
-   flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
-   FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
-fi
-
-if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" 
-lt "0" ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-exit 1
-fi
-
-if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
-
-TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
-# Long.MAX_VALUE in TB: This is an upper bound, much less direct 
memory will be used
-TM_MAX_OFFHEAP_SIZE="8388607T"
-
-export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M 
-XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
-
-fi
-
 # Add TaskManager-specific JVM options
 export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_TM}"
 
 # Startup parameters
+
dynamic_configs_and_jvm_params=$(getTmResourceDynamicConfigsAndExportJvmParams)
+IFS=$'\n' lines=(${dynamic_configs_and_jvm_params})
+
+jvm_params=${lines[0]}
+export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+IFS=' ' dynamic_configs=(${lines[1]})
+ARGS=(${ARGS[@]} ${dynamic_configs[@]})
 
 Review comment:
   Thanks, I'll try that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334728390
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/config.sh
 ##
 @@ -783,3 +790,55 @@ calculateTaskManagerHeapSizeMB() {
 
 echo ${tm_heap_size_mb}
 }
+
+getTmResourceDynamicConfigsAndExportJvmParams() {
+if [[ "`echo ${FLINK_TM_ENABLE_FLIP49} | tr '[:upper:]' '[:lower:]'`" == 
"true" ]]; then
+echo "$(getTmResourceDynamicConfigsAndExportJvmParamsFlip49)"
+else
+echo "$(getTmResourceDynamicConfigsAndExportJvmParamsLegacy)"
+fi
+}
+
+getTmResourceDynamicConfigsAndExportJvmParamsFlip49() {
+local class_to_run=org.apache.flink.runtime.util.BashJavaUtils
+local command=GET_TM_RESOURCE_CONFIGS_AND_JVM_PARAMS
+local class_path=`constructFlinkClassPath`
+class_path=`manglePathList ${class_path}`
+
+local output="`${JAVA_RUN} -classpath ${class_path} ${class_to_run} 
${command} --configDir ${FLINK_CONF_DIR} 2> /dev/null`"
+if [[ $? -ne 0 ]]; then
+echo "[ERROR] Cannot get TaskManager resource dynamic configs and JVM 
parameters from BashJavaUtils."
+exit 1
+fi
+
+IFS=$'\n' lines=($output)
+local dynamic_configs=${lines[0]} # dynamic configs
+local jvm_params=${lines[1]} # jvm parameters
+
+echo ${jvm_params} $'\n' ${dynamic_configs}
 
 Review comment:
   Yes, it is.
   Somehow if I don't explicitly put `'\n'` in the `echo` line, the line break 
will be lost. Don't know why, I'm not very familiar with bash scripts.
   I guest it will no longer be a problem if we are separating responsibility 
of generating dynamic configs and JVM parameters.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334727581
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for using java utilities in bash scripts.
+ */
+public class BashJavaUtils {
+
+   private static final String CMD_GET_TM_RESOURCE_CONFIGS_AND_JVM_PARAMS 
= "GET_TM_RESOURCE_CONFIGS_AND_JVM_PARAMS";
+
+   public static void main(String[] args) throws Exception {
+   checkArgument(args.length > 0, "Command not specified.");
+
+   switch (args[0]) {
+   case CMD_GET_TM_RESOURCE_CONFIGS_AND_JVM_PARAMS: 
getTmResourceConfigsAndJvmParams(args); break;
+   default: throw new 
RuntimeException(String.format("Unknown command: %s", args[0]));
+   }
+   }
+
+   private static void getTmResourceConfigsAndJvmParams(String[] args) 
throws Exception {
+   Configuration configuration = 
TaskManagerRunner.loadConfiguration(Arrays.copyOfRange(args, 1, args.length));
+   TaskExecutorResourceSpec taskExecutorResourceSpec = 
TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+   
System.out.println(TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec));
+   
System.out.println(TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
 
 Review comment:
   I was also hesitating about this.
   Ok, let's try do it separately first.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-10-14 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r334726479
 
 

 ##
 File path: 
flink-dist/src/test/bin/getTmResourceConfigsAndJvmParamsFromBashJavaUtils.sh
 ##
 @@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+FLINK_CONF_DIR=${bin}/../../main/resources
+. ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
+
+FLINK_CLASSPATH=`find . -name 'flink-dist*.jar' | grep lib`
+
+result=`java -classpath ${FLINK_CLASSPATH} 
org.apache.flink.runtime.util.BashJavaUtils 
GET_TM_RESOURCE_CONFIGS_AND_JVM_PARAMS --configDir ${FLINK_CONF_DIR} 2> 
/dev/null`
 
 Review comment:
   Make sense. I'll try to do that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-09-29 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r329350267
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -48,13 +48,17 @@
/** Environment variables to add to the Java process. */
private final HashMap taskManagerEnv;
 
+   private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
public ContaineredTaskManagerParameters(
+   TaskExecutorResourceSpec taskExecutorResourceSpec,
 
 Review comment:
   I'm not sure about that. Having different constructors means that 
`ContaineredTaskManagerParameters` needs to understand the difference between 
legacy and flip49 modes, and initialize final class members that are not valid 
in the current mode with `null` values. I think we should have as less as 
possible separated code paths, thus I'm in favor of keep it the current way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-09-29 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r329349528
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##
 @@ -432,7 +432,12 @@ public static String getTaskManagerShellCommand(
tmParams.taskManagerDirectMemoryLimitMB()));
}
 
-   startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+   final TaskExecutorResourceSpec taskExecutorResourceSpec = 
tmParams.getTaskExecutorResourceSpec();
+   if (taskExecutorResourceSpec == null) { // FLIP-49 disabled
 
 Review comment:
   `taskExecutorResourceSpec` should never be `null` after enabling FLIP-49. 
The only reason it might be `null` is that we passed in `null` value for the 
legacy code paths. So I think it should not be decoupled. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] Launch task executor with new memory calculation logics

2019-09-29 Thread GitBox
xintongsong commented on a change in pull request #9801: [FLINK-13983][runtime] 
Launch task executor with new memory calculation logics
URL: https://github.com/apache/flink/pull/9801#discussion_r329349401
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
 ##
 @@ -125,7 +125,7 @@ public static String generateDynamicConfigsStr(final 
TaskExecutorResourceSpec ta
private static String assembleDynamicConfigsStr(final Map configs) {
final StringBuffer sb = new StringBuffer();
for (Map.Entry entry : configs.entrySet()) {
-   
sb.append("-D").append(entry.getKey()).append("=").append(entry.getValue()).append("
 ");
+   sb.append("-D 
").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
 
 Review comment:
   Dynamic configs are exiting mechanism for passing configs into flink 
overwriting configs in flink-conf.yaml. The correct format requires a space 
between '-D' and the following key-value part. This change is just a fix of 
previous wrong format.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services