[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files

2023-08-28 Thread via GitHub


1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1308166223


##
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##
@@ -543,22 +546,62 @@ void testExplicitFileShipping() throws Exception {
                 Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
                         .toFile();
 
-        assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder);
+        assertThat(descriptor.getShipFiles())
+                .doesNotContain(getPathFromLocalFile(libFile), getPathFromLocalFile(libFolder));
 
-        List<File> shipFiles = new ArrayList<>();
-        shipFiles.add(libFile);
-        shipFiles.add(libFolder);
+        List<Path> shipFiles = new ArrayList<>();
+        shipFiles.add(getPathFromLocalFile(libFile));
+        shipFiles.add(getPathFromLocalFile(libFolder));
 
         descriptor.addShipFiles(shipFiles);
 
-        assertThat(descriptor.getShipFiles()).contains(libFile, libFolder);
+        assertThat(descriptor.getShipFiles())
+                .contains(getPathFromLocalFile(libFile), getPathFromLocalFile(libFolder));
 
         // only execute part of the deployment to test for shipped files
-        Set<File> effectiveShipFiles = new HashSet<>();
+        Set<Path> effectiveShipFiles = new HashSet<>();
         descriptor.addLibFoldersToShipFiles(effectiveShipFiles);
 
         assertThat(effectiveShipFiles).isEmpty();
-        assertThat(descriptor.getShipFiles()).hasSize(2).contains(libFile, libFolder);
+        assertThat(descriptor.getShipFiles())
+                .hasSize(2)
+                .contains(getPathFromLocalFile(libFile), getPathFromLocalFile(libFolder));
+    }
+
+        String hdfsDir = "hdfs:///flink/hdfs_dir";
+        String hdfsFile = "hdfs:///flink/hdfs_file";
+        File libFile = Files.createTempFile(temporaryFolder, "libFile", ".jar").toFile();
+        File libFolder =
+                Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()).toFile();
+        final org.apache.hadoop.conf.Configuration hdConf =
+                new org.apache.hadoop.conf.Configuration();
+        hdConf.set(
+                MiniDFSCluster.HDFS_MINIDFS_BASEDIR, temporaryFolder.toAbsolutePath().toString());
+        try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) {
+            final org.apache.hadoop.fs.Path hdfsRootPath =
+                    new org.apache.hadoop.fs.Path(hdfsCluster.getURI());
+            hdfsCluster.getFileSystem().mkdirs(new org.apache.hadoop.fs.Path(hdfsDir));
+            hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(hdfsFile));
+
+            Configuration flinkConfiguration = new Configuration();
+            flinkConfiguration.set(
+                    YarnConfigOptions.SHIP_FILES,
+                    Arrays.asList(
+                            libFile.getAbsolutePath(),
+                            libFolder.getAbsolutePath(),
+                            hdfsDir,
+                            hdfsFile));
+            final YarnConfiguration yarnConfig = new YarnConfiguration();
+            yarnConfig.set(
+                    CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+            YarnClusterDescriptor descriptor =
+                    createYarnClusterDescriptor(flinkConfiguration, yarnConfig);
+            assertThat(descriptor.getShipFiles())
+                    .containsExactly(
+                            getPathFromLocalFile(libFile),
+                            getPathFromLocalFile(libFolder),
+                            new Path(hdfsDir),
+                            new Path(hdfsFile));

Review Comment:
   These new tests can be moved to a separate test method.
   
   The old test exercises `YarnClusterDescriptor.addShipFiles`, as the method name indicates. Your new test goes through `YarnConfigOptions.SHIP_FILES`, so it is not suitable to add it to the old test method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files

2023-08-24 Thread via GitHub


1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1305075128


##
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##
@@ -599,8 +638,13 @@ private void testEnvironmentDirectoryShipping(String environmentVariable, boolean
         }
 
         // only add the ship the folder, not the contents
-        assertThat(effectiveShipFiles).doesNotContain(libFile).contains(libFolder);
-        assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder);
+        assertThat(effectiveShipFiles)
+                .doesNotContain(new Path(libFile.getAbsolutePath()))
+                .contains(new Path(libFolder.toURI()));
+        assertThat(descriptor.getShipFiles())
+                .doesNotContain(
+                        new Path(libFile.getAbsolutePath()),
+                        new Path(libFolder.getAbsolutePath()));

Review Comment:
   Some of the files are converted to `Path` via `toURI()`, and others via `getAbsolutePath()`. Is that intended?
   
   I would prefer to introduce a method similar to `getPathFromLocalFilePathStr`; we need a `getPathFromLocalFile`. Otherwise the test code will be unclear.
   
   Also, `YarnClusterDescriptor` needs `getPathFromLocalFile` as well. Maybe we should extract a couple of static methods into a util class.



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -155,10 +157,19 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     /** True if the descriptor must not shut down the YarnClient. */
     private final boolean sharedYarnClient;
 
