>From Ayush Tripathi <[email protected]>:
Ayush Tripathi has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19138 )
Change subject: wip
......................................................................
wip
Change-Id: I8edb4c9abbf1679ad36fe1f7bc6ef3ac9c11a9e6
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
6 files changed, 247 insertions(+), 141 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/38/19138/1
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..45dd546 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,155 +18,54 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.utils.CloseableIterator;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
+/**
+ * Record reader factory for Delta Lake tables stored in AWS S3. It builds the Hadoop job configuration with
+ * {@code configureAwsS3HdfsJobConf} and the table location from the container and definition properties, then
+ * delegates snapshot scanning and file scheduling to {@link DeltaReaderFactory}.
+ */
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
 
     private static final long serialVersionUID = 1L;
     private static final List<String> recordReaderNames =
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-    private static final Logger LOGGER = LogManager.getLogger();
-    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-    private Map<Integer, List<String>> schedule;
-    private String scanState;
-    private Map<String, String> configuration;
-    private List<String> scanFiles;
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return locationConstraints;
-    }
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
+        // configureAwsS3HdfsJobConf documents numberOfPartitions as the number of partitions in the cluster,
+        // so take it from the cluster state instead of hard-coding it.
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        int numberOfPartitions = appCtx.getClusterStateManager().getClusterLocations().getLocations().length;
+        JobConf conf = new JobConf();
+        configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+        setConfFactory(new ConfFactory(conf));
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
-        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
-
-        Engine engine = DefaultEngine.create(conf);
-        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
-        Snapshot snapshot = table.getLatestSnapshot(engine);
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
-        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
-        CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
-
-        scanFiles = new ArrayList<>();
-        while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
-            CloseableIterator<Row> rowIter = batch.getRows();
-            while (rowIter.hasNext()) {
-                Row row = rowIter.next();
-                scanFiles.add(RowSerDe.serializeRowToJson(row));
-            }
-        }
-        locationConstraints = configureLocationConstraints(appCtx);
-        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles();
+        setTableMetadataPath(tableMetadataPath);
+        super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
     }
 
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
-    }
-
-    private void distributeFiles() {
-        final int numComputePartitions = getPartitionConstraint().getLocations().length;
-        schedule = new HashMap<>();
-        for (int i = 0; i < numComputePartitions; i++) {
-            schedule.put(i, new ArrayList<>());
-        }
-        int i = 0;
-        for (String scanFile : scanFiles) {
-            schedule.get(i).add(scanFile);
-            i = (i + 1) % numComputePartitions;
-        }
-    }
-
-    @Override
-    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
-        try {
-            int partition = context.getPartition();
-            return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public Class<?> getRecordClass() throws AsterixException {
-        return Row.class;
-    }
-
     @Override
     public List<String> getRecordReaderNames() {
         return recordReaderNames;
     }
 
-    @Override
-    public Set<String> getReaderSupportedFormats() {
-        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
-    }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..d61caa4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -35,6 +35,8 @@
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
@@ -69,20 +71,19 @@
     private Row scanFile;
     private CloseableIterator<Row> rows;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
-            IExternalDataRuntimeContext context) {
-        Configuration config = new Configuration();
-        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        this.engine = DefaultEngine.create(config);
+    /**
+     * Creates a reader over the given serialized scan files.
+     *
+     * @param serScanFiles JSON-serialized Delta scan-file rows to read
+     * @param serScanState JSON-serialized Delta scan state
+     * @param confFactory factory for the Hadoop configuration used to create the Delta engine
+     * @param context runtime context of this reader
+     * @throws HyracksDataException if {@code confFactory} cannot produce the Hadoop configuration
+     */
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory confFactory,
+            IExternalDataRuntimeContext context) throws HyracksDataException {
+        JobConf conf = confFactory.getConf();
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
new file mode 100644
index 0000000..46df010
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.utils.CloseableIterator;
+
+/**
+ * Storage-agnostic record reader factory for Delta Lake tables. Subclasses supply the storage-specific Hadoop
+ * configuration ({@link #setConfFactory}) and table location ({@link #setTableMetadataPath}) before calling
+ * {@link #configure}, which reads the latest snapshot of the table, serializes its scan state and scan files, and
+ * assigns the scan files round-robin to the chosen cluster partitions.
+ */
+public class DeltaReaderFactory implements IRecordReaderFactory<Object> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    protected transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+    // partition index -> JSON-serialized scan files read by that partition
+    private Map<Integer, List<String>> schedule;
+    private String scanState;
+    private String tableMetadataPath;
+    // only needed while configuring (schedule carries the per-partition copies), so it is not serialized
+    private transient List<String> scanFiles;
+    // kept per instance so that one factory cannot change the names reported by another
+    private List<String> recordReaderNames = Collections.emptyList();
+    protected ConfFactory confFactory;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locationConstraints;
+    }
+
+    /**
+     * Sets the adapter names returned by {@link #getRecordReaderNames()} for this instance.
+     *
+     * @param readerNames adapter names; {@code null} is treated as an empty list
+     */
+    public void setRecordReaderNames(List<String> readerNames) {
+        if (readerNames == null) {
+            recordReaderNames = Collections.emptyList();
+        } else {
+            recordReaderNames = Collections.unmodifiableList(new ArrayList<>(readerNames));
+        }
+    }
+
+    /**
+     * Sets the location of the Delta table that {@link #configure} opens.
+     */
+    public void setTableMetadataPath(String tableMetadataPath) {
+        this.tableMetadataPath = tableMetadataPath;
+    }
+
+    /**
+     * Sets the factory for the Hadoop configuration used both by {@link #configure} and by every record reader
+     * created in {@link #createRecordReader}.
+     */
+    public void setConfFactory(ConfFactory config) {
+        this.confFactory = config;
+    }
+
+    /**
+     * Reads the latest snapshot of the table, serializes its scan state and scan files, computes the partition
+     * constraint, distributes the scan files over it, and sets {@code KEY_PARSER} to {@code FORMAT_DELTA} in
+     * {@code configuration}.
+     *
+     * @throws IllegalStateException if {@link #setConfFactory} or {@link #setTableMetadataPath} was not called first
+     */
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        if (confFactory == null || tableMetadataPath == null) {
+            throw new IllegalStateException(
+                    "setConfFactory() and setTableMetadataPath() must be called before configure()");
+        }
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        Configuration conf = confFactory.getConf();
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        Snapshot snapshot = table.getLatestSnapshot(engine);
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+
+        scanFiles = new ArrayList<>();
+        // Both iterators are Closeable; close them so whatever they hold open is released.
+        try (CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine)) {
+            while (iter.hasNext()) {
+                try (CloseableIterator<Row> rowIter = iter.next().getRows()) {
+                    while (rowIter.hasNext()) {
+                        scanFiles.add(RowSerDe.serializeRowToJson(rowIter.next()));
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        locationConstraints = configureLocationConstraints(appCtx);
+        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+        distributeFiles();
+    }
+
+    /**
+     * Picks the partitions that will read the table: a random cluster location when there are no scan files,
+     * otherwise all cluster partitions, randomly trimmed to the number of scan files when there are more
+     * partitions than files.
+     */
+    protected AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        String[] locations = csm.getClusterLocations().getLocations();
+        if (scanFiles.isEmpty()) {
+            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+        }
+        if (locations.length > scanFiles.size()) {
+            LOGGER.debug("cluster partitions ({}) exceed the number of Delta scan files ({}); "
+                    + "limiting ingestion partitions to the number of scan files", locations.length, scanFiles.size());
+            String[] locationCopy = locations.clone();
+            ArrayUtils.shuffle(locationCopy);
+            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations);
+    }
+
+    /** Assigns the scan files round-robin to the partitions of the current partition constraint. */
+    private void distributeFiles() {
+        final int numComputePartitions = getPartitionConstraint().getLocations().length;
+        schedule = new HashMap<>();
+        for (int i = 0; i < numComputePartitions; i++) {
+            schedule.put(i, new ArrayList<>());
+        }
+        int i = 0;
+        for (String scanFile : scanFiles) {
+            schedule.get(i).add(scanFile);
+            i = (i + 1) % numComputePartitions;
+        }
+    }
+
+    @Override
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        try {
+            int partition = context.getPartition();
+            return new DeltaFileRecordReader(schedule.get(partition), scanState, confFactory, context);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Row.class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000..d496b41
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+/**
+ * Record reader factory for Delta Lake tables stored in Google Cloud Storage. It builds the Hadoop job
+ * configuration with {@link GCSUtils#configureHdfsJobConf} and the table location from the container and
+ * definition properties, then delegates snapshot scanning and file scheduling to {@link DeltaReaderFactory}.
+ */
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        // Take numberOfPartitions from the cluster state instead of hard-coding it.
+        // NOTE(review): assumes GCSUtils.configureHdfsJobConf interprets it like the S3 helper does - confirm.
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        int numberOfPartitions = appCtx.getClusterStateManager().getClusterLocations().getLocations().length;
+        JobConf conf = new JobConf();
+        GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
+        setConfFactory(new ConfFactory(conf));
+        String tableMetadataPath = GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        setTableMetadataPath(tableMetadataPath);
+        super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..ee9e7c2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -333,7 +333,7 @@
      * @param numberOfPartitions number of partitions in the cluster
      */
     public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-            int numberOfPartitions) {
+                                                 int numberOfPartitions) {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 5274c44..a0e9a0e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -134,9 +136,10 @@
      */
     public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
-        // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        } else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            // Non-Delta tables must declare a format; Delta tables are validated by validateDeltaTableProperties.
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19138
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I8edb4c9abbf1679ad36fe1f7bc6ef3ac9c11a9e6
Gerrit-Change-Number: 19138
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange