From Peeyush Gupta <[email protected]>: Peeyush Gupta has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20573?usp=email )
Change subject: WIP: Support Delta lake tables on Azure ...................................................................... WIP: Support Delta lake tables on Azure Change-Id: I528849556dc70d11cc5eb4ab55a7f7382c84061f --- A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java M asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory 6 files changed, 87 insertions(+), 12 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/73/20573/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java new file mode 100644 index 0000000..c546b13 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/delta/AzureDeltaReaderFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.azure.delta; + +import static org.apache.asterix.external.util.azure.blob.BlobUtils.buildClient; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.azure.AzureConstants; +import org.apache.asterix.external.util.azure.AzureUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; + +import com.azure.storage.blob.BlobServiceClient; + +public class AzureDeltaReaderFactory extends DeltaReaderFactory { + private static final long serialVersionUID = 1L; + private static final List<String> RECORD_READER_NAMES = + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB); + + @Override + protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration) + throws AlgebricksException { + // get endpoint + BlobServiceClient blobServiceClient = buildClient(appCtx, configuration); + String endPoint = AzureUtils.extractEndPoint(blobServiceClient.getAccountUrl()); + configuration.put(AzureConstants.ACCOUNT_URL, blobServiceClient.getAccountUrl()); + AzureUtils.configureAzureHdfsJobConf(conf, configuration, endPoint); + } + + @Override + protected String getTablePath(Map<String, String> configuration) 
throws AlgebricksException { + return AzureUtils.getPath(configuration); + } + + @Override + public List<String> getRecordReaderNames() { + return RECORD_READER_NAMES; + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java index d8f6a9e..428368b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java @@ -37,6 +37,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.azure.AzureUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; @@ -61,7 +62,7 @@ // get endpoint BlobServiceClient blobServiceClient = buildClient(appCtx, configuration); - String endPoint = extractEndPoint(blobServiceClient.getAccountUrl()); + String endPoint = AzureUtils.extractEndPoint(blobServiceClient.getAccountUrl()); // get include/exclude matchers IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration); @@ -138,12 +139,6 @@ return builder.toString(); } - private static String extractEndPoint(String uri) { - //The URI is in the form http(s)://<accountName>.blob.core.windows.net - //We need to Remove the protocol (i.e., http(s)://) from the URI - return uri.substring(uri.indexOf("//") + "//".length()); - } - private static void appendFileURI(StringBuilder 
builder, String container, String endPoint, BlobItem file) { builder.append(HADOOP_AZURE_PROTOCOL); builder.append("://"); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java index 9ae8359..7c74828 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureConstants.java @@ -51,11 +51,12 @@ public static final String CLIENT_ID_FIELD_NAME = "clientId"; public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret"; public static final String ENDPOINT_FIELD_NAME = "endpoint"; + public static final String ACCOUNT_URL = "accountURL"; /* * Hadoop-Azure */ - public static final String HADOOP_AZURE_PROTOCOL = "abfss"; + public static final String HADOOP_AZURE_PROTOCOL = "wasbs"; /* * Hadoop-Azure diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java index b3aeba1..089be49 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/AzureUtils.java @@ -44,6 +44,7 @@ import java.util.Map; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.HDFSUtils; import org.apache.hadoop.mapred.JobConf; @@ -113,4 +114,16 @@ conf.set(HADOOP_CLIENT_ID, clientId); } } + + public static String extractEndPoint(String uri) { + //The URI is in the form http(s)://<accountName>.blob.core.windows.net + //We need to Remove the protocol (i.e., http(s)://) from the URI + return uri.substring(uri.indexOf("//") + 
"//".length()); + } + + public static String getPath(Map<String, String> configuration) { + return HADOOP_AZURE_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '@' + + extractEndPoint(configuration.get(AzureConstants.ACCOUNT_URL)) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java index 66e7f98..d810afd 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob/BlobUtils.java @@ -24,6 +24,8 @@ import static org.apache.asterix.external.util.ExternalDataUtils.getDisableSslVerify; import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_KEY_FIELD_NAME; import static org.apache.asterix.external.util.azure.AzureConstants.ACCOUNT_NAME_FIELD_NAME; @@ -305,9 +307,10 @@ */ public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector, IApplicationContext appCtx) throws CompilationException { - - // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + } else if 
(configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + // check if the format property is present throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory index 1f25c4b..0eef480 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -29,4 +29,5 @@ org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory -org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory \ No newline at end of file +org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory +org.apache.asterix.external.input.record.reader.azure.delta.AzureDeltaReaderFactory \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20573?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: newchange Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I528849556dc70d11cc5eb4ab55a7f7382c84061f Gerrit-Change-Number: 20573 Gerrit-PatchSet: 1 Gerrit-Owner: Peeyush Gupta <[email protected]>
