From Savyasach Reddy <[email protected]>:

Savyasach Reddy has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19066 )


Change subject: [ASTERIXDB-3519][EXT]: Support dynamic prefixes on HDFS
......................................................................

[ASTERIXDB-3519][EXT]: Support dynamic prefixes on HDFS

- user model changes: add dynamic prefixes for HDFS datasets
- storage format changes: no
- interface changes: no

details:
- Support dynamic prefixes (computed fields such as {company:string}) on HDFS
- Support the 'include'/'exclude' parameters for filtering the matched files
- The 'path' parameter is used to specify the prefix (see the examples below)
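
Example usage (sketches mirroring the test DDLs updated in this change; the
endpoints, container paths, and file paths are test fixtures, not defaults):

-- dynamic prefix: computed fields are extracted from matching file paths
CREATE EXTERNAL DATASET Customer(OpenType) USING hdfs (
    ("hdfs"="hdfs://localhost:31888"),
    ("path"="/playground/external-filter/car/{company:string}/customer/{customer_id:int}"),
    ("input-format"="text-input-format"),
    ("embed-filter-values"="false"),
    ("format"="json")
);

-- exact file selection via 'include' (the tests below switch from 'path' to
-- 'include' for this case, since 'path' now denotes a prefix)
CREATE EXTERNAL DATASET TextDataset(LineType) USING hdfs (
    ("hdfs"="hdfs://127.0.0.1:31888"),
    ("include"="/asterix/textFileS"),
    ("input-format"="sequence-input-format"),
    ("format"="delimited-text"),
    ("delimiter"=".")
);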

Change-Id: I1bdbcd44c059f64f2da436a40ac3f59293442cf2
---
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
21 files changed, 231 insertions(+), 62 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/66/19066/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 0a96943..2be6a7d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -34,6 +34,9 @@
     public static final String S3_TEMPLATE_DEFAULT = "(\"accessKeyId\"=\"" + S3_ACCESS_KEY_ID_DEFAULT + "\"),\n"
             + "(\"secretAccessKey\"=\"" + S3_SECRET_ACCESS_KEY_DEFAULT + "\"),\n" + "(\"region\"=\"" + S3_REGION_DEFAULT
             + "\"),\n" + "(\"serviceEndpoint\"=\"" + S3_SERVICE_ENDPOINT_DEFAULT + "\")";
+    public static final String S3_DDL_TEMPLATE_DEFAULT = "\"accessKeyId\":\"" + S3_ACCESS_KEY_ID_DEFAULT + "\",\n"
+            + "\"secretAccessKey\":\"" + S3_SECRET_ACCESS_KEY_DEFAULT + "\",\n" + "\"region\":\"" + S3_REGION_DEFAULT
+            + "\",\n" + "\"serviceEndpoint\":\"" + S3_SERVICE_ENDPOINT_DEFAULT + "\"";

     // Azure blob storage constants and placeholders
     public static class Azure {
@@ -89,5 +92,9 @@
         public static final String KERBEROS_PASSWORD_DEFAULT = "hdfspassword";
         public static final String KERBEROS_REALM_DEFAULT = "EXAMPLE.COM";
         public static final String KERBEROS_KDC_DEFAULT = "localhost:8800";
+        public static final String HDFS_ENDPOINT_DEFAULT = "hdfs://localhost:31888";
+
+        public static final String HDFS_TEMPLATE_DEFAULT = "(\"hdfs\"=\"" + HDFS_ENDPOINT_DEFAULT + "\")";
+        public static final String HDFS_DDL_TEMPLATE_DEFAULT = "\"hdfs\":\"" + HDFS_ENDPOINT_DEFAULT + "\"";
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 3de0cd7..2290dc8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1274,7 +1274,11 @@
                 if (isDmlRecoveryTest && statement.contains("nc1://")) {
                     statement = statement.replaceAll("nc1://", "127.0.0.1://../../../../../../asterix-app/");
                 }
-                executeSqlppUpdateOrDdl(statement, OutputFormat.forCompilationUnit(cUnit));
+                if (cUnit.getPlaceholder().isEmpty()) {
+                    executeSqlppUpdateOrDdl(statement, OutputFormat.forCompilationUnit(cUnit));
+                } else {
+                    executeSqlppUpdateOrDdl(statement, OutputFormat.forCompilationUnit(cUnit), cUnit);
+                }
                 break;
             case "pollget":
             case "pollquery":
@@ -2453,7 +2457,7 @@
     }

     protected boolean noTemplateRequired(String str) {
-        return !str.contains("%template%");
+        return !str.contains("%template%") && !str.contains("%ddltemplate%");
     }

     protected String applyS3Substitution(String str, List<Placeholder> placeholders) {
@@ -2504,7 +2508,11 @@
     }

     protected String setS3TemplateDefault(String str) {
-        return str.replace("%template%", TestConstants.S3_TEMPLATE_DEFAULT);
+        if (str.contains("%template%")) {
+            return str.replace("%template%", TestConstants.S3_TEMPLATE_DEFAULT);
+        } else {
+            return str.replace("%ddltemplate%", TestConstants.S3_DDL_TEMPLATE_DEFAULT);
+        }
     }

     protected String applyAzureSubstitution(String str, List<Placeholder> placeholders) {
@@ -2554,7 +2562,11 @@
     }

     protected String setHDFSTemplateDefault(String str) {
-        return str;
+        if (str.contains("%template%")) {
+            return str.replace("%template%", TestConstants.HDFS.HDFS_TEMPLATE_DEFAULT);
+        } else {
+            return str.replace("%ddltemplate%", TestConstants.HDFS.HDFS_DDL_TEMPLATE_DEFAULT);
+        }
     }

     protected void fail(boolean runDiagnostics, TestCaseContext testCaseCtx, CompilationUnit cUnit,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
index fe3006e..412fbc1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
@@ -18,17 +18,36 @@
  */
 package org.apache.asterix.test.runtime;

+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFiles;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createDeltaTable;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
+import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
 import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
 import static org.apache.iceberg.types.Types.NestedField.required;

+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
+import java.util.zip.GZIPOutputStream;

 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.test.common.TestConstants;
 import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -60,34 +79,42 @@
  */
 @RunWith(Parameterized.class)
 public class SqlppHdfsExecutionTest {
+    private static final String PATH_BASE = joinPath("data");
+    private static final String EXTERNAL_FILTER_DATA_PATH = joinPath(PATH_BASE, "json", "external-filter");
+
     protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";

-    private static DataFile writeFile(String filename, List<Record> records, String location, Schema schema,
-            Configuration conf) throws IOException {
+    static Runnable PREPARE_BUCKET;
+
+    private static final String JSON_DATA_PATH = joinPath("data", "json");
+    private static final String CSV_DATA_PATH = joinPath("data", "csv");
+    private static final String TSV_DATA_PATH = joinPath("data", "tsv");
+
+    private static final Configuration CONF = new Configuration();
+
+    private static DataFile writeFile(String filename, List<Record> records, String location, Schema schema)
+            throws IOException {
         Path path = new Path(location, filename);
         FileFormat fileFormat = FileFormat.fromFileName(filename);
         Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);

         FileAppender<Record> fileAppender =
-                new GenericAppenderFactory(schema).newAppender(fromPath(path, conf), fileFormat);
+                new GenericAppenderFactory(schema).newAppender(fromPath(path, CONF), fileFormat);
         try (FileAppender<Record> appender = fileAppender) {
             appender.addAll(records);
         }

-        return DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path, conf))
+        return DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path, CONF))
                 .withMetrics(fileAppender.metrics()).build();
     }

     private static void setUpIcebergData() {
-        Configuration conf = new Configuration();
-        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, "hdfs://127.0.0.1:31888/");
-
-        Tables tables = new HadoopTables(conf);
+        Tables tables = new HadoopTables(CONF);

         Schema schema =
                 new Schema(required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));

-        String path = "hdfs://localhost:31888/my_table/";
+        String path = TestConstants.HDFS.HDFS_ENDPOINT_DEFAULT + "/my_table/";

         Table table = tables.create(schema, PartitionSpec.unpartitioned(),
                 ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()), path);
@@ -100,8 +127,7 @@

         // load test data
         try {
-            DataFile file =
-                    writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords, path, schema, conf);
+            DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"), fileFirstSnapshotRecords, path, schema);
             table.newAppend().appendFile(file).commit();
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -110,8 +136,52 @@

     @BeforeClass
     public static void setUp() throws Exception {
+        CONF.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, TestConstants.HDFS.HDFS_ENDPOINT_DEFAULT);
         LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor(), true);
         setUpIcebergData();
+        createBinaryFiles(DEFAULT_PARQUET_SRC_PATH);
+        createBinaryFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
+        createAvroFiles(DEFAULT_PARQUET_SRC_PATH);
+        createAvroFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
+        createDeltaTable();
+        setUpData();
+    }
+
+    private static void setUpData() {
+        setDataPaths(JSON_DATA_PATH, CSV_DATA_PATH, TSV_DATA_PATH);
+        setUploaders(SqlppHdfsExecutionTest::loadPlaygroundData, null, null, null, null);
+
+        PREPARE_BUCKET.run();
+    }
+
+    private static void loadPlaygroundData(String key, String content, boolean fromFile, boolean gzipped) {
+        loadData("/playground/", key, content, fromFile, gzipped);
+    }
+
+    private static void loadData(String prefix, String key, String content, boolean fromFile, boolean gzipped) {
+        try {
+            try (FileSystem fs = FileSystem.get(CONF)) {
+                Path path = new Path(prefix + key);
+                if (!fromFile) {
+                    try (FSDataOutputStream out = fs.create(path)) {
+                        out.writeBytes(content);
+                    }
+                } else {
+                    if (!gzipped) {
+                        try (FSDataOutputStream out = fs.create(path); InputStream in = new FileInputStream(content)) {
+                            IOUtils.copy(in, out);
+                        }
+                    } else {
+                        try (FSDataOutputStream out = fs.create(path);
+                                GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
+                            gzipOutputStream.write(Files.readAllBytes(Paths.get(content)));
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }

     @AfterClass
@@ -121,6 +191,7 @@

     @Parameters(name = "SqlppHdfsExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
+        PREPARE_BUCKET = ExternalDatasetTestUtils::preparePlaygroundContainer;
         return LangExecutionUtil.tests("only_sqlpp_hdfs.xml", "testsuite_sqlpp_hdfs.xml");
     }

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
index 7207aa7..d2cc2d5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
@@ -20,13 +20,12 @@
 CREATE TYPE OpenType AS {
 };

-CREATE EXTERNAL DATASET Customer(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET Customer(OpenType) USING %adapter% (
+    %template%,
     ("container"="playground"),
     
("definition"="external-filter/car/{company:string}/customer/{customer_id:int}"),
+    
("path"="/playground/external-filter/car/{company:string}/customer/{customer_id:int}"),
+    ("input-format" = "text-input-format"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
index 55916f3..cb9ead9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
@@ -18,13 +18,10 @@
  */

 COPY Customer c
-TO S3
-PATH ("copy-to-result", "default-namespace1")
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "default-namespace1")
 WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001";,
+    %ddltemplate%,
     "container":"playground",
     "format":"json"
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
index edb038f..3cb7c06 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
@@ -18,17 +18,14 @@
  */

 COPY Customer AS c
-TO S3
-PATH ("copy-to-result/default-namespace2", company, "customer", customer_id)
+TO %adapter%
+PATH (%pathprefix% "copy-to-result/default-namespace2", company, "customer", 
customer_id)
 OVER (
    PARTITION BY c.company company,
                 c.customer_id customer_id
 )
 WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001";,
+    %ddltemplate%,
     "container":"playground",
     "format":"json"
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
index ee75bd7..a347d92 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
@@ -21,17 +21,14 @@
    SELECT DISTINCT UPPERCASE(c.company) company, c.year
    FROM Customer c
 ) AS toWriter
-TO S3
-PATH ("copy-to-result/default-namespace3", company, year)
+TO %adapter%
+PATH (%pathprefix% "copy-to-result/default-namespace3", company, year)
 OVER (
    PARTITION BY toWriter.company company,
                 toWriter.year year
 )
 WITH {
-    "accessKeyId":"dummyAccessKey",
-    "secretAccessKey":"dummySecretKey",
-    "region":"us-west-2",
-    "serviceEndpoint":"http://127.0.0.1:8001";,
+    %ddltemplate%,
     "container":"playground",
     "format":"json"
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
index 4c7ba29..1dea789 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
@@ -17,35 +17,32 @@
  * under the License.
  */

-CREATE EXTERNAL DATASET CustomerCopy1(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy1(OpenType) USING %adapter% (
+    %template%,
     ("container"="playground"),
     ("definition"="copy-to-result/default-namespace1"),
+    ("path"="/playground/copy-to-result/default-namespace1"),
+    ("input-format" = "text-input-format"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );

-CREATE EXTERNAL DATASET CustomerCopy2(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy2(OpenType) USING %adapter% (
+    %template%,
     ("container"="playground"),
     
("definition"="copy-to-result/default-namespace2/{company:string}/customer/{customer_id:int}"),
+    
("path"="/playground/copy-to-result/default-namespace2/{company:string}/customer/{customer_id:int}"),
+    ("input-format" = "text-input-format"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );

-CREATE EXTERNAL DATASET CustomerCopy3(OpenType) USING S3 (
-    ("accessKeyId"="dummyAccessKey"),
-    ("secretAccessKey"="dummySecretKey"),
-    ("region"="us-west-2"),
-    ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy3(OpenType) USING %adapter% (
+    %template%,
     ("container"="playground"),
     
("definition"="copy-to-result/default-namespace3/{company:string}/{year:int}"),
+    
("path"="/playground/copy-to-result/default-namespace3/{company:string}/{year:int}"),
+    ("input-format" = "text-input-format"),
     ("embed-filter-values" = "false"),
     ("format"="json")
 );
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
index 68ae7b2..5c7fee6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
@@ -34,5 +34,5 @@
   content : string
 };

-create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
index 11b64df..3925a02 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
@@ -36,5 +36,5 @@
   content : string
 };

-create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/large_text`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/large_text`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
index 45ae5df..ab03a0d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
@@ -35,5 +35,5 @@
   content : string
 };

-create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`),(`local-socket-path`=`/var/lib/hadoop-hdfs/dn_socket`));
+create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`),(`local-socket-path`=`/var/lib/hadoop-hdfs/dn_socket`));

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
index 16bccc7..a6f14a5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
@@ -35,5 +35,5 @@
   line : string
 };

-create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/asterix_info.txt`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/asterix_info.txt`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
index 308d43d..ab743a5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
@@ -36,6 +36,6 @@
 CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING hdfs
 (
   ("hdfs"="hdfs://127.0.0.1:31888"),
-  ("path"="/asterix/id_age.parquet"),
+  ("include"="/asterix/id_age.parquet"),
   ("input-format"="parquet-input-format")
 );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
index 314a769..1f81faf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
@@ -35,5 +35,5 @@
   content : string
 };

-create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`)) hints (`CARDINALITY`=`10`);
+create external  dataset TextDataset(LineType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`)) hints (`CARDINALITY`=`10`);

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
index aea6acc..c91e844 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
@@ -45,5 +45,5 @@
   countB : bigint
 };

-create external  dataset TweetMessages(TweetMessageType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/tw_for_indexleftouterjoin.adm`),(`input-format`=`text-input-format`),(`format`=`adm`));
+create external  dataset TweetMessages(TweetMessageType) using `hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/tw_for_indexleftouterjoin.adm`),(`input-format`=`text-input-format`),(`format`=`adm`));

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ee9dc3b..23dbdb6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -36,6 +36,8 @@
     </test-case>
     <test-case FilePath="copy-to">
       <compilation-unit name="default-namespace">
+        <placeholder name="adapter" value="S3" />
+        <placeholder name="pathprefix" value="" />
         <output-dir compare="Text">default-namespace</output-dir>
       </compilation-unit>
     </test-case>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
index d14746a..ddfe73c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
@@ -92,5 +92,12 @@
         <output-dir compare="Text">parquet-empty-array</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="copy-to">
+      <compilation-unit name="default-namespace">
+        <placeholder name="adapter" value="hdfs" />
+        <placeholder name="pathprefix" value='"/playground", ' />
+        <output-dir compare="Text">default-namespace</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index a27e325..616a601 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -22,8 +22,10 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.FORMAT_PARQUET;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -33,6 +35,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
@@ -40,6 +43,7 @@
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
 import org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
@@ -47,8 +51,14 @@
 import org.apache.asterix.external.provider.StreamRecordReaderProvider;
 import org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -66,11 +76,14 @@
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
 import org.apache.hyracks.hdfs.scheduler.Scheduler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IExternalDataSourceFactory {

     private static final long serialVersionUID = 1L;
     private static final List<String> recordReaderNames = Collections.singletonList("hdfs");
+    private static final Logger LOGGER = LogManager.getLogger();

     protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
     protected transient IServiceContext serviceCtx;
@@ -108,9 +121,45 @@
         } catch (IOException ex) {
             throw HyracksDataException.create(ex);
         }
+
+        extractRequiredFiles(serviceCtx, configuration, warningCollector, filterEvaluatorFactory, hdfsConf);
         configureHdfsConf(hdfsConf, configuration);
     }

+    private void extractRequiredFiles(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory,
+            JobConf hdfsConf) throws HyracksDataException, AlgebricksException {
+        AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher =
+                ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
+        IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+        ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration);
+        configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+        try (FileSystem fs = ugi == null ? FileSystem.get(hdfsConf)
+                : ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(hdfsConf))) {
+            List<Path> reqFiles = new ArrayList<>();
+            RemoteIterator<LocatedFileStatus> files =
+                    fs.listFiles(new Path(configuration.get(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME)), true);
+            while (files.hasNext()) {
+                LocatedFileStatus file = files.next();
+                if (ExternalDataUtils.evaluate(file.getPath().toUri().getPath(), includeExcludeMatcher.getPredicate(),
+                        includeExcludeMatcher.getMatchersList(), externalDataPrefix, evaluator, warningCollector)) {
+                    reqFiles.add(file.getPath());
+                }
+            }
+            if (reqFiles.isEmpty()) {
+                LOGGER.warn("The provided external dataset configuration returned no files from the external source");
+                hdfsConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "");
+            } else {
+                FileInputFormat.setInputPaths(hdfsConf, reqFiles.toArray(new Path[0]));
+            }
+        } catch (FileNotFoundException e) {
+            hdfsConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "");
+        } catch (InterruptedException | IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
     protected JobConf prepareHDFSConf(IServiceContext serviceCtx, Map<String, String> configuration,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws HyracksDataException {
         this.serviceCtx = serviceCtx;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 0ed0622..79e98f6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -75,7 +75,9 @@

     public ExternalDataPrefix(Map<String, String> configuration) throws AlgebricksException {
         String prefix = ExternalDataUtils.getDefinitionOrPath(configuration);
-        this.original = prefix != null ? prefix : "";
+        this.original = prefix != null ? prefix
+                : configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                        .equals(ExternalDataConstants.KEY_HDFS_URL) ? "/" : "";
         this.endsWithSlash = this.original.endsWith("/");
         protocolContainerPair = ExternalDataUtils.getProtocolContainerPair(configuration);
         segments = extractPrefixSegments(original);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6ba618d..0bccc47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -48,6 +48,7 @@
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
@@ -1090,6 +1091,10 @@
     }

     public static String getDefinitionOrPath(Map<String, String> configuration) {
+        if (Objects.equals(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE),
+                ExternalDataConstants.KEY_HDFS_URL)) {
+            return configuration.get(KEY_PATH);
+        }
         return configuration.getOrDefault(DEFINITION_FIELD_NAME, configuration.get(KEY_PATH));
     }

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 2984954..9eb5ff4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -233,7 +233,11 @@
                     ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
             conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, url);
         }
-        conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, configuration.get(ExternalDataConstants.KEY_PATH).trim());
+
+        String path = configuration.get(ExternalDataConstants.KEY_PATH);
+        if (path != null) {
+            conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, path.trim());
+        }
         conf.setClassLoader(HDFSInputStream.class.getClassLoader());
         conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT, formatClassName);

@@ -563,5 +567,10 @@
         }

         validateIncludeExclude(configuration);
+        try {
+            new ExternalDataPrefix(configuration);
+        } catch (AlgebricksException ex) {
+            throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
+        }
     }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19066
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I1bdbcd44c059f64f2da436a40ac3f59293442cf2
Gerrit-Change-Number: 19066
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange
