From hari <[email protected]>:
hari has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17419 )
Change subject: [WIP] Integrate Apache Iceberg support
......................................................................
[WIP] Integrate Apache Iceberg support
- user model changes: no
- storage format changes: no
- interface changes: no
* External Adapters modified: AWS S3 and Hadoop
Change-Id: I12df589a6dffdc5af4a5cace68a11729995ea9af
---
M asterixdb/asterix-external-data/pom.xml
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/pom.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
M hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
7 files changed, 133 insertions(+), 7 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/19/17419/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 2235005..67cda5c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -267,6 +267,12 @@
INVALID_TIMEZONE(1172),
INVALID_PARAM_VALUE_ALLOWED_VALUE(1173),
+ UNSUPPORTED_ICEBERG_TABLE(1174),
+
+ UNSUPPORTED_ICEBERG_FORMAT_VERSION(1175),
+
+ ERROR_READING_ICEBERG_METADATA(1176),
+
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
UTIL_DATAFLOW_UTILS_TUPLE_TOO_LARGE(3002),
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index dea4278..fd538df 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -556,6 +556,16 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util-ajax</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.1</version>
+ </dependency>
</dependencies>
<!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
<repositories>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 92b7a95..93f1e69 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -55,7 +55,8 @@
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
IWarningCollector warningCollector) throws AlgebricksException,
HyracksDataException {
//Get path
- String path = buildPathURIs(configuration, warningCollector);
+ String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
+ ? configuration.get(ExternalDataConstants.KEY_PATH) :
buildPathURIs(configuration, warningCollector);
//Put S3 configurations to AsterixDB's Hadoop configuration
putS3ConfToHadoopConf(configuration, path);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 429706e..7af4519 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -116,6 +116,9 @@
public static final String FORMAT_RECORD_WITH_METADATA =
"record-with-metadata";
// a string representing the format of the record (for adapters which
produces records with additional information like pk or metadata)
public static final String KEY_RECORD_FORMAT = "record-format";
+ public static final String TABLE_FORMAT = "table-format";
+ public static final String TABLE_LOCATION = "table-location";
+ public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1;
public static final String KEY_META_TYPE_NAME = "meta-type-name";
public static final String KEY_ADAPTER_NAME = "adapter-name";
public static final String READER_STREAM = "stream";
@@ -196,6 +199,7 @@
public static final String FORMAT_CSV = "csv";
public static final String FORMAT_TSV = "tsv";
public static final String FORMAT_PARQUET = "parquet";
+ public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final Set<String> ALL_FORMATS;
static {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 5bf5844..d009502 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,15 +18,10 @@
*/
package org.apache.asterix.external.util;
-import static
org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
-import static
org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
-import static
org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
-import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_EXTERNAL_SCAN_BUFFER_SIZE;
-import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static
org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
@@ -69,6 +64,7 @@
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -79,6 +75,8 @@
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -94,6 +92,11 @@
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import org.apache.hyracks.util.StorageUtil;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory>
valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
@@ -422,7 +425,7 @@
* @param configuration
* external data configuration
*/
- public static void prepare(String adapterName, Map<String, String>
configuration) {
+ public static void prepare(String adapterName, Map<String, String>
configuration) throws AlgebricksException {
if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
configuration.put(ExternalDataConstants.KEY_READER, adapterName);
}
@@ -436,6 +439,76 @@
&&
configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
configuration.put(ExternalDataConstants.KEY_PARSER,
configuration.get(ExternalDataConstants.KEY_FORMAT));
}
+
+ if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
+ prepareTableFormat(configuration);
+ }
+ }
+
+ /**
+ * Prepares the configuration for data-lake table formats
+ *
+ * @param configuration
+ * external data configuration
+ */
+ public static void prepareTableFormat(Map<String, String> configuration)
throws AlgebricksException {
+ // Apache Iceberg table format
+ if
(configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG))
{
+ Configuration conf = new Configuration();
+
+ // If the table is in S3
+ if (configuration.get(ExternalDataConstants.KEY_READER)
+ .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+
+ conf.set(S3Constants.HADOOP_ACCESS_KEY_ID,
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+ conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
+
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+ } else if
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS))
{
+ conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
+ configuration.get(ExternalDataConstants.KEY_HDFS_URL));
+ }
+
+ HadoopTables tables = new HadoopTables(conf);
+
+ // Table location can be built using container and definition. See
appendFileURI in AwsS3ParquetReaderFactory.
+ Table icebergTable =
tables.load(configuration.get(ExternalDataConstants.TABLE_LOCATION));
+
+ if (icebergTable instanceof BaseTable) {
+ BaseTable baseTable = (BaseTable) icebergTable;
+
+ if (baseTable.operations().current()
+ .formatVersion() !=
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) {
+ throw new
AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION,
+ "AsterixDB only supports Iceberg version up to "
+ +
ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION);
+ }
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
baseTable.newScan().planFiles()) {
+
+ StringBuilder builder = new StringBuilder();
+
+ for (FileScanTask task : fileScanTasks) {
+ builder.append(",");
+ String path = task.file().path().toString();
+ builder.append(path);
+ }
+
+ if (builder.length() > 0) {
+ builder.deleteCharAt(0);
+ }
+
+ configuration.put(ExternalDataConstants.KEY_PATH,
builder.toString());
+
+ } catch (IOException e) {
+ throw new
AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e);
+ }
+
+ } else {
+ throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE,
+ "Invalid iceberg base table. Please remove metadata
specifiers");
+ }
+
+ }
}
/**
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 561bc03..34e1ed5 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1029,6 +1029,10 @@
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>org.codehaus.woodstox</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
@@ -2029,6 +2033,11 @@
<artifactId>jetty-util-ajax</artifactId>
<version>9.4.48.v20220622</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
index b16904d..3e57420 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/pom.xml
@@ -104,6 +104,10 @@
<artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
@@ -122,6 +126,10 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
<exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-grizzly2</artifactId>
</exclusion>
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17419
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I12df589a6dffdc5af4a5cace68a11729995ea9af
Gerrit-Change-Number: 17419
Gerrit-PatchSet: 1
Gerrit-Owner: hari <[email protected]>
Gerrit-MessageType: newchange