From Preetham Poluparthi <[email protected]>:

Preetham Poluparthi has uploaded this change for review. (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20906?usp=email)


Change subject: WIP: Add file-level splits when reading Parquet files
......................................................................

WIP: Add file-level splits when reading Parquet files

Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 19 insertions(+), 0 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/06/20906/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 82653c2..e11eb43 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -229,6 +229,7 @@
         if (HDFSUtils.isEmpty(conf)) {
             return Scheduler.EMPTY_INPUT_SPLITS;
         }
+
         return conf.getInputFormat().getSplits(conf, numPartitions);
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..4ffff6c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -23,9 +23,13 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;

 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import 
org.apache.asterix.external.input.record.reader.aws.delta.SerializableFileSplit;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -49,6 +53,7 @@

     private final ParquetInputFormat<ArrayBackedValueStorage> realInputFormat 
= new ParquetInputFormat<>();
     private IExternalFilterValueEmbedder valueEmbedder;
+    private boolean useFileSplits = false;

     @Override
     public RecordReader<Void, VoidPointable> getRecordReader(InputSplit split, 
JobConf job, Reporter reporter)
@@ -58,6 +63,17 @@

     @Override
     public InputSplit[] getSplits(JobConf job, int numSplits) throws 
IOException {
+
+        if (!useFileSplits) {
+            Path[] paths = getInputPaths(job);
+            List<InputSplit> splits = new ArrayList<>();
+            for (Path path : paths) {
+                FileSystem fs = path.getFileSystem(job);
+                splits.add(new SerializableFileSplit(path, 0, 
fs.getFileStatus(path).getLen(), job));
+            }
+            return splits.toArray(new InputSplit[0]);
+        }
+
         if (isTaskSideMetaData(job)) {
             return super.getSplits(job, numSplits);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e57b4d3..51e9085 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,6 +21,7 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.ByteArrayInputStream;
@@ -243,6 +244,7 @@
         }
         conf.setClassLoader(HDFSInputStream.class.getClassLoader());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, 
formatClassName);
+        conf.set(LIST_STATUS_NUM_THREADS, 
String.valueOf(Runtime.getRuntime().availableProcessors()));

         // Enable local short circuit reads if user supplied the parameters
         if (localShortCircuitSocketPath != null) {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20906?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Gerrit-Change-Number: 20906
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>

Reply via email to