From hari <[email protected]>:

hari has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17419 )


Change subject: [WIP] Integrate apache iceberg support
......................................................................

[WIP] Integrate apache iceberg support

        - user model changes: no
        - storage format changes: no
        - interface changes: no

        * External Adapters modified: AWS S3 and Hadoop

Change-Id: I12df589a6dffdc5af4a5cace68a11729995ea9af
---
M asterixdb/asterix-external-data/pom.xml
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/pom.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
M hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
7 files changed, 133 insertions(+), 7 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/19/17419/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 2235005..67cda5c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -267,6 +267,12 @@
     INVALID_TIMEZONE(1172),
     INVALID_PARAM_VALUE_ALLOWED_VALUE(1173),

+    UNSUPPORTED_ICEBERG_TABLE(1174),
+
+    UNSUPPORTED_ICEBERG_FORMAT_VERSION(1175),
+
+    ERROR_READING_ICEBERG_METADATA(1176),
+
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
     UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE(3002),
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index dea4278..fd538df 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -556,6 +556,16 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-util-ajax</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <version>1.1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.1</version>
+    </dependency>
   </dependencies>
   <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
   <repositories>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 92b7a95..93f1e69 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -55,7 +55,8 @@
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector) throws AlgebricksException, 
HyracksDataException {
         //Get path
-        String path = buildPathURIs(configuration, warningCollector);
+        String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
+                ? configuration.get(ExternalDataConstants.KEY_PATH) : 
buildPathURIs(configuration, warningCollector);
         //Put S3 configurations to AsterixDB's Hadoop configuration
         putS3ConfToHadoopConf(configuration, path);

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 429706e..7af4519 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -116,6 +116,9 @@
     public static final String FORMAT_RECORD_WITH_METADATA = 
"record-with-metadata";
     // a string representing the format of the record (for adapters which 
produces records with additional information like pk or metadata)
     public static final String KEY_RECORD_FORMAT = "record-format";
+    public static final String TABLE_FORMAT = "table-format";
+    public static final String TABLE_LOCATION = "table-location";
+    public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1;
     public static final String KEY_META_TYPE_NAME = "meta-type-name";
     public static final String KEY_ADAPTER_NAME = "adapter-name";
     public static final String READER_STREAM = "stream";
@@ -196,6 +199,7 @@
     public static final String FORMAT_CSV = "csv";
     public static final String FORMAT_TSV = "tsv";
     public static final String FORMAT_PARQUET = "parquet";
+    public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
     public static final Set<String> ALL_FORMATS;

     static {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 5bf5844..d009502 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,15 +18,10 @@
  */
 package org.apache.asterix.external.util;

-import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
-import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
-import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
@@ -69,6 +64,7 @@
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -79,6 +75,8 @@
 import org.apache.asterix.runtime.evaluators.common.NumberUtils;
 import org.apache.asterix.runtime.projection.DataProjectionInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -94,6 +92,11 @@
 import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.util.StorageUtil;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;

 public class ExternalDataUtils {
     private static final Map<ATypeTag, IValueParserFactory> 
valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
@@ -422,7 +425,7 @@
      * @param configuration
      *            external data configuration
      */
-    public static void prepare(String adapterName, Map<String, String> 
configuration) {
+    public static void prepare(String adapterName, Map<String, String> 
configuration) throws AlgebricksException {
         if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
             configuration.put(ExternalDataConstants.KEY_READER, adapterName);
         }
@@ -436,6 +439,76 @@
                 && 
configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
             configuration.put(ExternalDataConstants.KEY_PARSER, 
configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
+
+        if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
+            prepareTableFormat(configuration);
+        }
+    }
+
+    /**
+     * Prepares the configuration for data-lake table formats
+     *
+     * @param configuration
+     *            external data configuration
+     */
+    public static void prepareTableFormat(Map<String, String> configuration) 
throws AlgebricksException {
+        // Apache Iceberg table format
+        if 
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG))
 {
+            Configuration conf = new Configuration();
+
+            // If the table is in S3
+            if (configuration.get(ExternalDataConstants.KEY_READER)
+                    .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+
+                conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+                conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
+                        
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+            } else if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS))
 {
+                conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
+                        configuration.get(ExternalDataConstants.KEY_HDFS_URL));
+            }
+
+            HadoopTables tables = new HadoopTables(conf);
+
+            // Table location can be built using container and definition. See appendFileURI in AwsS3ParquetReaderFactory.
+            Table icebergTable = 
tables.load(configuration.get(ExternalDataConstants.TABLE_LOCATION));
+
+            if (icebergTable instanceof BaseTable) {
+                BaseTable baseTable = (BaseTable) icebergTable;
+
+                if (baseTable.operations().current()
+                        .formatVersion() != 
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) {
+                    throw new 
AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION,
+                            "AsterixDB only supports Iceberg version up to "
+                                    + 
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION);
+                }
+
+                try (CloseableIterable<FileScanTask> fileScanTasks = 
baseTable.newScan().planFiles()) {
+
+                    StringBuilder builder = new StringBuilder();
+
+                    for (FileScanTask task : fileScanTasks) {
+                        builder.append(",");
+                        String path = task.file().path().toString();
+                        builder.append(path);
+                    }
+
+                    if (builder.length() > 0) {
+                        builder.deleteCharAt(0);
+                    }
+
+                    configuration.put(ExternalDataConstants.KEY_PATH, 
builder.toString());
+
+                } catch (IOException e) {
+                    throw new 
AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e);
+                }
+
+            } else {
+                throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE,
+                        "Invalid iceberg base table. Please remove metadata 
specifiers");
+            }
+
+        }
     }

     /**
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 561bc03..34e1ed5 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1029,6 +1029,10 @@
             <artifactId>commons-logging</artifactId>
           </exclusion>
           <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>org.codehaus.woodstox</groupId>
             <artifactId>stax-api</artifactId>
           </exclusion>
@@ -2029,6 +2033,11 @@
         <artifactId>jetty-util-ajax</artifactId>
         <version>9.4.48.v20220622</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>1.11.1</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>

diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index b16904d..3e57420 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -104,6 +104,10 @@
           <artifactId>nimbus-jose-jwt</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>javax.servlet.jsp</groupId>
           <artifactId>jsp-api</artifactId>
         </exclusion>
@@ -122,6 +126,10 @@
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <exclusions>
         <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17419
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I12df589a6dffdc5af4a5cace68a11729995ea9af
Gerrit-Change-Number: 17419
Gerrit-PatchSet: 1
Gerrit-Owner: hari <[email protected]>
Gerrit-MessageType: newchange

Reply via email to