From Preetham Poluparthi <[email protected]>:

Preetham Poluparthi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email )


Change subject: [ASTERIXDB-3707][EXT] Add file level split while reading 
parquet files
......................................................................

[ASTERIXDB-3707][EXT] Add file level split while reading parquet files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Introduces support for file-based splits during compilation (governed by the 
new 'compiler.parquet.filesplits' option) and adds 
'compiler.hdfs.split.parallelism' to control the number of threads used for 
file listing, which improves compilation time.

Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
10 files changed, 77 insertions(+), 9 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/58/20958/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index b6a9295..3835d92 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -97,7 +97,9 @@
                 CompilerProperties.COMPILER_COLUMN_FILTER_KEY, 
CompilerProperties.COMPILER_BATCH_LOOKUP_KEY,
                 CompilerProperties.COMPILER_FRAMESIZE_KEY, 
FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
                 
CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY,
-                CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, 
FuzzyUtils.SIM_FUNCTION_PROP_NAME,
+                CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY,
+                CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY,
+                CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY, 
FuzzyUtils.SIM_FUNCTION_PROP_NAME,
                 FuzzyUtils.SIM_THRESHOLD_PROP_NAME, 
StartFeedStatement.WAIT_FOR_COMPLETION,
                 FeedActivityDetails.FEED_POLICY_NAME, 
FeedActivityDetails.COLLECT_LOCATIONS,
                 SqlppQueryRewriter.INLINE_WITH_OPTION, 
SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index ead02b7..8848a4f 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -49,6 +49,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : true,
     "compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
     "compiler.min.windowmemory" : 524288,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : 0,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : false,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 0e9e7fb..262b20e 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -49,6 +49,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
     "compiler.min.windowmemory" : 524288,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : -1,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : true,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e3c806..9a1c718 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -49,6 +49,7 @@
     "compiler.forcejoinorder" : false,
     "compiler\.framesize" : 32768,
     "compiler\.groupmemory" : 163840,
+    "compiler\.hdfs\.split\.parallelism" : 4,
     "compiler\.index.covering" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
@@ -60,6 +61,7 @@
     "compiler.min.windowmemory" : 524288,
     "compiler.ordered.fields" : false,
     "compiler\.parallelism" : 3,
+    "compiler\.parquet\.filesplits" : false,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
     "compiler\.sort\.parallel" : true,
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 62be4d0..115342a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -159,7 +159,12 @@
                 128,
                 "Maximum occurrences of a variable allowed in an expression 
for inlining"),
         COMPILER_ORDERED_FIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, 