-    /** Lazily initialized list of files to ship. */
-    private final List<File> shipFiles = new LinkedList<>();
+    /**
+     * Lazily initialized list of files to ship. The path string for the files which is configured
+     * by {@code YarnConfigOptions#SHIP_FILES} will be converted to {@code Path} with schema and

Review Comment:
   ```suggestion
     * by {@link YarnConfigOptions#SHIP_FILES} will be converted to {@link Path} with schema and
   ```
   
   It should be `link` instead of `code`; the link can be clicked in IDEA.
   
   The `shipArchives` comment needs the same change.



##
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##
@@ -707,6 +751,25 @@ void testShipArchives() throws IOException {
                 YarnConfigOptions.SHIP_ARCHIVES,
                 Arrays.asList(archive1.getAbsolutePath(), archive2.getAbsolutePath()));
         createYarnClusterDescriptor(flinkConfiguration);
+
+        String archive3 = "hdfs:///flink/archive3.zip";
+        final org.apache.hadoop.conf.Configuration hdConf =
+                new org.apache.hadoop.conf.Configuration();
+        hdConf.set(
+                MiniDFSCluster.HDFS_MINIDFS_BASEDIR, temporaryFolder.toAbsolutePath().toString());
+        try (final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(hdConf).build()) {
+            final org.apache.hadoop.fs.Path hdfsRootPath =
+                    new org.apache.hadoop.fs.Path(hdfsCluster.getURI());
+            hdfsCluster.getFileSystem().createNewFile(new org.apache.hadoop.fs.Path(archive3));
+
+            flinkConfiguration.set(
+                    YarnConfigOptions.SHIP_ARCHIVES,
+                    Arrays.asList(archive1.getAbsolutePath(), archive3));
+            final YarnConfiguration yarnConfig = new YarnConfiguration();
+            yarnConfig.set(
+                    CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+            createYarnClusterDescriptor(flinkConfiguration, yarnConfig);

Review Comment:
   Could you add an assertion to check that `descriptor.getShipFiles()` is as expected?



##
flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java:
##
@@ -543,22 +545,59 @@ void testExplicitFileShipping() throws Exception {
                 Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString())
                         .toFile();
 
-        assertThat(descriptor.getShipFiles()).doesNotContain(libFile, libFolder);
+        assertThat(descriptor.getShipFiles())
+                .doesNotContain(
+                        new Path(libFile.getAbsolutePath()),
+                        new Path(libFolder.getAbsolutePath()));
 
-        List<File> shipFiles = new ArrayList<>();
-        shipFiles.add(libFile);
-        shipFiles.add(libFolder);
+        List<Path> shipFiles = new ArrayList<>();
+        shipFiles.add(new Path(libFile.getAbsolutePath()));
+        shipFiles.add(new Path(libFolder.getAbsolutePath()));
 
         descriptor.addShipFiles(shipFiles);
 
-        assertThat(descriptor.getShipFiles()).contains(libFile, libFolder);
+        assertThat(descriptor.getShipFiles())
+

[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files

2023-08-22 Thread via GitHub


1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1302400685


##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -156,9 +158,9 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
     private final boolean sharedYarnClient;
 
     /** Lazily initialized list of files to ship. */
-    private final List<File> shipFiles = new LinkedList<>();
+    private final List<Path> shipFiles = new LinkedList<>();
 
-    private final List<File> shipArchives = new LinkedList<>();
+    private final List<Path> shipArchives = new LinkedList<>();

