From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085 )


Change subject: [ASTERIXDB-3503][EXT] Add column filter for Delta Reader.
......................................................................

[ASTERIXDB-3503][EXT] Add column filter for Delta Reader.

- user model changes: yes
- storage format changes: no
- interface changes: no

Details: Read only the columns required by the query from the Delta table, instead of the full table schema.

Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
3 files changed, 61 insertions(+), 5 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/85/19085/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..406471c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;

 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,7 +38,10 @@
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -52,6 +58,8 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;

 public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
@@ -96,7 +104,18 @@
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
         Snapshot snapshot = table.getLatestSnapshot(engine);
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
snapshot.getSchema(engine)).build();
+        StructType requiredSchema;
+        try {
+            String encoded = 
conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, 
"");
+            ARecordType expectedType = HDFSUtils.getExpectedType(encoded);
+            Map<String, FunctionCallInformation> functionCallInformationMap =
+                    HDFSUtils.getFunctionCallInformationMap(encoded);
+            StructType fileSchema = snapshot.getSchema(engine);
+            requiredSchema = clipType(expectedType, fileSchema, 
functionCallInformationMap);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = 
scan.getScanFiles(engine);

@@ -114,6 +133,19 @@
         distributeFiles();
     }

+    /**
+     * Builds the Delta read schema containing only the columns requested by the query.
+     *
+     * @param rootType   the expected (projected) record type; {@code EMPTY_TYPE} and
+     *                   {@code ALL_FIELDS_TYPE} both select the full table schema
+     * @param fileSchema the schema of the Delta table snapshot
+     * @param funcInfo   function call information for the projection (currently unused)
+     * @return {@code fileSchema} itself when no projection applies, otherwise a new schema
+     *         holding the requested fields that exist in {@code fileSchema}, in requested order
+     */
+    public StructType clipType(ARecordType rootType, StructType fileSchema,
+            Map<String, FunctionCallInformation> funcInfo) {
+        if (rootType == EMPTY_TYPE || rootType == ALL_FIELDS_TYPE) {
+            return fileSchema;
+        }
+        List<String> availableNames = fileSchema.fieldNames();
+        List<StructField> requiredFields = new ArrayList<>();
+        for (String fieldName : rootType.getFieldNames()) {
+            // A requested field may be absent from the table schema; skip it instead of
+            // relying on how StructType.get behaves for unknown names.
+            if (availableNames.contains(fieldName)) {
+                requiredFields.add(fileSchema.get(fieldName));
+            }
+        }
+        return new StructType(requiredFields);
+    }
+
     private AlgebricksAbsolutePartitionConstraint 
configureLocationConstraints(ICcApplicationContext appCtx) {
         IClusterStateManager csm = appCtx.getClusterStateManager();

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index c8a09b4..e7fc38e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -66,9 +66,9 @@
         ParquetConverterContext context = new 
ParquetConverterContext(configuration, warnings);
         AsterixTypeToParquetTypeVisitor visitor = new 
AsterixTypeToParquetTypeVisitor(context);
         try {
-            ARecordType expectedType = 
HDFSUtils.getExpectedType(configuration);
+            ARecordType expectedType = 
HDFSUtils.getExpectedTypeHelper(configuration);
             Map<String, FunctionCallInformation> functionCallInformationMap =
-                    HDFSUtils.getFunctionCallInformationMap(configuration);
+                    
HDFSUtils.getFunctionCallInformationMapHelper(configuration);
             MessageType requestedType = visitor.clipType(expectedType, 
fileSchema, functionCallInformationMap);

             if (!warnings.isEmpty()) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index f7638b4..09a0339 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -258,8 +258,12 @@

     }

-    public static ARecordType getExpectedType(Configuration configuration) 
throws IOException {
+    public static ARecordType getExpectedTypeHelper(Configuration 
configuration) throws IOException {
         String encoded = 
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS, "");
+        return getExpectedType(encoded);
+    }
+
+    public static ARecordType getExpectedType(String encoded) throws 
IOException {
         if (ALL_FIELDS_TYPE.getTypeName().equals(encoded)) {
             //By default, return the entire records
             return ALL_FIELDS_TYPE;
@@ -280,9 +284,14 @@
         
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, 
stringFunctionCallInfoMap);
     }

-    public static Map<String, FunctionCallInformation> 
getFunctionCallInformationMap(Configuration conf)
+    public static Map<String, FunctionCallInformation> 
getFunctionCallInformationMapHelper(Configuration conf)
             throws IOException {
         String encoded = 
conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, 
"");
+        return getFunctionCallInformationMap(encoded);
+    }
+
+    public static Map<String, FunctionCallInformation> 
getFunctionCallInformationMap(String encoded)
+            throws IOException {
         if (!encoded.isEmpty()) {
             Base64.Decoder decoder = Base64.getDecoder();
             byte[] functionCallInfoMapBytes = decoder.decode(encoded);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I809c692777349025d5ce0435c3a6068d432cd282
Gerrit-Change-Number: 19085
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange

Reply via email to