>From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19138 )


Change subject: wip
......................................................................

wip

Change-Id: I8edb4c9abbf1679ad36fe1f7bc6ef3ac9c11a9e6
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
6 files changed, 247 insertions(+), 141 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/38/19138/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9909cc3..45dd546 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,155 +18,42 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;

-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-
-import java.util.ArrayList;
+import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;

-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;

-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.utils.CloseableIterator;

-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
-
-    private static final long serialVersionUID = 1L;
-    private static final List<String> recordReaderNames =
-            
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-    private static final Logger LOGGER = LogManager.getLogger();
-    private transient AlgebricksAbsolutePartitionConstraint 
locationConstraints;
-    private Map<Integer, List<String>> schedule;
-    private String scanState;
-    private Map<String, String> configuration;
-    private List<String> scanFiles;
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return locationConstraints;
-    }
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory{
+    private static List<String> 
recordReaderNames=Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);

     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, 
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
+        JobConf conf = new JobConf();
+        configureAwsS3HdfsJobConf(conf,configuration,500);
+        ConfFactory config= new ConfFactory(conf);
+        setConfFactory(config);
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-
-        Engine engine = DefaultEngine.create(conf);
-        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
-        Snapshot snapshot = table.getLatestSnapshot(engine);
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
snapshot.getSchema(engine)).build();
-        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
-        CloseableIterator<FilteredColumnarBatch> iter = 
scan.getScanFiles(engine);
-
-        scanFiles = new ArrayList<>();
-        while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
-            CloseableIterator<Row> rowIter = batch.getRows();
-            while (rowIter.hasNext()) {
-                Row row = rowIter.next();
-                scanFiles.add(RowSerDe.serializeRowToJson(row));
-            }
-        }
-        locationConstraints = configureLocationConstraints(appCtx);
-        configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles();
+        setTableMetadataPath(tableMetadataPath);
+        
super.configure(serviceCtx,configuration,warningCollector,filterEvaluatorFactory);
     }
-
-    private AlgebricksAbsolutePartitionConstraint 
configureLocationConstraints(ICcApplicationContext appCtx) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return 
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count 
({}); limiting ingestion partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
-    }
-
-    private void distributeFiles() {
-        final int numComputePartitions = 
getPartitionConstraint().getLocations().length;
-        schedule = new HashMap<>();
-        for (int i = 0; i < numComputePartitions; i++) {
-            schedule.put(i, new ArrayList<>());
-        }
-        int i = 0;
-        for (String scanFile : scanFiles) {
-            schedule.get(i).add(scanFile);
-            i = (i + 1) % numComputePartitions;
-        }
-    }
-
-    @Override
-    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext 
context) throws HyracksDataException {
-        try {
-            int partition = context.getPartition();
-            return new DeltaFileRecordReader(schedule.get(partition), 
scanState, configuration, context);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public Class<?> getRecordClass() throws AsterixException {
-        return Row.class;
-    }
-
     @Override
     public List<String> getRecordReaderNames() {
         return recordReaderNames;
     }

-    @Override
-    public Set<String> getReaderSupportedFormats() {
-        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
-    }
-
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..d61caa4 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;

@@ -49,6 +50,7 @@
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;

 /**
  * Delta record reader.
@@ -69,20 +71,10 @@
     private Row scanFile;
     private CloseableIterator<Row> rows;

-    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, Map<String, String> conf,
-            IExternalDataRuntimeContext context) {
-        Configuration config = new Configuration();
-        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            config.set(S3Constants.HADOOP_SESSION_TOKEN, 
conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        config.set(S3Constants.HADOOP_REGION, 
conf.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        this.engine = DefaultEngine.create(config);
+    public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config,
+            IExternalDataRuntimeContext context) throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
new file mode 100644
index 0000000..46df010
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static 
org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.kerby.config.Conf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class DeltaReaderFactory implements IRecordReaderFactory<Object> {
+    private static final long serialVersionUID = 1L;
+    private static List<String> recordReaderNames=null;
+    private static final Logger LOGGER = LogManager.getLogger();
+    protected transient AlgebricksAbsolutePartitionConstraint 
locationConstraints;
+    Map<Integer, List<String>> schedule;
+    private String scanState;
+    private String tableMetadataPath;
+    private Map<String, String> configuration;
+    private List<String> scanFiles;
+    protected ConfFactory confFactory;
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locationConstraints;
+    }
+
+    public void setRecordReaderNames(List<String> readerNames) {
+        recordReaderNames = readerNames;
+    }
+    public void setTableMetadataPath(String tableMetadataPath) {
+        this.tableMetadataPath = tableMetadataPath;
+    }
+    public void setConfFactory(ConfFactory config) {
+        this.confFactory = config;
+    }
+
+    public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
+                          IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        this.configuration = configuration;
+
+        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
+        Configuration conf = confFactory.getConf();
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
+        Snapshot snapshot = table.getLatestSnapshot(engine);
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, 
snapshot.getSchema(engine)).build();
+        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+        CloseableIterator<FilteredColumnarBatch> iter = 
scan.getScanFiles(engine);
+
+        scanFiles = new ArrayList<>();
+        while (iter.hasNext()) {
+            FilteredColumnarBatch batch = iter.next();
+            CloseableIterator<Row> rowIter = batch.getRows();
+            while (rowIter.hasNext()) {
+                Row row = rowIter.next();
+                scanFiles.add(RowSerDe.serializeRowToJson(row));
+            }
+        }
+        locationConstraints = configureLocationConstraints(appCtx);
+        configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
+        distributeFiles();
+    }
+
+    protected AlgebricksAbsolutePartitionConstraint 
configureLocationConstraints(ICcApplicationContext appCtx) {
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+
+        String[] locations = csm.getClusterLocations().getLocations();
+        if (scanFiles.size() == 0) {
+            return 
AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+        } else if (locations.length > scanFiles.size()) {
+            LOGGER.debug(
+                    "analytics partitions ({}) exceeds total partition count 
({}); limiting ingestion partitions to total partition count",
+                    locations.length, scanFiles.size());
+            final String[] locationCopy = locations.clone();
+            ArrayUtils.shuffle(locationCopy);
+            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations);
+    }
+
+    private void distributeFiles() {
+        final int numComputePartitions = 
getPartitionConstraint().getLocations().length;
+        schedule = new HashMap<>();
+        for (int i = 0; i < numComputePartitions; i++) {
+            schedule.put(i, new ArrayList<>());
+        }
+        int i = 0;
+        for (String scanFile : scanFiles) {
+            schedule.get(i).add(scanFile);
+            i = (i + 1) % numComputePartitions;
+        }
+    }
+
+    @Override
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext 
context) throws HyracksDataException {
+        try {
+            int partition = context.getPartition();
+            return new DeltaFileRecordReader(schedule.get(partition), 
scanState, confFactory, context);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Row.class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000..d496b41
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,43 @@
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+    private static final List<String> recordReaderNames =
+            
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
+                          IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        JobConf conf = new JobConf();
+        GCSUtils.configureHdfsJobConf(conf, configuration, 500);
+        ConfFactory config= new ConfFactory(conf);
+        setConfFactory(config);
+        String tableMetadataPath = GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        setTableMetadataPath(tableMetadataPath);
+        
super.configure(serviceCtx,configuration,warningCollector,filterEvaluatorFactory);
+    }
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 2ba1844..ee9e7c2 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -64,6 +64,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -333,7 +334,7 @@
      * @param numberOfPartitions number of partitions in the cluster
      */
     public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, 
String> configuration,
-            int numberOfPartitions) {
+                                                 int numberOfPartitions) {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 5274c44..a0e9a0e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@
 import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -134,9 +136,11 @@
      */
     public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19138
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I8edb4c9abbf1679ad36fe1f7bc6ef3ac9c11a9e6
Gerrit-Change-Number: 19138
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange

Reply via email to