KYLIN-1839, support kylin lib in HDFS

Signed-off-by: terry <hzfen...@corp.netease.com>
Signed-off-by: Yang Li <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4473d710
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4473d710
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4473d710

Branch: refs/heads/master-cdh5.7
Commit: 4473d71011cc0e652eccf4f80269828caa5d3c73
Parents: d28835f
Author: terry <hzfen...@corp.netease.com>
Authored: Tue Oct 11 17:33:45 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Wed Oct 19 07:30:13 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     | 29 +++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4473d710/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index a138eec..bbb1711 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -252,14 +253,10 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
         // for KylinJobMRLibDir
         String mrLibDir = kylinConf.getKylinJobMRLibDir();
         if (!StringUtils.isBlank(mrLibDir)) {
-            File dirFileMRLIB = new File(mrLibDir);
-            if (dirFileMRLIB.exists()) {
-                if (kylinDependency.length() > 0)
-                    kylinDependency.append(",");
+                if(kylinDependency.length() > 0) {
+                        kylinDependency.append(",");
+                }
                 kylinDependency.append(mrLibDir);
-            } else {
-                logger.info("The directory '" + mrLibDir + "' for 
'kylin.job.mr.lib.dir' does not exist!!!");
-            }
         }
 
         setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -300,21 +297,27 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
         try {
             Configuration jobConf = job.getConfiguration();
             FileSystem fs = FileSystem.getLocal(jobConf);
+            FileSystem hdfs = FileSystem.get(jobConf);
 
             StringBuilder jarList = new StringBuilder();
             StringBuilder fileList = new StringBuilder();
 
             for (String fileName : fNameList) {
                 Path p = new Path(fileName);
-                if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, fileName);
+                FileSystem current = 
(fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
+                if(!current.exists(p)) {
+                    logger.warn("The directory '" + fileName + "for kylin 
dependency does not exist!!!");
+                    continue;
+                }
+                if (current.getFileStatus(p).isDirectory()) {
+                    appendTmpDir(job, current, fileName);
                     continue;
                 }
 
                 StringBuilder list = (p.getName().endsWith(".jar")) ? jarList 
: fileList;
                 if (list.length() > 0)
                     list.append(",");
-                list.append(fs.getFileStatus(p).getPath().toString());
+                list.append(current.getFileStatus(p).getPath());
             }
 
             appendTmpFiles(fileList.toString(), jobConf);
@@ -324,13 +327,12 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
         }
     }
 
-    private void appendTmpDir(Job job, String tmpDir) {
+    private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
         if (StringUtils.isBlank(tmpDir))
             return;
 
         try {
             Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.getLocal(jobConf);
             FileStatus[] fList = fs.listStatus(new Path(tmpDir));
 
             StringBuilder jarList = new StringBuilder();
@@ -339,7 +341,7 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
             for (FileStatus file : fList) {
                 Path p = file.getPath();
                 if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, p.toString());
+                    appendTmpDir(job, fs, p.toString());
                     continue;
                 }
 
@@ -622,3 +624,4 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
     }
 
 }
+

Reply via email to