在 Spark 中有一个 spark.yarn.jars 参数,可以把作业依赖的 jar 直接放在 HDFS 上,避免先从本地上传 jar 再分发,从而加快启动速度。下面在 Flink 中实现类似的功能:

YarnClusterDescriptor.java

// Build the container classpath from jars referenced through configuration
// instead of the usual local ship files.
// "stream.flink.system.jars.dir" names the directory holding the system jars;
// it defaults to the empty string when unset.
// NOTE(review): presumably findHdfsJars registers jars that already live on
// HDFS as local resources and returns their classpath entries - confirm
// against its implementation, including what it does for an empty directory.
String systemJarHdfsDir =
configuration.getString("stream.flink.system.jars.dir", "");
List<String> systemClassPaths = findHdfsJars(fs, systemJarHdfsDir, paths,
  localResources, envShipFileList);

// User jars are given as a comma-separated list; when the option is unset or
// empty no user jars are registered and the user classpath stays empty.
// NOTE(review): the key is spelled "use.jars" - possibly a typo for
// "user.jars"; verify against whatever writes this option before renaming.
// NOTE(review): entries are split on "," without trimming, so whitespace
// around a comma would end up in the path handed to registerUserJars.
String userJars = configuration.getString("stream.flink.use.jars", "");
List<String> userClassPaths;
if (userJars != null && !"".equals(userJars)) {
  userClassPaths = registerUserJars(fs, userJars.split(","), paths,
    localResources, envShipFileList);
} else {
  userClassPaths = Collections.emptyList();
}

// ORDER: merge the user entries into the system list so that the sort below
// interleaves them purely by name.
// Assumes findHdfsJars returned a mutable list - TODO confirm.
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
  systemClassPaths.addAll(userClassPaths);
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);

// Assemble the classpath string. Every entry is followed by
// File.pathSeparator, so a non-empty result ends with a trailing separator.
//   FIRST -> user entries, then system entries
//   LAST  -> system entries, then user entries
//   ORDER -> user entries were already merged into systemClassPaths above
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
  for (String userClassPath : userClassPaths) {
    classPathBuilder.append(userClassPath).append(File.pathSeparator);
  }
}
for (String classPath : systemClassPaths) {
  classPathBuilder.append(classPath).append(File.pathSeparator);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
  for (String userClassPath : userClassPaths) {
    classPathBuilder.append(userClassPath).append(File.pathSeparator);
  }
}

// Setup jar for ApplicationMaster: the Flink jar at flinkJarPath is set up
// under the resource name "flink.jar" and its resulting path is kept.
Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath,
localResources);

回复