KYLIN-2135 minor format update

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a20a9b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a20a9b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a20a9b0

Branch: refs/heads/KYLIN-2135
Commit: 0a20a9b093807dac60f758de9a3f6fc0a6d72a62
Parents: fbd2132
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Nov 3 18:49:50 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Fri Nov 4 13:40:48 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/DFSFileTableReader.java     | 92 ++++++++++----------
 .../engine/mr/steps/FactDistinctColumnsJob.java | 34 ++++----
 2 files changed, 61 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index dda1d6f..173c908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,14 +23,15 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private List<RowReader> readerList;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
-        this.readerList = new ArrayList<RowReader>();
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(filePath));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        try {
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.readerList = new ArrayList<RowReader>();
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        int curReaderIndex = -1;
-        RowReader curReader;
-
-        while (++curReaderIndex < readerList.size()) {
-            curReader = readerList.get(curReaderIndex);
-            curLine = curReader.nextLine();
-            curColumns = null;
-
-            if (curLine != null) {
-                return true;
-            }
-        }
-
-        return false;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -176,15 +177,10 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() {
-        for (RowReader reader : readerList) {
-            try {
-                if (reader != null)
-                    reader.close();
-            } catch (IOException e) {
-                logger.warn("close file failed:", e);
-            }
-        }
+    public void close() {
+        for (RowReader reader : readerList) {
+            IOUtils.closeQuietly(reader);
+        }
     }
 
     private String autoDetectDelim(String line) {
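
A minimal, self-contained sketch of the close() change above, for readers skimming the diff: IOUtils.closeQuietly (commons-io, now imported at the top of this file) replaces the old null check plus try/catch around reader.close(). The helper class and method names below are hypothetical, not part of the commit; the sketch only assumes the readers implement java.io.Closeable, which RowReader must for the new code to compile.

    import java.io.Closeable;
    import java.util.List;

    import org.apache.commons.io.IOUtils;

    class CloseQuietlySketch {
        // Equivalent of the new close(): closeQuietly is null-safe and
        // swallows any IOException thrown by close(), so the loop no longer
        // needs its own null check or try/catch with a warning log.
        static void closeAll(List<? extends Closeable> readers) {
            for (Closeable reader : readers) {
                IOUtils.closeQuietly(reader);
            }
        }
    }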

http://git-wip-us.apache.org/repos/asf/kylin/blob/0a20a9b0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 27f8c19..c34ff94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,27 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
-            int reducerCount = columnsNeedDict.size();
-            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
-
-            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for(int index : uhcIndex) {
-                if(index == 1) {
-                    reducerCount += uhcReducerCount - 1;
-                }
-            }
-
-            if (reducerCount > 255) {
-                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
-            }
-
-
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
-
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 System.out.println("Found segment " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
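
For context on the hunk above: the reducer count starts at one reducer per dictionary column, each column flagged as ultra-high-cardinality (UHC) gets uhcReducerCount reducers instead of one, the total is capped at 255, and setupReducer then requests two extra reducers when statistics are enabled. A standalone sketch of that arithmetic follows; the class and method names are hypothetical, only the rule and the new exception message come from the diff.

    class ReducerCountSketch {
        // dictColumnCount: number of fact-table columns needing a dictionary
        // uhcIndex: 1 marks a UHC column, 0 a normal one
        // uhcReducerCount: reducers assigned to each UHC column
        static int reducerCount(int dictColumnCount, int[] uhcIndex, int uhcReducerCount) {
            int reducerCount = dictColumnCount;
            for (int index : uhcIndex) {
                if (index == 1) {
                    // a UHC column uses uhcReducerCount reducers instead of 1
                    reducerCount += uhcReducerCount - 1;
                }
            }
            if (reducerCount > 255) {
                throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
            }
            return reducerCount;
        }
    }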
 
