From Peeyush Gupta <[email protected]>:

Peeyush Gupta has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527 )

Change subject: [ASTERIXDB-3576][EXT] push predicates down to delta tables to filter row groups
......................................................................

[ASTERIXDB-3576][EXT] push predicates down to delta tables to filter row groups

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
A Delta table's data files are essentially Parquet files. Parquet allows a
predicate to be applied while reading a data file so that row groups that
cannot contain matching rows are skipped. With this patch we push filters
down to the individual Parquet files of the Delta table to filter row groups.
The Predicate class of the Delta Kernel API is not serializable, so we have
added custom serialization/deserialization of Delta Kernel API Predicates.
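Row-group skipping typically works from the per-column min/max statistics that Parquet stores for each row group: a group can be dropped when those statistics prove that no row in it can satisfy the predicate. The sketch below models that decision for a single long column using only the JDK; RowGroupStats, Op, canSkip and groupsToRead are hypothetical names chosen for illustration, not Delta Kernel or Parquet APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class RowGroupSkipping {
    // Hypothetical model of the min/max statistics kept per row group for one column.
    record RowGroupStats(int id, long min, long max) {}

    // Hypothetical comparison operators for a predicate of the form `column OP literal`.
    enum Op { EQ, LT, LE, GT, GE }

    // True only when the statistics prove that no row in the group can satisfy the predicate.
    static boolean canSkip(RowGroupStats s, Op op, long literal) {
        return switch (op) {
            case EQ -> literal < s.min() || literal > s.max();
            case LT -> s.min() >= literal; // every value >= literal, so none is < literal
            case LE -> s.min() > literal;
            case GT -> s.max() <= literal; // every value <= literal, so none is > literal
            case GE -> s.max() < literal;
        };
    }

    // Ids of the row groups that might contain matching rows and therefore must be read.
    static List<Integer> groupsToRead(List<RowGroupStats> groups, Op op, long literal) {
        List<Integer> result = new ArrayList<>();
        for (RowGroupStats g : groups) {
            if (!canSkip(g, op, literal)) {
                result.add(g.id());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<RowGroupStats> groups = List.of(
                new RowGroupStats(0, 0, 99),
                new RowGroupStats(1, 100, 199),
                new RowGroupStats(2, 200, 299));
        // Predicate `col > 150`: group 0 (max 99) is skipped.
        System.out.println(groupsToRead(groups, Op.GT, 150)); // [1, 2]
    }
}
```

A complete implementation also has to combine these per-comparison decisions across AND/OR and account for null counts and other column types.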

Ext-ref: MB-65315
Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Jenkins <[email protected]>
---
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
3 files changed, 181 insertions(+), 6 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Peeyush Gupta: Looks good to me, but someone else must approve
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a094c22..121a76b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;

 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -41,6 +44,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.types.StructType;
@@ -65,9 +69,10 @@
     private int fileIndex;
     private Row scanFile;
     private CloseableIterator<Row> rows;
+    private Optional<Predicate> filterPredicate;

-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config)
-            throws HyracksDataException {
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
+            String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -85,15 +90,16 @@
             this.scanFile = scanFiles.get(0);
             this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
             try {
                 this.physicalDataIter = engine.getParquetHandler()
-                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
                 this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
             }
         }
     }
@@ -122,7 +128,7 @@
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
                 physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, Optional.empty());
+                        physicalReadSchema, filterPredicate);
                 dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
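Passing filterPredicate to readParquetFiles lets the Parquet reader prune row groups, but pruning on row-group statistics is conservative: a surviving row group can still contain rows that fail the predicate, so exact results still depend on evaluating the predicate per row. A JDK-only sketch of that two-stage flow for the predicate value > threshold, assuming a hypothetical RowGroup record in place of real Parquet metadata:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

public class PruneThenFilter {
    // Hypothetical row group: its column values plus the max statistic stored in the file footer.
    record RowGroup(long[] values, long max) {}

    // Values matching `value > threshold`: prune row groups first, then filter rows exactly.
    static List<Long> scan(List<RowGroup> groups, long threshold) {
        LongPredicate exact = v -> v > threshold;
        List<Long> out = new ArrayList<>();
        for (RowGroup g : groups) {
            // Stage 1: the max statistic proves no value in this group exceeds the threshold.
            if (g.max() <= threshold) {
                continue;
            }
            // Stage 2: a surviving group can still hold non-matching rows, so each row is tested.
            for (long v : g.values()) {
                if (exact.test(v)) {
                    out.add(v);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<RowGroup> groups = List.of(
                new RowGroup(new long[] {1, 5, 9}, 9),
                new RowGroup(new long[] {8, 12, 20}, 20));
        System.out.println(scan(groups, 10)); // [12, 20]
    }
}
```

The first group is never read, while the value 8 in the second group is only removed by the row-level check.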
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 4e902b9..b76dd4d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -79,6 +79,7 @@
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
+    private String filterExpressionStr;

     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -133,8 +134,14 @@
         if (filterExpression != null) {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema)
                     .withFilter(engine, (Predicate) filterExpression).build();
+            if (scan.getRemainingFilter().isPresent()) {
+                filterExpressionStr = PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
+            } else {
+                filterExpressionStr = null;
+            }
         } else {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
@@ -145,6 +152,7 @@
             // We need to fall back to skip applying the filter and return all files.
             LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage());
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
@@ -206,7 +214,7 @@
         try {
             int partition = context.getPartition();
             return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory);
+                    confFactory, filterExpressionStr);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
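The DeltaReaderFactory change follows a simple rule: the remaining filter is shipped to the readers as a JSON string only when the filtered scan is actually used, and filterExpressionStr is reset to null whenever the factory falls back to an unfiltered scan. A minimal JDK-only sketch of that rule; Plan, planScan and the Function-based file lister are hypothetical stand-ins for the Delta Kernel scan builder and getScanFiles, not their real signatures.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ScanFallback {
    // Hypothetical planning result: files to read plus the filter JSON pushed to every reader (null = no filter).
    record Plan(List<String> files, String pushedFilterJson) {}

    // lister.apply(true) lists files with the filter applied and may throw; lister.apply(false) lists all files.
    static Plan planScan(Optional<String> remainingFilterJson, Function<Boolean, List<String>> lister) {
        try {
            // Filtered scan: push down the remaining filter if there is one, otherwise nothing.
            return new Plan(lister.apply(true), remainingFilterJson.orElse(null));
        } catch (RuntimeException e) {
            // Fallback: the unfiltered scan has no remaining filter, so nothing is pushed down.
            return new Plan(lister.apply(false), null);
        }
    }

    public static void main(String[] args) {
        Function<Boolean, List<String>> failsWhenFiltered = useFilter -> {
            if (useFilter) {
                throw new IllegalStateException("cannot apply filter");
            }
            return List.of("part-0.parquet", "part-1.parquet");
        };
        Plan fallback = planScan(Optional.of("{\"type\":\"predicate\"}"), failsWhenFiltered);
        System.out.println(fallback.files().size() + " " + fallback.pushedFilterJson()); // 2 null

        Plan withFilter = planScan(Optional.of("{\"type\":\"predicate\"}"), useFilter -> List.of("part-1.parquet"));
        System.out.println(withFilter.pushedFilterJson()); // {"type":"predicate"}
    }
}
```

On the reader side, a null string then maps to Optional.empty(), which is what deserializeExpressionFromJson returns for null input.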
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
new file mode 100644
index 0000000..efd47f5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Utility class to serialize and deserialize {@link Predicate} object.
+ */
+public class PredicateSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private PredicateSerDe() {
+    }
+
+    public static String serializeExpressionToJson(Expression expression) {
+        Map<String, Object> expressionObject = visitExpression(expression);
+        try {
+            return OBJECT_MAPPER.writeValueAsString(expressionObject);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) {
+        try {
+            if (jsonExpression == null) {
+                return Optional.empty();
+            }
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression);
+            return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode));
+        } catch (JsonProcessingException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public static Map<String, Object> visitPredicate(Predicate predicate) {
+        Map<String, Object> predicateObject = new HashMap<>();
+        predicateObject.put("type", "predicate");
+        predicateObject.put("name", predicate.getName());
+        predicateObject.put("left", visitExpression(predicate.getChildren().get(0)));
+        predicateObject.put("right", visitExpression(predicate.getChildren().get(1)));
+        return predicateObject;
+    }
+
+    public static Map<String, Object> visitLiteral(Literal literal) {
+        Map<String, Object> literalObject = new HashMap<>();
+        literalObject.put("type", "literal");
+        literalObject.put("dataType", literal.getDataType().toString());
+        literalObject.put("value", literal.getValue());
+        return literalObject;
+    }
+
+    public static Map<String, Object> visitColumn(Column column) {
+        Map<String, Object> columnObject = new HashMap<>();
+        columnObject.put("type", "column");
+        columnObject.put("names", column.getNames());
+        return columnObject;
+    }
+
+    private static Map<String, Object> visitExpression(Expression expression) {
+        return switch (expression) {
+            case Predicate predicate -> visitPredicate(predicate);
+            case Column column -> visitColumn(column);
+            case Literal literal -> visitLiteral(literal);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + expression);
+        };
+    }
+
+    public static Predicate visitPredicate(ObjectNode node) {
+        return new Predicate(node.get("name").asText(), visitExpression((ObjectNode) node.get("left")),
+                visitExpression((ObjectNode) node.get("right")));
+    }
+
+    public static Literal visitLiteral(ObjectNode node) {
+        switch (node.get("dataType").asText()) {
+            case "boolean" : return Literal.ofBoolean(node.get("value").asBoolean());
+            case "byte" : return Literal.ofByte((byte) node.get("value").asInt());
+            case "short" : return Literal.ofShort(node.get("value").shortValue());
+            case "integer" : return Literal.ofInt(node.get("value").asInt());
+            case "long" : return Literal.ofLong(node.get("value").asLong());
+            case "float" : return Literal.ofFloat(node.get("value").floatValue());
+            case "double" : return Literal.ofDouble(node.get("value").doubleValue());
+            case "date" : return Literal.ofDate(node.get("value").asInt());
+            case "timestamp" : return Literal.ofTimestamp(node.get("value").asLong());
+            case "string" : return Literal.ofString(node.get("value").asText());
+            case null, default : throw new UnsupportedOperationException("Unsupported literal type: " + node.get("dataType").asText());
+        }
+    }
+
+    public static Column visitColumn(ObjectNode node) {
+        if (node.get("names").isArray()) {
+            return new Column(StreamSupport.stream(node.get("names").spliterator(), false).map(JsonNode::asText)
+                    .toArray(String[]::new));
+        } else {
+            return new Column(node.get("names").asText());
+        }
+    }
+
+    private static Expression visitExpression(ObjectNode node) {
+        return switch (node.get("type").asText()) {
+            case "predicate" -> visitPredicate(node);
+            case "column" -> visitColumn(node);
+            case "literal" -> visitLiteral(node);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + node.get("type").asText());
+        };
+    }
+}
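PredicateSerDe converts each expression node into a map tagged with a "type" field and dispatches on that tag when reading it back. The JDK-only sketch below shows the same tagged-map round trip on hypothetical Col, Lit and Pred records standing in for Delta Kernel's Column, Literal and Predicate. It stops at nested maps (the patch hands those to a Jackson ObjectMapper to produce the JSON string) and, as a variation on the patch's left/right fields, stores predicate children as a list so that predicates with one child or more than two children also round-trip. The predicate names used in main are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExprMapSerDe {
    // Hypothetical stand-ins for Delta Kernel's Column, Literal and Predicate.
    sealed interface Expr permits Col, Lit, Pred {}
    record Col(List<String> names) implements Expr {}
    record Lit(String dataType, Object value) implements Expr {}
    record Pred(String name, List<Expr> children) implements Expr {}

    // Expression tree -> nested maps tagged with "type", ready for a JSON library to write out.
    static Map<String, Object> toMap(Expr e) {
        Map<String, Object> m = new HashMap<>();
        switch (e) {
            case Col c -> {
                m.put("type", "column");
                m.put("names", c.names());
            }
            case Lit l -> {
                m.put("type", "literal");
                m.put("dataType", l.dataType());
                m.put("value", l.value());
            }
            case Pred p -> {
                m.put("type", "predicate");
                m.put("name", p.name());
                // Children kept as a list: unary and n-ary predicates need no special case.
                m.put("children", p.children().stream().map(ExprMapSerDe::toMap).toList());
            }
        }
        return m;
    }

    // Nested maps -> expression tree, dispatching on the "type" tag.
    @SuppressWarnings("unchecked")
    static Expr fromMap(Map<String, Object> m) {
        return switch ((String) m.get("type")) {
            case "column" -> new Col((List<String>) m.get("names"));
            case "literal" -> new Lit((String) m.get("dataType"), m.get("value"));
            case "predicate" -> {
                List<Expr> children = new ArrayList<>();
                for (Object child : (List<Object>) m.get("children")) {
                    children.add(fromMap((Map<String, Object>) child));
                }
                yield new Pred((String) m.get("name"), children);
            }
            default -> throw new UnsupportedOperationException("Unsupported expression type: " + m.get("type"));
        };
    }

    public static void main(String[] args) {
        Expr filter = new Pred("AND", List.of(
                new Pred(">", List.of(new Col(List.of("price")), new Lit("integer", 100))),
                new Pred("NOT", List.of(new Pred("IS_NULL", List.of(new Col(List.of("address", "city"))))))));
        System.out.println(fromMap(toMap(filter)).equals(filter)); // true
    }
}
```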

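visitLiteral writes literal.getValue() unchanged, so date and timestamp literals travel as plain numbers and are rebuilt with Literal.ofDate(int) and Literal.ofTimestamp(long), which in Delta Kernel take days since the Unix epoch and microseconds since the Unix epoch (UTC), respectively. The JDK-only sketch below shows those encodings with java.time, which can help when reading a serialized predicate by hand; toDateValue, toTimestampValue and their inverses are hypothetical helpers, not part of the patch.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class LiteralEncoding {
    // Date literal value: days since 1970-01-01, the int carried in "value" for dataType "date".
    static int toDateValue(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Timestamp literal value: microseconds since 1970-01-01T00:00:00Z, the long carried for "timestamp".
    static long toTimestampValue(Instant instant) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    // Inverse conversions back to java.time values.
    static LocalDate fromDateValue(int days) {
        return LocalDate.ofEpochDay(days);
    }

    static Instant fromTimestampValue(long micros) {
        return Instant.EPOCH.plus(micros, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        System.out.println(toDateValue(LocalDate.of(2024, 1, 1)));                 // 19723
        System.out.println(toTimestampValue(Instant.parse("1970-01-01T00:00:01Z"))); // 1000000
    }
}
```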
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
Gerrit-Change-Number: 19527
Gerrit-PatchSet: 4
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Peeyush Gupta <[email protected]>
Gerrit-MessageType: merged
