>From Ayush Tripathi <[email protected]>:
Ayush Tripathi has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085 )
Change subject: [ASTERIXDB-3503][EXT] Add column filter for Delta Reader.
......................................................................
[ASTERIXDB-3503][EXT] Add column filter for Delta Reader.
- user model changes: yes
- storage format changes: no
- interface changes: no
Details: Read only the columns required by the query from the Delta table, instead of always requesting the full table schema.
Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
3 files changed, 61 insertions(+), 5 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/85/19085/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..406471c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,7 +19,10 @@
package org.apache.asterix.external.input.record.reader.aws.delta;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -35,7 +38,10 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -52,6 +58,8 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
@@ -96,7 +104,18 @@
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine,
tableMetadataPath);
Snapshot snapshot = table.getLatestSnapshot(engine);
- Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
snapshot.getSchema(engine)).build();
+ StructType requiredSchema;
+ try {
+ String encoded =
conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
"");
+ ARecordType expectedType = HDFSUtils.getExpectedType(encoded);
+ Map<String, FunctionCallInformation> functionCallInformationMap =
+ HDFSUtils.getFunctionCallInformationMap(encoded);
+ StructType fileSchema = snapshot.getSchema(engine);
+ requiredSchema = clipType(expectedType, fileSchema,
functionCallInformationMap);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine,
requiredSchema).build();
scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
CloseableIterator<FilteredColumnarBatch> iter =
scan.getScanFiles(engine);
@@ -114,6 +133,47 @@
distributeFiles();
}
+    /**
+     * Builds the read schema for the Delta scan: the top-level columns of {@code fileSchema} whose names
+     * appear in {@code rootType}.
+     * <p>
+     * Pruning is top-level only: a selected column is read with its full type, including nested fields.
+     * Requested names that are not columns of the table are skipped instead of failing the scan, and the
+     * selected columns keep the order of {@code rootType}'s field names.
+     *
+     * @param rootType   the projected type decoded from the job configuration; {@code ALL_FIELDS_TYPE} and
+     *                   {@code EMPTY_TYPE} disable pruning
+     * @param fileSchema the schema of the Delta snapshot being scanned
+     * @param funcInfo   function-call information for the projected paths; not used yet because nested
+     *                   fields are not pruned
+     * @return the columns to pass to {@code ScanBuilder.withReadSchema}, or {@code fileSchema} itself when
+     *         pruning is disabled or none of the requested names is a column of the table
+     */
+    public StructType clipType(ARecordType rootType, StructType fileSchema,
+            Map<String, FunctionCallInformation> funcInfo) {
+        // NOTE(review): the caller in this change decodes rootType from
+        // KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, but HDFSUtils.getExpectedTypeHelper decodes the
+        // expected type from KEY_REQUESTED_FIELDS -- confirm which key the caller should read.
+        if (rootType == EMPTY_TYPE || rootType == ALL_FIELDS_TYPE) {
+            // Nothing to prune (presumably every field, or no field, is needed): read the whole table.
+            return fileSchema;
+        }
+        Map<String, StructField> columnsByName = new HashMap<>();
+        for (StructField column : fileSchema.fields()) {
+            columnsByName.put(column.getName(), column);
+        }
+        List<StructField> requiredColumns = new ArrayList<>();
+        for (String fieldName : rootType.getFieldNames()) {
+            // Look up through the map: StructType.get(String) fails for names missing from the schema.
+            StructField column = columnsByName.get(fieldName);
+            if (column != null) {
+                requiredColumns.add(column);
+            }
+        }
+        // As for EMPTY_TYPE, do not hand the scan an empty read schema.
+        return requiredColumns.isEmpty() ? fileSchema : new StructType(requiredColumns);
+    }
+
private AlgebricksAbsolutePartitionConstraint
configureLocationConstraints(ICcApplicationContext appCtx) {
IClusterStateManager csm = appCtx.getClusterStateManager();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index c8a09b4..e7fc38e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -66,9 +66,9 @@
ParquetConverterContext context = new
ParquetConverterContext(configuration, warnings);
AsterixTypeToParquetTypeVisitor visitor = new
AsterixTypeToParquetTypeVisitor(context);
try {
- ARecordType expectedType =
HDFSUtils.getExpectedType(configuration);
+ ARecordType expectedType =
HDFSUtils.getExpectedTypeHelper(configuration);
Map<String, FunctionCallInformation> functionCallInformationMap =
- HDFSUtils.getFunctionCallInformationMap(configuration);
+
HDFSUtils.getFunctionCallInformationMapHelper(configuration);
MessageType requestedType = visitor.clipType(expectedType,
fileSchema, functionCallInformationMap);
if (!warnings.isEmpty()) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index f7638b4..09a0339 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -258,8 +258,12 @@
}
- public static ARecordType getExpectedType(Configuration configuration)
throws IOException {
+ public static ARecordType getExpectedTypeHelper(Configuration
configuration) throws IOException {
String encoded =
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS, "");
+ return getExpectedType(encoded);
+ }
+
+ public static ARecordType getExpectedType(String encoded) throws
IOException {
if (ALL_FIELDS_TYPE.getTypeName().equals(encoded)) {
//By default, return the entire records
return ALL_FIELDS_TYPE;
@@ -280,9 +284,14 @@
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
stringFunctionCallInfoMap);
}
- public static Map<String, FunctionCallInformation>
getFunctionCallInformationMap(Configuration conf)
+ public static Map<String, FunctionCallInformation>
getFunctionCallInformationMapHelper(Configuration conf)
throws IOException {
String encoded =
conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
"");
+ return getFunctionCallInformationMap(encoded);
+ }
+
+ public static Map<String, FunctionCallInformation>
getFunctionCallInformationMap(String encoded)
+ throws IOException {
if (!encoded.isEmpty()) {
Base64.Decoder decoder = Base64.getDecoder();
byte[] functionCallInfoMapBytes = decoder.decode(encoded);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
Gerrit-Change-Number: 19085
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange