>From Peeyush Gupta <[email protected]>:

Peeyush Gupta has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527 )


Change subject: [ASTERIXDB-3576][EXT] push predicates down to delta tables to 
filter row groups
......................................................................

[ASTERIXDB-3576][EXT] push predicates down to delta tables to filter row groups

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
A Delta table's data files are essentially Parquet files. Parquet allows
applying a predicate while reading data files to skip row groups.
With this patch we push filters down to the individual Parquet files of the
Delta table to filter row groups. The Predicate class of the Delta Kernel API
is not serializable, so we have added custom serialization/deserialization
for Delta Kernel API predicates.

Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
---
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
3 files changed, 177 insertions(+), 6 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/27/19527/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a094c22..121a76b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;

 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;

+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -41,6 +44,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.types.StructType;
@@ -65,9 +69,10 @@
     private int fileIndex;
     private Row scanFile;
     private CloseableIterator<Row> rows;
+    private Optional<Predicate> filterPredicate;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config)
-            throws HyracksDataException {
+    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config,
+            String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -85,15 +90,16 @@
             this.scanFile = scanFiles.get(0);
             this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             this.physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = 
PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
             try {
                 this.physicalDataIter = engine.getParquetHandler()
-                        
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, 
Optional.empty());
+                        
.readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, 
filterPredicate);
                 this.dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new 
RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, 
getMessageOrToString(e));
             }
         }
     }
@@ -122,7 +128,7 @@
             physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
                 physicalDataIter = 
engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, Optional.empty());
+                        physicalReadSchema, filterPredicate);
                 dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 4e902b9..7f3ba36 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -79,6 +79,7 @@
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> 
partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
+    private String filterExpressionStr;

     public List<PartitionWorkLoadBasedOnSize> 
getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -133,8 +134,14 @@
         if (filterExpression != null) {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema)
                     .withFilter(engine, (Predicate) filterExpression).build();
