>From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18708 )


Change subject: [NO ISSUE]: Deltalake format support
......................................................................

[NO ISSUE]: Deltalake format support

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Adds support for creating an external dataset from a given Deltalake table path.
Change-Id: I9627faf6b46d6fe85f43d0ed1826eb71cb675cb8

Change-Id: Iff608397aab711f324861fe83eeb428f73682912
---
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.00.ddl.sqlpp
M asterixdb/asterix-external-data/pom.xml
M 
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaLakeTableGenerator.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.01.query.sqlpp
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
10 files changed, 310 insertions(+), 85 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/08/18708/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
index 86d03a1..cda7c65 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetOnePartitionTest.java
@@ -50,6 +50,7 @@
         PREPARE_MIXED_DATA_BUCKET = 
AwsS3ExternalDatasetOnePartitionTest::prepareMixedDataBucket;
         PREPARE_BOM_FILE_BUCKET = 
AwsS3ExternalDatasetOnePartitionTest::prepareBomDataBucket;
         PREPARE_ICEBERG_TABLE_BUCKET = 
AwsS3ExternalDatasetOnePartitionTest::prepareIcebergTableBucket;
+        PREPARE_DELTALAKE_TABLE_BUCKET = 
AwsS3ExternalDatasetOnePartitionTest::prepareDeltalakeTableBucket;
         return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
     }

@@ -70,4 +71,8 @@

     private static void prepareIcebergTableBucket() {
     }
+
+    private static void prepareDeltalakeTableBucket() {
+    }
+
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
index 9a2bfbc..d36e566 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -18,38 +18,13 @@
  */
 package org.apache.asterix.test.external_dataset.aws;

-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFiles;
-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively;
-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively;
-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
-import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
-import static 
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
-import static org.apache.hyracks.util.file.FileUtil.joinPath;
-import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.GZIPOutputStream;
-
+import io.findify.s3mock.S3Mock;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.test.common.TestConstants;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils;
+import 
org.apache.asterix.test.external_dataset.deltalake.DeltaLakeTableGenerator;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.test.runtime.LangExecutionUtil;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -88,8 +63,6 @@
 import org.junit.runners.MethodSorters;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-
-import io.findify.s3mock.S3Mock;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.regions.Region;
@@ -100,6 +73,33 @@
 import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;

+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPOutputStream;
+
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFiles;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
+import static 
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
+import static 
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 /**
  * Runs an AWS S3 mock server and test it as an external dataset
  */
@@ -124,6 +124,8 @@

     static Runnable PREPARE_ICEBERG_TABLE_BUCKET;

+    static Runnable PREPARE_DELTALAKE_TABLE_BUCKET;
+
     // Base directory paths for data files
     private static final String JSON_DATA_PATH = joinPath("data", "json");
     private static final String CSV_DATA_PATH = joinPath("data", "csv");
@@ -153,6 +155,7 @@
     public static final String INCLUDE_EXCLUDE_CONTAINER = "include-exclude";
     public static final String BOM_FILE_CONTAINER = "bom-file-container";
     public static final String ICEBERG_TABLE_CONTAINER = "iceberg-container";
+    public static final String DELTALAKE_TABLE_CONTAINER = 
"deltalake-container";

     public static final PutObjectRequest.Builder playgroundBuilder =
             PutObjectRequest.builder().bucket(PLAYGROUND_CONTAINER);
@@ -167,6 +170,8 @@

     public static final PutObjectRequest.Builder icebergContainerBuilder =
             PutObjectRequest.builder().bucket(ICEBERG_TABLE_CONTAINER);