Review Comment:
   How about adding a comment describing that we have converted the option path strings to `Path`s with a scheme and absolute path? That makes it clearer for other developers and makes these fields easier to use correctly.



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -272,17 +272,25 @@ public class YarnConfigOptions {
                         .noDefaultValue()
                         .withDeprecatedKeys("yarn.ship-directories")
                         .withDescription(
-                                "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                                "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                        + "cluster. These files/directories can come from the local path of flink client "
+                                        + "or HDFS. For example, "

Review Comment:
   How about updating `from the local client and/or HDFS` to `the local path of 
flink client or HDFS` as well?



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
 this.nodeLabel = 
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
 }
 
-private Optional> decodeFilesToShipToCluster(
+private Optional> decodeFilesToShipToCluster(
 final Configuration configuration, final 
ConfigOption> configOption) {
 checkNotNull(configuration);
 checkNotNull(configOption);
 
-final List files =
-ConfigUtils.decodeListFromConfig(configuration, configOption, 
File::new);
+List files = ConfigUtils.decodeListFromConfig(configuration, 
configOption, Path::new);
+files = 
files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
 return files.isEmpty() ? Optional.empty() : Optional.of(files);
 }
 
+private Path enrichPathSchemaIfNeeded(Path path) {
+if (isWithoutSchema(path)) {
+return new Path(new File(path.toString()).toURI());

Review Comment:
   This class has a couple of `new Path(new File(pathStr).toURI())` calls that convert a local path string into an HDFS-style `Path`; could we extract one method to do this?



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
 this.nodeLabel = 
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
 }
 
-private Optional> decodeFilesToShipToCluster(
+private Optional> decodeFilesToShipToCluster(
 final Configuration configuration, final 
ConfigOption> configOption) {
 checkNotNull(configuration);
 checkNotNull(configOption);
 
-final List files =
-ConfigUtils.decodeListFromConfig(configuration, configOption, 
File::new);
+List files = ConfigUtils.decodeListFromConfig(configuration, 
configOption, Path::new);
+files = 
files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());
 return files.isEmpty() ? Optional.empty() : Optional.of(files);
 }
 
+private Path enrichPathSchemaIfNeeded(Path path) {
+if (isWithoutSchema(path)) {
+return new Path(new File(path.toString()).toURI());
+}
+return path;
+}
+
+private boolean isWithoutSchema(Path path) {
+return StringUtils.isNullOrWhitespaceOnly(path.toUri().getScheme());
+}

Review Comment:
   How about changing `Path enrichPathSchemaIfNeeded(Path path)` to `Path createPathWithSchema(String path)`?
   
   If so, the line `files = files.stream().map(this::enrichPathSchemaIfNeeded).collect(Collectors.toList());` can be removed. We would just call `List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, this::createPathWithSchema);`.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files

2023-08-17 Thread via GitHub


1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1296881231


##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
                         .noDefaultValue()
                         .withDeprecatedKeys("yarn.ship-directories")
                         .withDescription(
-                                "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                                "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                        + "cluster. These files/directories can come from the local client and/or remote "

Review Comment:
   ```suggestion
                                        + "cluster. These files/directories can come from the local path of flink client or remote "
   ```



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
                         .noDefaultValue()
                         .withDeprecatedKeys("yarn.ship-directories")
                         .withDescription(
-                                "A semicolon-separated list of files and/or directories to be shipped to the YARN cluster.");
+                                "A semicolon-separated list of files and/or directories to be shipped to the YARN "
+                                        + "cluster. These files/directories can come from the local client and/or remote "
+                                        + "file system. For example, \"/path/to/local/file;/path/to/local/directory;"
+                                        + "hdfs://$namenode_address/path/of/file;"
+                                        + "hdfs://$namenode_address/path/of/directory\"");
 
     public static final ConfigOption<List<String>> SHIP_ARCHIVES =
             key("yarn.ship-archives")
                     .stringType()
                     .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "A semicolon-separated list of archives to be shipped to the YARN cluster."
-                                    + " These archives will be un-packed when localizing and they can be any of the following types: "
-                                    + "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\".");
+                            "A semicolon-separated list of archives to be shipped to the YARN cluster. "
+                                    + "These archives can come from the local client and/or remote file system. "
+                                    + "They will be un-packed when localizing and they can be any of the following "
+                                    + "types: \".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\". "
+                                    + "For example, \"/path/to/local/archive.jar;"
+                                    + "hdfs://$namenode_address/path/to/archive.jar\"");

Review Comment:
   Same 2 comments as the last option.



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -911,31 +929,21 @@ private ApplicationReport startAppMaster(
         final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
         final List<String> uploadedDependencies =
                 fileUploader.registerMultipleLocalResources(
-                        systemShipFiles.stream()
-                                .map(e -> new Path(e.toURI()))
-                                .collect(Collectors.toSet()),
-                        Path.CUR_DIR,
-                        LocalResourceType.FILE);
+                        systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);

Review Comment:
   `systemShipFiles` includes 3 types of files:
   
   - ShipFiles
   - logConfigFilePath
   - LibFolders
   
   After this PR, the latter two types do not get a schema added. Does this still work as expected?



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
         this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
     }
 
-    private Optional<List<File>> decodeFilesToShipToCluster(
+    private Optional<List<Path>> decodeFilesToShipToCluster(
             final Configuration configuration, final ConfigOption<List<String>> configOption) {
         checkNotNull(configuration);
         checkNotNull(configOption);
 
-        final List<File> files =
-                ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+        List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+        files =
+                files.stream()
+                        .map(
+                                path ->