"Enable/disable select order list"),
-        COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta 
lake file splits");
+        COMPILER_DELTALAKE_FILESPLITS(BOOLEAN, false, "Enable/disable delta 
lake file splits"),
+        COMPILER_PARQUET_FILESPLITS(BOOLEAN, false, "Enable/disable parquet 
file splits"),
+        COMPILER_HDFS_SPLIT_PARALLELISM(
+                INTEGER,
+                Runtime.getRuntime().availableProcessors(),
+                "Number of threads to use for generating file splits for HDFS 
files");

         private final IOptionType type;
         private final Object defaultValue;
@@ -250,6 +255,8 @@

     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
     public static final String COMPILER_DELTALAKE_FILESPLITS_KEY = 
Option.COMPILER_DELTALAKE_FILESPLITS.ini();
+    public static final String COMPILER_PARQUET_FILESPLITS_KEY = 
Option.COMPILER_PARQUET_FILESPLITS.ini();
+    public static final String COMPILER_HDFS_SPLIT_PARALLELISM_KEY = 
Option.COMPILER_HDFS_SPLIT_PARALLELISM.ini();

     public CompilerProperties(PropertiesAccessor accessor) {
         super(accessor);
@@ -396,4 +403,13 @@
     public boolean isDeltaLakeFileSplitsEnabled() {
         return accessor.getBoolean(Option.COMPILER_DELTALAKE_FILESPLITS);
     }
+
+    public boolean isParquetFileSplitsEnabled() {
+        return accessor.getBoolean(Option.COMPILER_PARQUET_FILESPLITS);
+    }
+
+    public int getHdfsSplitParallelism() {
+        return accessor.getInt(Option.COMPILER_HDFS_SPLIT_PARALLELISM);
+    }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 82653c2..8a894fb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -33,6 +33,8 @@
 import java.util.UUID;

 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -46,6 +48,7 @@
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import 
org.apache.asterix.external.input.record.reader.hdfs.avro.AvroFileRecordReader;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
 import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
@@ -56,11 +59,13 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -110,6 +115,7 @@
     private transient Credentials credentials;
     private byte[] serializedCredentials;
     private transient UserGroupInformation ugi;
+    private boolean useFileSplits = false;

     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
@@ -130,6 +136,14 @@
             extractRequiredFiles(serviceCtx, configuration, warningCollector, 
filterEvaluatorFactory, hdfsConf);
         }
         configureHdfsConf(hdfsConf, configuration);
+        useFileSplits =
+                getParquetFileSplitsConfig(configuration, 
(ICcApplicationContext) serviceCtx.getApplicationContext());
+    }
+
+    private boolean getParquetFileSplitsConfig(Map<String, String> 
configuration, ICcApplicationContext appCtx) {
+        String fileSplits = 
configuration.get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+        return fileSplits != null ? Boolean.parseBoolean(fileSplits)
+                : appCtx.getCompilerProperties().isParquetFileSplitsEnabled();
     }

     private void extractRequiredFiles(IServiceContext serviceCtx, Map<String, 
String> configuration,
@@ -177,7 +191,8 @@
         this.configuration = configuration;
         this.filterEvaluatorFactory = filterEvaluatorFactory;
         init((ICCServiceContext) serviceCtx);
-        return HDFSUtils.configureHDFSJobConf(configuration);
+        return HDFSUtils.configureHDFSJobConf(configuration,
+                (ICcApplicationContext) serviceCtx.getApplicationContext());
     }

     protected void configureHdfsConf(JobConf conf, Map<String, String> 
configuration)
@@ -229,6 +244,14 @@
         if (HDFSUtils.isEmpty(conf)) {
             return Scheduler.EMPTY_INPUT_SPLITS;
         }
+        if (!useFileSplits && conf.getInputFormat() instanceof 
MapredParquetInputFormat) {
+            FileStatus[] fs = ((MapredParquetInputFormat) 
conf.getInputFormat()).listStatus(conf);
+            List<InputSplit> splits = new ArrayList<>();
+            for (FileStatus file : fs) {
+                splits.add(new FileSplit(file.getPath(), 0, file.getLen(), 
conf));
+            }
+            return splits.toArray(new InputSplit[0]);
+        }
         return conf.getInputFormat().getSplits(conf, numPartitions);
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 9ac0043..e85fa75 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -167,7 +167,7 @@
         LOGGER.info("Number of delta table parquet data files to scan: {}", 
scanFiles.size());
         configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
         try {
-            usingSplits = getFileSplitsConfig(configuration, appCtx);
+            usingSplits = getDeltaFileSplitsConfig(configuration, appCtx);
             if (usingSplits) {
                 distributeSplits(scanFiles, conf, numPartitions);
             } else {
@@ -179,7 +179,7 @@
         issueWarnings(warnings, warningCollector);
     }

-    private boolean getFileSplitsConfig(Map<String, String> configuration, 
ICcApplicationContext appCtx) {
+    private boolean getDeltaFileSplitsConfig(Map<String, String> 
configuration, ICcApplicationContext appCtx) {
         String fileSplits = 
configuration.get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
         return fileSplits != null ? Boolean.parseBoolean(fileSplits)
                 : 
appCtx.getCompilerProperties().isDeltaLakeFileSplitsEnabled();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..0321d20 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -26,6 +26,7 @@
 import java.util.List;

 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -75,6 +76,11 @@
         return resultSplits;
     }

+    @Override
+    public FileStatus[] listStatus(JobConf job) throws IOException {
+        return super.listStatus(job);
+    }
+
     public List<Footer> getFooters(JobConf job) throws IOException {
         return realInputFormat.getFooters(job, asList(super.listStatus(job)));
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e57b4d3..61ce50e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,6 +21,7 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.ByteArrayInputStream;
@@ -48,6 +49,7 @@
 import javax.security.auth.login.LoginException;

 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -222,7 +224,7 @@
         }
     }

-    public static JobConf configureHDFSJobConf(Map<String, String> 
configuration) {
+    public static JobConf configureHDFSJobConf(Map<String, String> 
configuration, ICcApplicationContext serviceCtx) {
         JobConf conf = new JobConf();
         String localShortCircuitSocketPath = 
configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
         String formatClassName = 
HDFSUtils.getInputFormatClassName(configuration);
@@ -243,6 +245,7 @@
         }
         conf.setClassLoader(HDFSInputStream.class.getClassLoader());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, 
formatClassName);
+        conf.set(LIST_STATUS_NUM_THREADS, 
getHdfsFileSplitParallelism(configuration, serviceCtx));

         // Enable local short circuit reads if user supplied the parameters
         if (localShortCircuitSocketPath != null) {
@@ -270,6 +273,12 @@
         return conf;
     }

+    private static String getHdfsFileSplitParallelism(Map<String, String> 
configuration, ICcApplicationContext appCtx) {
+        String numThreads = 
configuration.get(CompilerProperties.COMPILER_HDFS_SPLIT_PARALLELISM_KEY);
+        return numThreads != null ? numThreads
+                : 
String.valueOf(appCtx.getCompilerProperties().getHdfsSplitParallelism());
+    }
+
     public static Configuration configureHDFSwrite(Map<String, String> 
configuration) {
         Configuration conf = new Configuration();
         String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f7bf0f6..cf5c004 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -228,10 +228,16 @@

     private void setExternalCollectionCompilerProperties(MetadataProvider 
metadataProvider,
             Map<String, String> configuration) {
-        String fileSplits =
+        String deltaFileSplits =
                 (String) 
metadataProvider.getConfig().get(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY);
-        if (fileSplits != null) {
-            
configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, 
fileSplits);
+        if (deltaFileSplits != null) {
+            
configuration.put(CompilerProperties.COMPILER_DELTALAKE_FILESPLITS_KEY, 
deltaFileSplits);
         }
+        String parquetFileSplits =
+                (String) 
metadataProvider.getConfig().get(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY);
+        if (parquetFileSplits != null) {
+            
configuration.put(CompilerProperties.COMPILER_PARQUET_FILESPLITS_KEY, 
parquetFileSplits);
+        }
+
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20958?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Gerrit-Change-Number: 20958
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>

Reply via email to