+    public static final PutObjectRequest.Builder deltalakeContainerBuilder =
+            PutObjectRequest.builder().bucket(DELTALAKE_TABLE_CONTAINER);

     public AwsS3ExternalDatasetTest(TestCaseContext tcCtx) {
         this.tcCtx = tcCtx;
@@ -176,7 +181,7 @@
     private static final Schema SCHEMA =
             new Schema(required(1, "id", Types.IntegerType.get()), required(2, 
"data", Types.StringType.get()));
     private static final Configuration CONF = new Configuration();
-
+    public static final String DELTALAKE_TABLE_PATH = "s3a://" + 
DELTALAKE_TABLE_CONTAINER + "/my-table/";
     private static final String ICEBERG_TABLE_PATH = "s3a://" + 
ICEBERG_TABLE_CONTAINER + "/my-table/";
     private static final String ICEBERG_TABLE_PATH_FORMAT_VERSION_2 =
             "s3a://" + ICEBERG_TABLE_CONTAINER + "/my-table-format-version-2/";
@@ -245,6 +250,11 @@
         CONF.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
TestConstants.S3_SECRET_ACCESS_KEY_DEFAULT);
     }

+    public static void prepareDeltalakeTableContainer() {
+        prepareIcebergConfiguration();
+        DeltaLakeTableGenerator.prepareDeltalakeTableContainer(CONF);
+    }
+
     public static void prepareIcebergTableContainer() {
         prepareIcebergConfiguration();
         Tables tables = new HadoopTables(CONF);
@@ -361,7 +371,7 @@
         PREPARE_MIXED_DATA_BUCKET = 
ExternalDatasetTestUtils::prepareMixedDataContainer;
         PREPARE_BOM_FILE_BUCKET = 
ExternalDatasetTestUtils::prepareBomFileContainer;
         PREPARE_ICEBERG_TABLE_BUCKET = 
AwsS3ExternalDatasetTest::prepareIcebergTableContainer;
-
+        PREPARE_DELTALAKE_TABLE_BUCKET = 
AwsS3ExternalDatasetTest::prepareDeltalakeTableContainer;
         return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
     }

@@ -411,6 +421,7 @@
         
client.createBucket(CreateBucketRequest.builder().bucket(INCLUDE_EXCLUDE_CONTAINER).build());
         
client.createBucket(CreateBucketRequest.builder().bucket(BOM_FILE_CONTAINER).build());
         
client.createBucket(CreateBucketRequest.builder().bucket(ICEBERG_TABLE_CONTAINER).build());
+        
client.createBucket(CreateBucketRequest.builder().bucket(DELTALAKE_TABLE_CONTAINER).build());
         LOGGER.info("Client created successfully");

         // Create the bucket and upload some json files
@@ -424,6 +435,7 @@
         PREPARE_MIXED_DATA_BUCKET.run();
         PREPARE_BOM_FILE_BUCKET.run();
         PREPARE_ICEBERG_TABLE_BUCKET.run();
