在 Spark 中有一个 spark.yarn.jars 参数,可以把作业依赖的 jar 直接放在 HDFS 上,避免每次提交时先从本地上传 jar 再分发,从而加快启动速度。
YarnClusterDescriptor.java
// Fragment from the interior of a method: it collects the classpath entries
// for system jars and user jars, joins them into one classpath string in the
// order selected by userJarInclusion, and then sets up the Flink jar.
// upload and register ship files
// The system jar directory is read from "stream.flink.system.jars.dir", with
// "" passed as the fallback argument.
String systemJarHdfsDir =
configuration.getString("stream.flink.system.jars.dir", "");
// NOTE(review): findHdfsJars is defined outside this snippet; presumably it
// registers jars that already live in that HDFS directory instead of
// uploading them from the client -- confirm against its implementation,
// including how it behaves when systemJarHdfsDir is "".
List<String> systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths,
localResources, envShipFileList);
// NOTE(review): the key reads "use.jars"; it may have been meant as
// "user.jars". It is a runtime config key, so confirm with the job
// configuration before changing it.
String userJars = configuration.getString("stream.flink.use.jars", "");
List<String> userClassPaths;
// Only register user jars when a non-empty value is configured; the value is
// treated as a comma-separated list.
if (userJars != null && !"".equals(userJars)) {
userClassPaths = registerUserJars(fs, userJars.split(","), paths,
localResources, envShipFileList);
} else {
// Collections.emptyList() is immutable; within this snippet the list is only
// sorted, iterated, and passed as the argument of addAll.
userClassPaths = Collections.emptyList();
}
// ORDER: merge the user entries into the system list so that both are sorted
// together below.
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
// Every entry is followed by File.pathSeparator. FIRST puts the user entries
// before the system entries, LAST puts them after. For ORDER neither branch
// runs, because the user entries were already merged into systemClassPaths.
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// Setup jar for ApplicationMaster
// setupFlinkJar is defined outside this snippet; the Path it returns is kept
// in remotePathJar.
Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath,
localResources);