From Preetham Poluparthi <[email protected]>:
Preetham Poluparthi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20885?usp=email )
Change subject: WIP: update parquet reader
......................................................................
WIP: update parquet reader

Migrate MapredParquetInputFormat and ParquetRecordReaderWrapper from the
legacy org.apache.hadoop.mapred API (FileInputFormat, RecordReader,
InputSplit) to the newer org.apache.hadoop.mapreduce API.

Change-Id: I212cd38479610afe211f2a37d6f527fb62324d2c
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
2 files changed, 75 insertions(+), 140 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/85/20885/1
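
For review context: this change moves both classes from the pull-style
org.apache.hadoop.mapred contract (caller-supplied key/value holders filled by
next()) to the cursor-style org.apache.hadoop.mapreduce contract
(nextKeyValue() plus getCurrentKey()/getCurrentValue()). A minimal sketch of
the two consumption loops for comparison; the class and method names below are
illustrative only and are not part of this patch:

    import java.io.IOException;

    import org.apache.hyracks.data.std.primitive.VoidPointable;

    final class RecordReaderContractSketch {

        // Legacy mapred contract: the caller allocates the key/value holders
        // and the reader fills them in on each next() call.
        static long drainOld(org.apache.hadoop.mapred.RecordReader<Void, VoidPointable> reader)
                throws IOException {
            long count = 0;
            Void key = reader.createKey();
            VoidPointable value = reader.createValue();
            while (reader.next(key, value)) {
                count++; // 'value' now holds the current record
            }
            return count;
        }

        // New mapreduce contract: the reader owns the current record and
        // exposes it through getCurrentKey()/getCurrentValue().
        static long drainNew(org.apache.hadoop.mapreduce.RecordReader<Void, VoidPointable> reader)
                throws IOException, InterruptedException {
            long count = 0;
            while (reader.nextKeyValue()) {
                VoidPointable value = reader.getCurrentValue(); // owned by the reader
                count++;
            }
            return count;
        }
    }
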
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..dbada39 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -18,22 +18,12 @@
*/
package org.apache.asterix.external.input.record.reader.hdfs.parquet;
-import static java.util.Arrays.asList;
-
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import java.util.List;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.parquet.hadoop.Footer;
-import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
/**
@@ -45,49 +35,23 @@
- * The newer API (@see org.apache.hadoop.mapreduce) is not yet supported.
- * Beware before upgrading Apache Parquet version.
+ * As of this change, it extends the newer API (@see org.apache.hadoop.mapreduce).
*/
-public class MapredParquetInputFormat extends org.apache.hadoop.mapred.FileInputFormat<Void, VoidPointable> {
+public class MapredParquetInputFormat
+        extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<Void, VoidPointable> {
-    private final ParquetInputFormat<ArrayBackedValueStorage> realInputFormat = new ParquetInputFormat<>();
private IExternalFilterValueEmbedder valueEmbedder;
@Override
-    public RecordReader<Void, VoidPointable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
-            throws IOException {
-        return new ParquetRecordReaderWrapper(split, job, reporter, valueEmbedder);
- }
-
- @Override
-    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- if (isTaskSideMetaData(job)) {
- return super.getSplits(job, numSplits);
- }
-
- List<Footer> footers = getFooters(job);
-        List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
- if (splits == null) {
- return null; //NOSONAR
- }
- InputSplit[] resultSplits = new InputSplit[splits.size()];
- int i = 0;
- for (ParquetInputSplit split : splits) {
- resultSplits[i++] = new ParquetInputSplitWrapper(split);
- }
- return resultSplits;
- }
-
- public List<Footer> getFooters(JobConf job) throws IOException {
- return realInputFormat.getFooters(job, asList(super.listStatus(job)));
+    public org.apache.hadoop.mapreduce.RecordReader<Void, VoidPointable> createRecordReader(
+            org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+ return new ParquetRecordReaderWrapper(split, context, valueEmbedder);
}
public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
this.valueEmbedder = valueEmbedder;
}
- public static boolean isTaskSideMetaData(JobConf job) {
- return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
- }
-
- static class ParquetInputSplitWrapper implements InputSplit {
+ static class ParquetInputSplitWrapper extends InputSplit {
ParquetInputSplit realSplit;
@@ -95,10 +59,6 @@
public ParquetInputSplitWrapper() {
}
- public ParquetInputSplitWrapper(ParquetInputSplit realSplit) {
- this.realSplit = realSplit;
- }
-
@Override
public long getLength() throws IOException {
return realSplit.getLength();
@@ -110,19 +70,9 @@
}
@Override
- public void readFields(DataInput in) throws IOException {
- realSplit = new ParquetInputSplit();
- realSplit.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- realSplit.write(out);
- }
-
- @Override
public String toString() {
return realSplit.toString();
}
}
+
}
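
With MapredParquetInputFormat now extending the mapreduce FileInputFormat,
split planning comes from the inherited getSplits(JobContext) and readers come
from createRecordReader(InputSplit, TaskAttemptContext). A hedged sketch of
how a caller might drive the migrated class; readAll is a hypothetical helper
(the real call sites live in the Asterix HDFS input-stream code), and the
value embedder must be supplied by the runtime before any reader is created:

    import java.util.List;

    import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
    import org.apache.asterix.external.input.record.reader.hdfs.parquet.MapredParquetInputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
    import org.apache.hyracks.data.std.primitive.VoidPointable;

    final class ParquetScanSketch {
        static long readAll(MapredParquetInputFormat format, IExternalFilterValueEmbedder embedder, Job job)
                throws Exception {
            format.setValueEmbedder(embedder); // must be set before createRecordReader()
            TaskAttemptContext ctx = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            long records = 0;
            // Split planning is now inherited from the mapreduce FileInputFormat.
            List<InputSplit> splits = format.getSplits(job);
            for (InputSplit split : splits) {
                try (RecordReader<Void, VoidPointable> reader = format.createRecordReader(split, ctx)) {
                    while (reader.nextKeyValue()) {
                        VoidPointable value = reader.getCurrentValue();
                        records++; // hand 'value' to the Parquet-to-ADM parser (elided)
                    }
                }
            }
            return records;
        }
    }

Note that the wrapper does all of its setup in the constructor, which is why
the RecordReader.initialize() override in the second file below is a no-op.
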
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
index a293ebb..ebfa1d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
@@ -24,11 +24,9 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.util.LogRedactionUtil;
@@ -36,7 +34,7 @@
import org.apache.parquet.hadoop.ParquetRecordReader;
import org.apache.parquet.hadoop.api.ReadSupport;
-public class ParquetRecordReaderWrapper implements RecordReader<Void, VoidPointable> {
+public class ParquetRecordReaderWrapper extends RecordReader<Void, VoidPointable> {
private final ParquetRecordReader<IValueReference> realReader;
private final long splitLen; // for getPos()
@@ -45,51 +43,39 @@
private boolean firstRecord;
private boolean eof;
-    public ParquetRecordReaderWrapper(InputSplit oldSplit, JobConf oldJobConf, Reporter reporter,
-            IExternalFilterValueEmbedder valueEmbedder) throws IOException {
+    public ParquetRecordReaderWrapper(InputSplit oldSplit, TaskAttemptContext context,
+            IExternalFilterValueEmbedder valueEmbedder) throws IOException, InterruptedException {
splitLen = oldSplit.getLength();
- try {
-            ReadSupport<IValueReference> readSupport = ParquetInputFormat.getReadSupportInstance(oldJobConf);
-            ParquetReadSupport parquetReadSupport = (ParquetReadSupport) readSupport;
-            parquetReadSupport.setValueEmbedder(valueEmbedder);
-            realReader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(oldJobConf));
+        ReadSupport<IValueReference> readSupport =
+                ParquetInputFormat.getReadSupportInstance(context.getConfiguration());
+        ParquetReadSupport parquetReadSupport = (ParquetReadSupport) readSupport;
+        parquetReadSupport.setValueEmbedder(valueEmbedder);
+        realReader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(context.getConfiguration()));
-            if (oldSplit instanceof MapredParquetInputFormat.ParquetInputSplitWrapper) {
-                realReader.initialize(((MapredParquetInputFormat.ParquetInputSplitWrapper) oldSplit).realSplit,
-                        oldJobConf, reporter);
-            } else if (oldSplit instanceof FileSplit) {
-                realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
-            } else {
-                throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
-                        LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
- }
-
- // Set the path for value embedder
- valueEmbedder.setPath(getPath(oldSplit));
-
- valueContainer = new VoidPointable();
- firstRecord = false;
- eof = false;
- // read once to gain access to key and value objects
- if (realReader.nextKeyValue()) {
- firstRecord = true;
- valueContainer.set(realReader.getCurrentValue());
- } else {
- eof = true;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (HyracksDataException | AsterixParquetRuntimeException e) {
- throw e;
- } catch (Exception e) {
-            if (e.getMessage() != null && e.getMessage().contains("not a Parquet file")) {
-                throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
-                        LogRedactionUtil.userData(getPath(oldSplit)), "not a Parquet file");
- }
-
- throw RuntimeDataException.create(e);
+        if (oldSplit instanceof MapredParquetInputFormat.ParquetInputSplitWrapper) {
+            realReader.initialize(((MapredParquetInputFormat.ParquetInputSplitWrapper) oldSplit).realSplit, context);
+        } else if (oldSplit instanceof FileSplit) {
+            realReader.initialize((FileSplit) oldSplit, context);
+        } else {
+            throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+                    LogRedactionUtil.userData(oldSplit.toString()), "invalid file split");
}
+
+ // Set the path for value embedder
+ valueEmbedder.setPath(getPath(oldSplit));
+
+ valueContainer = new VoidPointable();
+ firstRecord = false;
+ eof = false;
+ // read once to gain access to key and value objects
+ if (realReader.nextKeyValue()) {
+ firstRecord = true;
+ valueContainer.set(realReader.getCurrentValue());
+ } else {
+ eof = true;
+ }
+
}
private String getPath(InputSplit split) {
@@ -108,21 +94,46 @@
}
@Override
- public Void createKey() {
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
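+        // No-op: the wrapped ParquetRecordReader is fully initialized in the
+        // constructor, so there is nothing left to do when the framework
+        // calls initialize().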
+
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (eof) {
+ return false;
+ }
+
+ if (firstRecord) {
+ firstRecord = false;
+ return true;
+ }
+
+ try {
+ if (realReader.nextKeyValue()) {
+ valueContainer.set(realReader.getCurrentValue());
+ return true;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ eof = true;
+ return false;
+ }
+
+ @Override
+ public Void getCurrentKey() {
return null;
}
@Override
- public VoidPointable createValue() {
+ public VoidPointable getCurrentValue() {
return valueContainer;
}
@Override
- public long getPos() throws IOException {
- return (long) (splitLen * getProgress());
- }
-
- @Override
public float getProgress() throws IOException {
try {
return realReader.getProgress();
@@ -131,30 +142,4 @@
}
}
- @Override
- public boolean next(Void key, VoidPointable value) throws IOException {
- if (eof) {
- return false;
- }
-
- if (firstRecord) { // key & value are already read.
- firstRecord = false;
- value.set(valueContainer);
- return true;
- }
-
- try {
- if (realReader.nextKeyValue()) {
- if (value != null) {
- value.set(realReader.getCurrentValue());
- }
- return true;
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
-
- eof = true; // strictly not required, just for consistency
- return false;
- }
}
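
One API difference worth noting when reviewing the removed readFields()/write()
methods: org.apache.hadoop.mapred.InputSplit extends Writable, whereas the
mapreduce InputSplit is a plain abstract class, so a split wrapper that still
needs to be serialized would typically implement Writable explicitly. A minimal
sketch under that assumption (hypothetical class, not part of this patch; the
field is initialized eagerly here because the patch removes the constructor
that previously assigned realSplit):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.parquet.hadoop.ParquetInputSplit;

    class WritableParquetSplitSketch extends InputSplit implements Writable {
        private ParquetInputSplit realSplit = new ParquetInputSplit();

        @Override
        public long getLength() throws IOException {
            return realSplit.getLength();
        }

        @Override
        public String[] getLocations() throws IOException {
            return realSplit.getLocations();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            realSplit.write(out); // ParquetInputSplit is itself Writable
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            realSplit.readFields(in); // mirrors the readFields() removed from the mapred wrapper
        }
    }
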
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20885?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I212cd38479610afe211f2a37d6f527fb62324d2c
Gerrit-Change-Number: 20885
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>