+        PREPARE_DELTALAKE_TABLE_BUCKET.run();
     }

     private static void loadPlaygroundData(String key, String content, boolean 
fromFile, boolean gzipped) {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaLakeTableGenerator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaLakeTableGenerator.java
new file mode 100644
index 0000000..4841418
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaLakeTableGenerator.java
@@ -0,0 +1,92 @@
+package org.apache.asterix.test.external_dataset.deltalake;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Operation;
+import io.delta.standalone.OptimisticTransaction;
+import io.delta.standalone.actions.Action;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.actions.Metadata;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static 
org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.DELTALAKE_TABLE_PATH;
+
+public class DeltaLakeTableGenerator {
+
+    public static void prepareDeltalakeTableContainer(Configuration conf) {
+        Schema schema = 
SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("data").endRecord();
+        try {
+            Path path = new Path(DELTALAKE_TABLE_PATH, "file.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path).withConf(conf).withSchema(schema).build();
+
+            List<GenericData.Record> fileFirstSnapshotRecords = 
ImmutableList.of(new GenericData.Record(schema),
+                    new GenericData.Record(schema), new 
GenericData.Record(schema));
+            List<GenericData.Record> fileSecondSnapshotRecords = 
ImmutableList.of(new GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("data", "vibrant_mclean");
+
+            fileFirstSnapshotRecords.get(1).put("id", 1);
+            fileFirstSnapshotRecords.get(1).put("data", "frosty_wilson");
+
+            fileFirstSnapshotRecords.get(2).put("id", 2);
+            fileFirstSnapshotRecords.get(2).put("data", "serene_kirby");
+
+            fileSecondSnapshotRecords.get(0).put("id", 3);
+            fileSecondSnapshotRecords.get(0).put("data", "peaceful_pare");
+
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+
+            long size = writer.getDataSize();
+            writer.close();
+
+            List<Action> actions = List.of(new AddFile("file.parquet", new 
HashMap<String, String>(), size,
+                    System.currentTimeMillis(), true, null, null));
+            DeltaLog log = DeltaLog.forTable(conf, DELTALAKE_TABLE_PATH);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = 
txn.metadata().copyBuilder().partitionColumns(new ArrayList<String>())
+                    .schema(new StructType().add(new StructField("id", new 
IntegerType(), true))
+                            .add(new StructField("data", new StringType(), 
true)))
+                    .build();
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), 
"deltalake-table-create");
+
+            Path path2 = new Path(DELTALAKE_TABLE_PATH, "file2.parquet");
+            ParquetWriter<GenericData.Record> writer2 =
+                    AvroParquetWriter.<GenericData.Record> 
builder(path2).withConf(conf).withSchema(schema).build();
+
+            for (GenericData.Record record : fileSecondSnapshotRecords) {
+                writer2.write(record);
+            }
+
+            long size2 = writer2.getDataSize();
+            writer2.close();
+
+            List<Action> actions2 = List.of(new AddFile("file2.parquet", new 
HashMap<String, String>(), size2,
+                    System.currentTimeMillis(), true, null, null));
+            OptimisticTransaction txn2 = log.startTransaction();
+            txn2.commit(actions2, new Operation(Operation.Name.WRITE), 
"deltalake-table-create");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.00.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.00.ddl.sqlpp
new file mode 100644
index 0000000..080f172
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.00.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+ USE test;
+
+ CREATE TYPE DeltalakeTableType AS {
+ };
+
+ CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING S3 (
+ ("accessKeyId"="dummyAccessKey"),
+ ("secretAccessKey"="dummySecretKey"),
+ ("region"="us-west-2"),
+ ("serviceEndpoint"="http://127.0.0.1:8001"),
+ ("container"="deltalake-container"),
+ ("definition"="my-table"),
+ ("table-format"="deltalake"),
+ ("format"="parquet")
+ );
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.01.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.01.query.sqlpp
new file mode 100644
index 0000000..bfd6581
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/deltalake/deltalake-read-from-latest-snapshot.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
new file mode 100644
index 0000000..1ef5083
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm
@@ -0,0 +1,4 @@
+{ "id": 0, "data": "vibrant_mclean" }
+{ "id": 1, "data": "frosty_wilson" }
+{ "id": 2, "data": "serene_kirby" }
+{ "id": 3, "data": "peaceful_pare" }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 8e31aa4..39809e7 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -980,6 +980,14 @@
         <output-dir compare="Text">common/byte_order_mark/tsv</output-dir>
       </compilation-unit>
     </test-case>
+  <!-- Deltalake Tests Start -->
+  </test-group>
+  <test-group name="deltalake">
+    <test-case FilePath="external-dataset/s3">
+      <compilation-unit name="deltalake">
+        <output-dir compare="Text">deltalake</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <!-- Iceberg Tests Start -->
   <!-- ASTERIXDB-3468: iceberg tests failing due to unsupported version
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index 6a3f891..21eaf71 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -573,6 +573,11 @@
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-standalone_2.12</artifactId>
+      <version>3.0.0</version>
+    </dependency>
   </dependencies>
   <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
   <repositories>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 6a4b336..65e6b67 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -125,6 +125,7 @@
     public static final String KEY_RECORD_FORMAT = "record-format";
     public static final String TABLE_FORMAT = "table-format";
     public static final String ICEBERG_METADATA_LOCATION = "metadata-path";
+    public static final String DELTALAKE_METADATA_LOCATION = "metadata-path";
     public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1;
     public static final String KEY_META_TYPE_NAME = "meta-type-name";
     public static final String KEY_ADAPTER_NAME = "adapter-name";
@@ -212,6 +213,7 @@
     public static final String FORMAT_TSV = "tsv";
     public static final String FORMAT_PARQUET = "parquet";
     public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
+    public static final String FORMAT_DELTALAKE = "deltalake";
     public static final Set<String> ALL_FORMATS;
     public static final Set<String> TEXTUAL_FORMATS;

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 104f301..0086cda 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,41 +18,9 @@
  */
 package org.apache.asterix.external.util;

-import static 
org.apache.asterix.common.metadata.MetadataConstants.DEFAULT_DATABASE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
-import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
-import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
-import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
-import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
-import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
-import static 
org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
-import static org.msgpack.core.MessagePack.Code.ARRAY16;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -108,6 +76,41 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;

+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.asterix.common.metadata.MetadataConstants.DEFAULT_DATABASE;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.DEFINITION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_PATH;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
+import static 
org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
+import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
+import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static 
org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
+
 public class ExternalDataUtils {
     private static final Map<ATypeTag, IValueParserFactory> 
valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
@@ -479,27 +482,46 @@
      * @param configuration external data configuration
      */
     public static void prepareTableFormat(Map<String, String> configuration) 
throws AlgebricksException {
+        // Hadoop configuration and table path shared by the Deltalake and Apache Iceberg table formats
+        Configuration conf = new Configuration();
+
+        String metadata_path = 
configuration.get(ExternalDataConstants.ICEBERG_METADATA_LOCATION);
+
+        // If the table is in S3
+        if (configuration.get(ExternalDataConstants.KEY_READER)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+
+            conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+            conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
+                    
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+            metadata_path = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                    + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                    + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        } else if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS))
 {
+            conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
+                    configuration.get(ExternalDataConstants.KEY_HDFS_URL));
+            metadata_path = 
configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + metadata_path;
+        }
+
+        if 
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_DELTALAKE))
 {
+            DeltaLog deltaLog = DeltaLog.forTable(conf, metadata_path);
+            Snapshot snapshot = deltaLog.snapshot();
+            // To read a specific table version instead of the latest, use deltaLog.getSnapshotForVersionAsOf(version)
+            List<AddFile> dataFiles = snapshot.getAllFiles();
+            StringBuilder builder = new StringBuilder();
+            for (AddFile batchFile : dataFiles) {
+                builder.append(",");
+                String path = batchFile.getPath();
+                builder.append(metadata_path + '/' + path);
+            }
+            if (builder.length() > 0) {
+                builder.deleteCharAt(0);
+            }
+            configuration.put(ExternalDataConstants.KEY_PATH, 
builder.toString());
+        }
+
         // Apache Iceberg table format
         if 
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG))
 {
-            Configuration conf = new Configuration();
-
-            String metadata_path = 
configuration.get(ExternalDataConstants.ICEBERG_METADATA_LOCATION);
-
-            // If the table is in S3
-            if (configuration.get(ExternalDataConstants.KEY_READER)
-                    .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-
-                conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-                conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
-                        
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-                metadata_path = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                        + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                        + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-            } else if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS))
 {
-                conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
-                        configuration.get(ExternalDataConstants.KEY_HDFS_URL));
-                metadata_path = 
configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + metadata_path;
-            }

             HadoopTables tables = new HadoopTables(conf);


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18708
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Iff608397aab711f324861fe83eeb428f73682912
Gerrit-Change-Number: 18708
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange

Reply via email to