From Preetham Poluparthi <[email protected]>:
Preetham Poluparthi has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20906?usp=email )
Change subject: WIP: Add file-level splits when reading Parquet files
......................................................................
WIP: Add file-level splits when reading Parquet files
Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
3 files changed, 19 insertions(+), 0 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/06/20906/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 82653c2..e11eb43 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -229,6 +229,7 @@
if (HDFSUtils.isEmpty(conf)) {
return Scheduler.EMPTY_INPUT_SPLITS;
}
+
return conf.getInputFormat().getSplits(conf, numPartitions);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..4ffff6c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -23,9 +23,13 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import
org.apache.asterix.external.input.record.reader.aws.delta.SerializableFileSplit;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -49,6 +53,7 @@
private final ParquetInputFormat<ArrayBackedValueStorage> realInputFormat
= new ParquetInputFormat<>();
private IExternalFilterValueEmbedder valueEmbedder;
+ private boolean useFileSplits = false;
@Override
public RecordReader<Void, VoidPointable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
@@ -58,6 +63,17 @@
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws
IOException {
+
+ if (!useFileSplits) {
+ Path[] paths = getInputPaths(job);
+ List<InputSplit> splits = new ArrayList<>();
+ for (Path path : paths) {
+ FileSystem fs = path.getFileSystem(job);
+ splits.add(new SerializableFileSplit(path, 0,
fs.getFileStatus(path).getLen(), job));
+ }
+ return splits.toArray(new InputSplit[0]);
+ }
+
if (isTaskSideMetaData(job)) {
return super.getSplits(job, numSplits);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e57b4d3..51e9085 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,6 +21,7 @@
import static
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.ByteArrayInputStream;
@@ -243,6 +244,7 @@
}
conf.setClassLoader(HDFSInputStream.class.getClassLoader());
conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT,
formatClassName);
+ conf.set(LIST_STATUS_NUM_THREADS,
String.valueOf(Runtime.getRuntime().availableProcessors()));
// Enable local short circuit reads if user supplied the parameters
if (localShortCircuitSocketPath != null) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20906?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I288908499c90320f9fc497675ef3d671163c69f0
Gerrit-Change-Number: 20906
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>