[ 
https://issues.apache.org/jira/browse/MAPREDUCE-6759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282395#comment-17282395
 ] 

Christos Karampeazis-Papadakis edited comment on MAPREDUCE-6759 at 2/10/21, 
11:46 AM:
--------------------------------------------------------------------------------------

I currently have a working implementation in hand, but I have a concern about 
the improvement's current specification, as well as a smaller note.
 # Parallelizing the loops as they are would only give parallelism on a 
per-entry basis, one task per listed path. If one listed the files to be 
uploaded, say A, B, C, and D, separately and used 4 threads, this method would 
work (setting aside file sizes and load distribution for now). But if the same 
files were placed in a single directory to be uploaded, no parallelism would be 
achieved. The granularity of the parallelism must be controlled, by some means 
of recursively retrieving the files contained in each directory.
 # I have a pull request pending on YARN-7713, which concerns the 
parallelization of Resource Localization. In the class *FSDownload*, the new 
method *PartitionInputList* could be reused if its signature were modified 
to take Collections as its input.
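
To illustrate the granularity point, here is a rough sketch, not the actual 
patch. It uses java.nio.file as a stand-in for the Hadoop FileSystem API, and 
the class name UploadPlanner and the methods expand and partition are 
hypothetical. In particular, partition is only a guess at what a 
Collection-based partitioning helper might look like, not the 
*PartitionInputList* method from the YARN-7713 pull request.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: local paths stand in for the resources to be uploaded.
public class UploadPlanner {

  /** Replaces every directory in the input with the regular files beneath it. */
  static List<Path> expand(Collection<Path> inputs) throws IOException {
    List<Path> files = new ArrayList<>();
    for (Path input : inputs) {
      if (Files.isDirectory(input)) {
        // Walk the directory tree recursively so each file becomes its own unit of work.
        try (Stream<Path> walk = Files.walk(input)) {
          files.addAll(
              walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList()));
        }
      } else {
        files.add(input);
      }
    }
    return files;
  }

  /** Distributes the items round-robin over at most {@code parts} non-empty lists. */
  static <T> List<List<T>> partition(Collection<T> items, int parts) {
    if (parts < 1) {
      throw new IllegalArgumentException("parts must be >= 1");
    }
    int n = Math.min(parts, items.size());
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(new ArrayList<>());
    }
    int next = 0;
    for (T item : items) {
      out.get(next++ % n).add(item);
    }
    return out;
  }
}
```

With a single directory holding A, B, C, and D as input, expand yields four 
paths and partition(files, 4) yields four single-file lists, so four threads 
would each have work. Round-robin ignores file sizes, which matches the 
simplification made in point 1.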


> JobSubmitter/JobResourceUploader should parallelize upload of -libjars, 
> -files, -archives
> -----------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-6759
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-6759
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: job submission
>            Reporter: Dennis Huo
>            Priority: Major
>
> During job submission, the {{JobResourceUploader}} currently uploads the 
> {{-libjars}}, {{-files}}, and {{-archives}} resources in sequential 
> for-loops. This can significantly slow down job startup when a large number 
> of files need to be uploaded, especially when staging the files to a cloud 
> object-store based FileSystem implementation like S3, GCS, WABS, etc., where 
> round-trip latencies may be higher than HDFS despite good throughput when 
> parallelized:
> {code:title=JobResourceUploader.java}
>     if (files != null) {
>       FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
>       String[] fileArr = files.split(",");
>       for (String tmpFile : fileArr) {
>         URI tmpURI = null;
>         try {
>           tmpURI = new URI(tmpFile);
>         } catch (URISyntaxException e) {
>           throw new IllegalArgumentException(e);
>         }
>         Path tmp = new Path(tmpURI);
>         Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
>         try {
>           URI pathURI = getPathURI(newPath, tmpURI.getFragment());
>           DistributedCache.addCacheFile(pathURI, conf);
>         } catch (URISyntaxException ue) {
>           // should not throw a uri exception
>           throw new IOException("Failed to create uri for " + tmpFile, ue);
>         }
>       }
>     }
>     if (libjars != null) {
>       FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
>       String[] libjarsArr = libjars.split(",");
>       for (String tmpjars : libjarsArr) {
>         Path tmp = new Path(tmpjars);
>         Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
>         DistributedCache.addFileToClassPath(
>             new Path(newPath.toUri().getPath()), conf, jtFs);
>       }
>     }
>     if (archives != null) {
>       FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
>       String[] archivesArr = archives.split(",");
>       for (String tmpArchives : archivesArr) {
>         URI tmpURI;
>         try {
>           tmpURI = new URI(tmpArchives);
>         } catch (URISyntaxException e) {
>           throw new IllegalArgumentException(e);
>         }
>         Path tmp = new Path(tmpURI);
>         Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
>         try {
>           URI pathURI = getPathURI(newPath, tmpURI.getFragment());
>           DistributedCache.addCacheArchive(pathURI, conf);
>         } catch (URISyntaxException ue) {
>           // should not throw a uri exception
>           throw new IOException("Failed to create uri for " + tmpArchives, ue);
>         }
>       }
>     }
> {code}
> Parallelizing the upload of these files would improve job submission time.
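
One possible shape for such a parallel loop is sketched below. This is only an 
illustration under assumptions, not code from JobResourceUploader: the class 
ParallelUploadSketch and the method uploadAll are hypothetical, and the copy 
function stands in for a call such as copyRemoteFiles. Copies are submitted to 
a fixed-size thread pool, and the results are collected in submission order so 
that the follow-up registration step (for example the DistributedCache calls) 
can stay on the calling thread in the same order as the sequential version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: S is the source location type, D the uploaded location type.
public class ParallelUploadSketch {

  /** Runs {@code copy} for every source on a thread pool; results keep input order. */
  static <S, D> List<D> uploadAll(List<S> sources, Function<S, D> copy, int threads)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      // Submit one task per source so the copies can overlap.
      List<Future<D>> pending = new ArrayList<>();
      for (S source : sources) {
        Callable<D> task = () -> copy.apply(source);
        pending.add(pool.submit(task));
      }
      // Wait in submission order; a failed copy surfaces as ExecutionException.
      List<D> uploaded = new ArrayList<>();
      for (Future<D> future : pending) {
        uploaded.add(future.get());
      }
      return uploaded;
    } finally {
      // Stop any copies still running if one of them failed.
      pool.shutdownNow();
    }
  }
}
```

A caller would pass the split entries of -files, -libjars, or -archives as 
sources and then iterate over the returned list to register each uploaded path. 
Whether the real copy routine is safe to call from several threads at once is 
not established here and would need to be checked.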



--
This message was sent by Atlassian Jira
(v8.3.4#803005)