+            if (scan.getRemainingFilter().isPresent()) {
+                filterExpressionStr = 
PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
+            } else {
+                filterExpressionStr = null;
+            }
         } else {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
@@ -145,9 +152,11 @@
             // We need to fall back to skip applying the filter and return all 
files.
             LOGGER.info("Exception encountered while getting delta table files 
to scan {}", e.getMessage());
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
requiredSchema).build();
+            filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
+        // A=1 AND B=2 (table partitioned on A)
         LOGGER.info("Number of delta table parquet data files to scan: {}", 
scanFiles.size());
         locationConstraints = getPartitions(appCtx);
         configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
@@ -206,7 +215,7 @@
         try {
             int partition = context.getPartition();
             return new 
DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
 scanState,
-                    confFactory);
+                    confFactory, filterExpressionStr);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
new file mode 100644
index 0000000..89c4230
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Utility class to serialize and deserialize Delta Kernel {@link Predicate} objects to and from JSON.
+ * <p>
+ * Every expression is encoded as a JSON object whose {@code "type"} field is {@code "predicate"},
+ * {@code "column"} or {@code "literal"}; any other kind of expression is rejected with an
+ * {@link UnsupportedOperationException}.
+ */
+public class PredicateSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private PredicateSerDe() {
+    }
+
+    /**
+     * Serializes an expression tree into a JSON string.
+     *
+     * @param expression the root of the expression tree
+     * @return the JSON encoding of {@code expression}
+     * @throws UnsupportedOperationException if the tree contains an unsupported expression
+     * @throws UncheckedIOException if the JSON cannot be written
+     */
+    public static String serializeExpressionToJson(Expression expression) {
+        Map<String, Object> expressionObject = visitExpression(expression);
+        try {
+            return OBJECT_MAPPER.writeValueAsString(expressionObject);
+        } catch (JsonProcessingException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Rebuilds a predicate from the JSON produced by {@link #serializeExpressionToJson(Expression)}.
+     *
+     * @param jsonExpression the JSON string, or {@code null} when there is no predicate
+     * @return the decoded predicate, or an empty optional if {@code jsonExpression} is {@code null}
+     * @throws UnsupportedOperationException if the JSON contains an unsupported expression or literal type
+     * @throws UncheckedIOException if the JSON cannot be parsed
+     */
+    public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) {
+        if (jsonExpression == null) {
+            return Optional.empty();
+        }
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression);
+            return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode));
+        } catch (JsonProcessingException ex) {
+            throw new UncheckedIOException(ex);
+        }
+    }
+
+    /**
+     * Encodes a predicate as a map holding its name and operands. The first operand is stored under
+     * {@code "left"} and the second, if any, under {@code "right"}.
+     *
+     * @param predicate the predicate to encode
+     * @return the map representation of {@code predicate}
+     * @throws UnsupportedOperationException if the predicate has more than two operands
+     */
+    public static Map<String, Object> visitPredicate(Predicate predicate) {
+        List<Expression> children = predicate.getChildren();
+        if (children.size() > 2) {
+            throw new UnsupportedOperationException(
+                    "NYI: predicate " + predicate.getName() + " with " + children.size() + " operands");
+        }
+        Map<String, Object> predicateObject = new HashMap<>();
+        predicateObject.put("type", "predicate");
+        predicateObject.put("name", predicate.getName());
+        // Unary predicates have no second operand, so operands are written only when present.
+        if (!children.isEmpty()) {
+            predicateObject.put("left", visitExpression(children.get(0)));
+        }
+        if (children.size() == 2) {
+            predicateObject.put("right", visitExpression(children.get(1)));
+        }
+        return predicateObject;
+    }
+
+    /**
+     * Encodes a literal as a map holding its data type name and value.
+     *
+     * @param literal the literal to encode
+     * @return the map representation of {@code literal}
+     */
+    public static Map<String, Object> visitLiteral(Literal literal) {
+        Map<String, Object> literalObject = new HashMap<>();
+        literalObject.put("type", "literal");
+        // Store the type as text: the deserializer reads this field with asText() and switches on the name.
+        literalObject.put("dataType", literal.getDataType().toString());
+        literalObject.put("value", literal.getValue());
+        return literalObject;
+    }
+
+    /**
+     * Encodes a column reference as a map holding its name parts.
+     *
+     * @param column the column to encode
+     * @return the map representation of {@code column}
+     */
+    public static Map<String, Object> visitColumn(Column column) {
+        Map<String, Object> columnObject = new HashMap<>();
+        columnObject.put("type", "column");
+        columnObject.put("names", column.getNames());
+        return columnObject;
+    }
+
+    /** Dispatches to the encoder matching the runtime type of {@code expression}. */
+    private static Map<String, Object> visitExpression(Expression expression) {
+        return switch (expression) {
+            case Predicate predicate -> visitPredicate(predicate);
+            case Column column -> visitColumn(column);
+            case Literal literal -> visitLiteral(literal);
+            case null, default -> throw new UnsupportedOperationException("NYI");
+        };
+    }
+
+    /**
+     * Decodes a predicate from its JSON object form.
+     *
+     * @param node JSON object with a {@code "name"} and up to two operands under {@code "left"}/{@code "right"}
+     * @return the decoded predicate
+     */
+    public static Predicate visitPredicate(ObjectNode node) {
+        List<Expression> children = new ArrayList<>(2);
+        JsonNode left = node.get("left");
+        if (left != null) {
+            children.add(visitExpression((ObjectNode) left));
+        }
+        JsonNode right = node.get("right");
+        if (right != null) {
+            children.add(visitExpression((ObjectNode) right));
+        }
+        return new Predicate(node.get("name").asText(), children);
+    }
+
+    /**
+     * Decodes a literal from its JSON object form.
+     *
+     * @param node JSON object with {@code "dataType"} and {@code "value"} fields
+     * @return the decoded literal
+     * @throws UnsupportedOperationException if the data type is not one of the handled primitive types
+     */
+    public static Literal visitLiteral(ObjectNode node) {
+        // NOTE(review): a JSON null value is coerced (e.g. asInt() yields 0), not decoded as a null literal; confirm.
+        JsonNode value = node.get("value");
+        return switch (node.get("dataType").asText()) {
+            case "boolean" -> Literal.ofBoolean(value.asBoolean());
+            case "byte" -> Literal.ofByte((byte) value.asInt());
+            case "short" -> Literal.ofShort(value.shortValue());
+            case "integer" -> Literal.ofInt(value.asInt());
+            case "long" -> Literal.ofLong(value.asLong());
+            case "float" -> Literal.ofFloat(value.floatValue());
+            case "double" -> Literal.ofDouble(value.doubleValue());
+            case "date" -> Literal.ofDate(value.asInt());
+            case "timestamp" -> Literal.ofTimestamp(value.asLong());
+            case "string" -> Literal.ofString(value.asText());
+            case null, default -> throw new UnsupportedOperationException("NYI");
+        };
+    }
+
+    /**
+     * Decodes a column reference from its JSON object form.
+     *
+     * @param node JSON object whose {@code "names"} field is an array of name parts or a single name
+     * @return the decoded column
+     */
+    public static Column visitColumn(ObjectNode node) {
+        JsonNode names = node.get("names");
+        if (names.isArray()) {
+            return new Column(
+                    StreamSupport.stream(names.spliterator(), false).map(JsonNode::asText).toArray(String[]::new));
+        }
+        return new Column(names.asText());
+    }
+
+    /** Dispatches to the decoder named by the {@code "type"} field of {@code node}. */
+    private static Expression visitExpression(ObjectNode node) {
+        return switch (node.get("type").asText()) {
+            case "predicate" -> visitPredicate(node);
+            case "column" -> visitColumn(node);
+            case "literal" -> visitLiteral(node);
+            case null, default -> throw new UnsupportedOperationException("NYI");
+        };
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19527
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I9fa1a84d7be63ada7b9768a81984b2172e7401b3
Gerrit-Change-Number: 19527
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <[email protected]>
Gerrit-MessageType: newchange

Reply via email to