This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ea8cf5  [CARBONDATA-3678] optimize list files in insert stage
3ea8cf5 is described below

commit 3ea8cf524c0e4b2e8a6ce941dc21b02f2d9a1a41
Author: h00424960 <h00424960@SZA150414400A>
AuthorDate: Mon Feb 3 14:49:39 2020 +0800

    [CARBONDATA-3678] optimize list files in insert stage
    
    Why is this PR needed?
    To optimize listing of insert stage files by using an iterator, which avoids listing
    all files under the directory when file_count is set in the command, especially for
    S3/OBS cases, and reduces the time cost in the driver.
    
    What changes were proposed in this PR?
    Use an iterator to fetch a limited number of stage files.
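    
    For example, a caller can cap a directory listing through the new CarbonFile API added
    below (a minimal sketch; the batch size and stage directory path are illustrative):
    
        // A minimal sketch: fetch at most batchSize * 2 entries instead of listing all files.
        int batchSize = 10;  // illustrative value
        CarbonFile stageDir = FileFactory.getCarbonFile("/path/to/table/stage_data");  // illustrative path
        CarbonFile[] limited = stageDir.listFiles(false, batchSize * 2);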
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3600
---
 .../datastore/filesystem/AbstractDFSCarbonFile.java    | 18 ++++++++++++++++++
 .../core/datastore/filesystem/CarbonFile.java          |  2 ++
 .../core/datastore/filesystem/LocalCarbonFile.java     |  7 +++++++
 .../org/apache/carbon/flink/CarbonLocalWriter.java     |  3 ++-
 .../java/org/apache/carbon/flink/CarbonS3Writer.java   |  3 ++-
 .../management/CarbonInsertFromStageCommand.scala      | 10 ++++++++--
 6 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index f7bc553..5169cfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -514,6 +514,24 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
   }
 
   @Override
+  public CarbonFile[] listFiles(boolean recursive, int maxCount)
+      throws IOException {
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    FileStatus fileStatus = fileSystem.getFileStatus(path);
+    if (null != fileStatus && fileStatus.isDirectory()) {
+      RemoteIterator<LocatedFileStatus> listStatus = fileSystem.listFiles(path, recursive);
+      int counter = 0;
+      while (counter < maxCount && listStatus.hasNext()) {
+        LocatedFileStatus locatedFileStatus = listStatus.next();
+        CarbonFile carbonFile = FileFactory.getCarbonFile(locatedFileStatus.getPath().toString());
+        carbonFiles.add(carbonFile);
+        counter++;
+      }
+    }
+    return carbonFiles.toArray(new CarbonFile[0]);
+  }
+
+  @Override
   public String[] getLocations() throws IOException {
     BlockLocation[] blkLocations;
     FileStatus fileStatus = fileSystem.getFileStatus(path);
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 00d37e7..e8e86f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -33,6 +33,8 @@ public interface CarbonFile {
 
   CarbonFile[] listFiles();
 
+  CarbonFile[] listFiles(boolean recursive, int maxCount) throws IOException;
+
   List<CarbonFile> listFiles(Boolean recursive) throws IOException;
 
   List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter) throws IOException;
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index ad11d15..b8333f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -163,6 +163,13 @@ public class LocalCarbonFile implements CarbonFile {
   }
 
   @Override
+  public CarbonFile[] listFiles(boolean recursive, int maxCount)
+      throws IOException {
+    // maxCount is ignored for the local filesystem since listing order is not guaranteed
+    return listFiles();
+  }
+
+  @Override
   public List<CarbonFile> listFiles(Boolean recursive) {
     if (!isDirectory()) {
       return new ArrayList<>();
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index a8068a3..c80e9b3 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -166,9 +166,10 @@ final class CarbonLocalWriter extends CarbonWriter {
         return;
       }
       try {
+        // Prefix with a timestamp so that files ordered by name are also ordered by time.
         String stageInputPath = CarbonTablePath.getStageDir(
             table.getAbsoluteTableIdentifier().getTablePath()) +
-            CarbonCommonConstants.FILE_SEPARATOR + UUID.randomUUID();
+            CarbonCommonConstants.FILE_SEPARATOR + System.currentTimeMillis() + UUID.randomUUID();
         tryCreateLocalDirectory(new File(stageInputPath));
         StageManager.writeStageInput(stageInputPath, stageInput);
       } catch (Throwable exception) {
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index d23c668..72e4405 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -180,9 +180,10 @@ final class CarbonS3Writer extends CarbonWriter {
         return;
       }
       try {
+        // Prefix with a timestamp so that files ordered by name are also ordered by time.
         String stageInputPath = CarbonTablePath.getStageDir(
             table.getAbsoluteTableIdentifier().getTablePath()) +
-            CarbonCommonConstants.FILE_SEPARATOR + UUID.randomUUID();
+            CarbonCommonConstants.FILE_SEPARATOR + System.currentTimeMillis() + UUID.randomUUID();
         StageManager.writeStageInput(stageInputPath, stageInput);
       } catch (Throwable exception) {
         this.deleteSegmentDataFilesQuietly(dataPath);
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index fe3175a..ea70d90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.filesystem.{AbstractDFSCarbonFile, CarbonFile}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
@@ -502,7 +502,13 @@ case class CarbonInsertFromStageCommand(
   ): Array[(CarbonFile, CarbonFile)] = {
     val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
     if (dir.exists()) {
-      val allFiles = dir.listFiles()
+      // Only the HDFS/OBS/S3 server side guarantees that files returned by the iterator are
+      // sorted by file name, so the iterator yields each data file and its '.success' file
+      // together without looping over all files, which improves performance compared with
+      // listing the whole directory. Each data file comes with a '.success' companion, so we
+      // request batchSize * 2 entries as an approximate upper bound. For local files the
+      // order is not guaranteed, so all files are listed.
+      val allFiles = dir.listFiles(false, batchSize * 2)
       val successFiles = allFiles.filter { file =>
         file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.map { file =>
