[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-13 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r346167378
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -322,11 +323,7 @@ Resource getContainerResource() {
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
log.info("Stopping container {}.", container.getId());
-   try {
-   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-   } catch (final Exception e) {
-   log.warn("Error while calling YARN Node Manager to stop container", e);
-   }
+   nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
 
 Review comment:
   The exception is not thrown by `startContainerAsync`, but by `createTaskExecutorLaunchContext`.
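A minimal sketch of the control flow described in this comment. It uses hypothetical stand-ins (`AsyncNodeClient`, `createLaunchContext`, `String` IDs) in place of the YARN and Flink types. Only the synchronous launch-context creation sits inside the `try`. The asynchronous start call is outside it, because it reports failures later instead of throwing.

```java
import java.io.IOException;

// Sketch only: String IDs and a String "launch context" stand in for the YARN types.
final class StartWorkerSketch {

    /** Hypothetical async client: returns immediately and reports failures via a callback, never by throwing. */
    interface AsyncNodeClient {
        void startContainerAsync(String containerId, String launchContext);
    }

    /** Hypothetical stand-in for createTaskExecutorLaunchContext, which can fail synchronously (e.g. on I/O). */
    static String createLaunchContext(String containerId, boolean simulateFailure) throws IOException {
        if (simulateFailure) {
            throw new IOException("could not build launch context for " + containerId);
        }
        return "launch-context-for-" + containerId;
    }

    /** Returns true if the start was handed to the async client, false if context creation failed. */
    static boolean startTaskExecutorInContainer(
            AsyncNodeClient client, String containerId, boolean simulateFailure) {
        final String launchContext;
        try {
            // Only the call that can actually throw is guarded.
            launchContext = createLaunchContext(containerId, simulateFailure);
        } catch (IOException e) {
            // A real implementation would typically give the container back here (assumption);
            // this sketch only reports the failure to the caller.
            return false;
        }
        // Asynchronous: errors surface later in the callback handler, not as an exception here.
        client.startContainerAsync(containerId, launchContext);
        return true;
    }
}
```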


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344669915
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
 ##
 @@ -844,14 +844,16 @@ private ApplicationReport startAppMaster(
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
 
+   String flinkConfigKey = "flink-conf.yaml";
Path remotePathConf = setupSingleLocalResource(
-   "flink-conf.yaml",
+   flinkConfigKey,
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
+   envShipFileList.append(flinkConfigKey).append("=").append(remotePathConf).append(",");
 
 Review comment:
   Yes, we always ship the `flink-conf.yaml` to all `TaskExecutor`s. The `TaskExecutor` loads its Flink configuration from the `flink-conf.yaml` uploaded by the client, plus the dynamic properties set by the `YarnResourceManager`.
   
   @TisonKun What do you mean by "we remove it from code diff above"?
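A simplified sketch of how the two sources could combine on the `TaskExecutor` side. It assumes the dynamic properties arrive as `-Dkey=value` launch arguments and override entries from `flink-conf.yaml`. The map-based model and the `effectiveConfig` name are hypothetical. Flink's real code path parses these arguments through its own command-line handling, not through this helper.

```java
import java.util.HashMap;
import java.util.Map;

final class EffectiveConfigSketch {

    /**
     * Returns the configuration a TaskExecutor would end up with: the entries of the
     * client-uploaded flink-conf.yaml (modeled as a plain map), overridden by any
     * "-Dkey=value" dynamic properties in the launch arguments. Other arguments are ignored.
     */
    static Map<String, String> effectiveConfig(Map<String, String> fromConfFile, String[] args) {
        Map<String, String> result = new HashMap<>(fromConfFile);
        for (String arg : args) {
            int sep = arg.indexOf('=');
            // Assumed format: "-D" prefix, non-empty key, split at the first '='.
            if (arg.startsWith("-D") && sep > 2) {
                result.put(arg.substring(2, sep), arg.substring(sep + 1));
            }
        }
        return result;
    }
}
```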



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344667974
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -322,11 +323,7 @@ Resource getContainerResource() {
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
log.info("Stopping container {}.", container.getId());
-   try {
-   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-   } catch (final Exception e) {
-   log.warn("Error while calling YARN Node Manager to stop container", e);
-   }
+   nodeManagerClient.stopContainerAsync(container.getId(), container.getNodeId());
 
 Review comment:
   `stopContainerAsync` does not throw any exception. If stopping fails, `onStopContainerError` is called.
   We catch the exception around `startContainerAsync` only because `createTaskExecutorLaunchContext` may throw an exception.
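The sketch below shows why no `try`/`catch` is needed around the asynchronous stop. It is a minimal async wrapper that runs a blocking stop RPC on another executor and reports the outcome through callbacks. The callback names follow YARN's `NMClientAsync.CallbackHandler` (`onContainerStopped`, `onStopContainerError`). The interfaces themselves are simplified, hypothetical stand-ins that use `String` container IDs.

```java
import java.util.concurrent.Executor;

final class AsyncStopSketch {

    /** Simplified callback, modeled on the stop-related methods of YARN's NMClientAsync.CallbackHandler. */
    interface StopCallback {
        void onContainerStopped(String containerId);

        void onStopContainerError(String containerId, Throwable t);
    }

    /** Hypothetical blocking RPC to the NodeManager. */
    interface BlockingStopRpc {
        void stopContainer(String containerId) throws Exception;
    }

    /** Wraps the blocking RPC so the caller neither blocks on it nor sees its exceptions. */
    static final class AsyncClient {
        private final BlockingStopRpc rpc;
        private final Executor ioExecutor;
        private final StopCallback callback;

        AsyncClient(BlockingStopRpc rpc, Executor ioExecutor, StopCallback callback) {
            this.rpc = rpc;
            this.ioExecutor = ioExecutor;
            this.callback = callback;
        }

        /** Hands the stop to the I/O executor; never throws to the caller. */
        void stopContainerAsync(String containerId) {
            ioExecutor.execute(() -> {
                try {
                    rpc.stopContainer(containerId);
                } catch (Exception e) {
                    // A failed stop is reported through the callback instead of being thrown.
                    callback.onStopContainerError(containerId, e);
                    return;
                }
                callback.onContainerStopped(containerId);
            });
        }
    }
}
```

The check below uses a direct executor (`Runnable::run`) so the callbacks fire inline. A real client would run the RPC on a separate thread pool, so the calling thread returns immediately.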



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344666748
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -641,4 +610,32 @@ static void require(boolean condition, String message, Object... values) {
}
}
 
+   /**
+* Get dynamic properties based on two Flink configuration. If base config does not contain and target config
+* contains the key or the value is different, it should be added to results. Otherwise, if the base config contains
+* and target config does not contain the key, it will be ignored.
+* @param baseConfig The base configuration.
+* @param targetConfig The target configuration.
+* @return Dynamic properties as string, separated by space.
+*/
+   static String getDynamicProperties(
+   org.apache.flink.configuration.Configuration baseConfig,
+   org.apache.flink.configuration.Configuration targetConfig) {
+
+   String[] newAddedConfigs = targetConfig.keySet().stream().flatMap(
+   (String key) -> {
+   final String baseValue = baseConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue());
+   final String targetValue = targetConfig.getString(ConfigOptions.key(key).stringType().noDefaultValue());
+
+   if (!baseConfig.keySet().contains(key) || !baseValue.equals(targetValue)) {
+   return Stream.of("-" + CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getOpt() + key +
+   CommandLineOptions.DYNAMIC_PROPERTY_OPTION.getValueSeparator() + targetValue);
+   } else {
+   return Stream.empty();
+   }
+   })
+   .toArray(String[]::new);
+   return org.apache.commons.lang3.StringUtils.join(newAddedConfigs, " ");
 
 Review comment:
   OK. I will use `java.lang.String.join` instead.
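A minimal sketch of the same diffing logic, using plain `Map<String, String>` values and joining the result with `java.lang.String.join` instead of commons-lang3 `StringUtils.join`. It assumes the dynamic property option is rendered as `-Dkey=value`, matching strings such as `-Djobmanager.rpc.address=host1` in the BootstrapToolsTest diff. The class and method names are illustrative, not Flink's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class DynamicPropertiesSketch {

    /**
     * Returns "-Dkey=value" entries, separated by a single space, for every key of
     * {@code target} that is missing from {@code base} or has a different value there.
     * Keys that exist only in {@code base} are ignored.
     */
    static String getDynamicProperties(Map<String, String> base, Map<String, String> target) {
        List<String> properties = new ArrayList<>();
        for (Map.Entry<String, String> entry : target.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (!base.containsKey(key) || !Objects.equals(base.get(key), value)) {
                // Assumed rendering of the dynamic property option: "-D" + key + "=" + value.
                properties.add("-D" + key + "=" + value);
            }
        }
        // java.lang.String.join removes the commons-lang3 dependency.
        return String.join(" ", properties);
    }
}
```

Entries follow the iteration order of `target`. With a `HashMap` that order is unspecified, which does not matter for independent command-line flags.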



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344665359
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -641,4 +610,32 @@ static void require(boolean condition, String message, Object... values) {
}
}
 
+   /**
+* Get dynamic properties based on two Flink configuration. If base config does not contain and target config
+* contains the key or the value is different, it should be added to results. Otherwise, if the base config contains
+* and target config does not contain the key, it will be ignored.
+* @param baseConfig The base configuration.
+* @param targetConfig The target configuration.
+* @return Dynamic properties as string, separated by space.
+*/
+   static String getDynamicProperties(
 
 Review comment:
   No, it is not stored anywhere. It is passed to the launch command of the `TaskExecutor`. We only compute the differences from the client-uploaded flink-conf.yaml and then pass them to the `TaskExecutor` as dynamic properties.



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344664368
 
 

 ##
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 ##
 @@ -159,18 +159,29 @@ public void testGetTaskManagerShellCommand() {
"-Dlog4j.configuration=file:./conf/log4j.properties"; // if set
final String mainClass =
"org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
-   final String args = "--configDir ./conf";
+   final String basicArgs = "--configDir ./conf";
+   final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
+   final String args =  basicArgs + " " + mainArgs;
final String redirects =
"1> ./logs/taskmanager.out 2> ./logs/taskmanager.err";
 
+   assertEquals(
+   java + " " + jvmmem +
+   " " + // jvmOpts
+   " " + // logging
 
 Review comment:
   This tests the case of empty `mainArgs`: only `basicArgs` is added to the command.
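A hypothetical helper, not Flink's code, that captures what this test case checks. When `mainArgs` is empty, only `basicArgs` ends up in the command, with no stray separator. Otherwise the two are joined with a single space, as in the test's `basicArgs + " " + mainArgs`.

```java
final class LaunchArgsSketch {

    /**
     * Combines the fixed arguments with optional extra arguments (e.g. dynamic properties).
     * When the extras are null or blank, only the fixed arguments are returned.
     */
    static String buildArgs(String basicArgs, String mainArgs) {
        if (mainArgs == null || mainArgs.trim().isEmpty()) {
            return basicArgs;
        }
        return basicArgs + " " + mainArgs;
    }
}
```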



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344663861
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 ##
 @@ -381,7 +381,8 @@ public static String getTaskManagerShellCommand(
boolean hasLogback,
boolean hasLog4j,
boolean hasKrb5,
-   Class mainClass) {
+   Class mainClass,
 
 Review comment:
   @walterddr I agree with tison that it is an internal class.
   @TisonKun All usages of `getTaskManagerShellCommand` have been updated to use the new args, so I think it is also easy to cherry-pick. If you insist, I will keep the old function.
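If the old signature were kept, as offered above, a common pattern is a thin overload that delegates to the new method with empty main arguments. The sketch below is a simplified, hypothetical stand-in. The real `getTaskManagerShellCommand` takes many more parameters and builds a fuller command.

```java
final class OverloadSketch {

    /** New form: extra main arguments (possibly empty) are appended after the basic arguments. */
    static String taskManagerCommand(String mainClass, String basicArgs, String mainArgs) {
        String args = mainArgs.isEmpty() ? basicArgs : basicArgs + " " + mainArgs;
        return "java " + mainClass + " " + args;
    }

    /** Old form, kept as a thin delegate so existing callers (and cherry-picks) still compile. */
    static String taskManagerCommand(String mainClass, String basicArgs) {
        return taskManagerCommand(mainClass, basicArgs, "");
    }
}
```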



[GitHub] [flink] wangyang0918 commented on a change in pull request #10143: [FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main thread

2019-11-11 Thread GitBox
wangyang0918 commented on a change in pull request #10143: 
[FLINK-13184]Starting a TaskExecutor blocks the YarnResourceManager's main 
thread
URL: https://github.com/apache/flink/pull/10143#discussion_r344660476
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -491,41 +492,9 @@ static ContainerLaunchContext createTaskExecutorContext(
flinkJar = registerLocalResource(fs, remoteJarPath);
}
 
-   // register conf with local fs
-   final LocalResource flinkConf;
 
 Review comment:
   Uploading a taskmanager-conf.yaml to HDFS takes about 50 ms in many production HDFS clusters. If we receive more than 1000 containers, this adds up to a long time in the `YarnResourceManager` main thread (1000 × 50 ms is already 50 s). In addition, all the {uuid}-taskmanager-conf.yaml files are the same and differ only slightly from the Flink configuration uploaded by the client.
   
   So I suggest removing the I/O operation when starting a TaskExecutor. Switching to `NMClientAsync` is a separate optimization that eliminates the blocking RPC call in the `YarnResourceManager` main thread.

