From Savyasach Reddy <[email protected]>:
Savyasach Reddy has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19066 )
Change subject: [ASTERIXDB-3519][EXT]: Support dynamic prefixes on HDFS
......................................................................
[ASTERIXDB-3519][EXT]: Support dynamic prefixes on HDFS
- user model changes: add dynamic prefixes for HDFS datasets
- storage format changes: no
- interface changes: no
details:
- Support dynamic prefixes on HDFS
- Support include/exclude parameters
- Parameter 'path' is used to specify prefixes
Change-Id: I1bdbcd44c059f64f2da436a40ac3f59293442cf2
---
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
21 files changed, 231 insertions(+), 62 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/66/19066/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 0a96943..2be6a7d 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -34,6 +34,9 @@
public static final String S3_TEMPLATE_DEFAULT = "(\"accessKeyId\"=\"" +
S3_ACCESS_KEY_ID_DEFAULT + "\"),\n"
+ "(\"secretAccessKey\"=\"" + S3_SECRET_ACCESS_KEY_DEFAULT +
"\"),\n" + "(\"region\"=\"" + S3_REGION_DEFAULT
+ "\"),\n" + "(\"serviceEndpoint\"=\"" +
S3_SERVICE_ENDPOINT_DEFAULT + "\")";
+ public static final String S3_DDL_TEMPLATE_DEFAULT = "\"accessKeyId\":\""
+ S3_ACCESS_KEY_ID_DEFAULT + "\",\n"
+ + "\"secretAccessKey\":\"" + S3_SECRET_ACCESS_KEY_DEFAULT +
"\",\n" + "\"region\":\"" + S3_REGION_DEFAULT
+ + "\",\n" + "\"serviceEndpoint\":\"" + S3_SERVICE_ENDPOINT_DEFAULT
+ "\"";
// Azure blob storage constants and placeholders
public static class Azure {
@@ -89,5 +92,9 @@
public static final String KERBEROS_PASSWORD_DEFAULT = "hdfspassword";
public static final String KERBEROS_REALM_DEFAULT = "EXAMPLE.COM";
public static final String KERBEROS_KDC_DEFAULT = "localhost:8800";
+ public static final String HDFS_ENDPOINT_DEFAULT =
"hdfs://localhost:31888";
+
+ public static final String HDFS_TEMPLATE_DEFAULT = "(\"hdfs\"=\"" +
HDFS_ENDPOINT_DEFAULT + "\")";
+ public static final String HDFS_DDL_TEMPLATE_DEFAULT = "\"hdfs\":\"" +
HDFS_ENDPOINT_DEFAULT + "\"";
}
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 3de0cd7..2290dc8 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1274,7 +1274,11 @@
if (isDmlRecoveryTest && statement.contains("nc1://")) {
statement = statement.replaceAll("nc1://",
"127.0.0.1://../../../../../../asterix-app/");
}
- executeSqlppUpdateOrDdl(statement,
OutputFormat.forCompilationUnit(cUnit));
+ if (cUnit.getPlaceholder().isEmpty()) {
+ executeSqlppUpdateOrDdl(statement,
OutputFormat.forCompilationUnit(cUnit));
+ } else {
+ executeSqlppUpdateOrDdl(statement,
OutputFormat.forCompilationUnit(cUnit), cUnit);
+ }
break;
case "pollget":
case "pollquery":
@@ -2453,7 +2457,7 @@
}
protected boolean noTemplateRequired(String str) {
- return !str.contains("%template%");
+ return !str.contains("%template%") && !str.contains("%ddltemplate%");
}
protected String applyS3Substitution(String str, List<Placeholder>
placeholders) {
@@ -2504,7 +2508,11 @@
}
protected String setS3TemplateDefault(String str) {
- return str.replace("%template%", TestConstants.S3_TEMPLATE_DEFAULT);
+ if (str.contains("%template%")) {
+ return str.replace("%template%",
TestConstants.S3_TEMPLATE_DEFAULT);
+ } else {
+ return str.replace("%ddltemplate%",
TestConstants.S3_DDL_TEMPLATE_DEFAULT);
+ }
}
protected String applyAzureSubstitution(String str, List<Placeholder>
placeholders) {
@@ -2554,7 +2562,11 @@
}
protected String setHDFSTemplateDefault(String str) {
- return str;
+ if (str.contains("%template%")) {
+ return str.replace("%template%",
TestConstants.HDFS.HDFS_TEMPLATE_DEFAULT);
+ } else {
+ return str.replace("%ddltemplate%",
TestConstants.HDFS.HDFS_DDL_TEMPLATE_DEFAULT);
+ }
}
protected void fail(boolean runDiagnostics, TestCaseContext testCaseCtx,
CompilationUnit cUnit,
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
index fe3006e..412fbc1 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHdfsExecutionTest.java
@@ -18,17 +18,36 @@
*/
package org.apache.asterix.test.runtime;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFiles;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createDeltaTable;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths;
+import static
org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders;
+import static
org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
import static org.apache.iceberg.types.Types.NestedField.required;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
+import java.util.zip.GZIPOutputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.test.common.TestConstants;
import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -60,34 +79,42 @@
*/
@RunWith(Parameterized.class)
public class SqlppHdfsExecutionTest {
+ private static final String PATH_BASE = joinPath("data");
+ private static final String EXTERNAL_FILTER_DATA_PATH =
joinPath(PATH_BASE, "json", "external-filter");
+
protected static final String TEST_CONFIG_FILE_NAME =
"src/main/resources/cc.conf";
- private static DataFile writeFile(String filename, List<Record> records,
String location, Schema schema,
- Configuration conf) throws IOException {
+ static Runnable PREPARE_BUCKET;
+
+ private static final String JSON_DATA_PATH = joinPath("data", "json");
+ private static final String CSV_DATA_PATH = joinPath("data", "csv");
+ private static final String TSV_DATA_PATH = joinPath("data", "tsv");
+
+ private static final Configuration CONF = new Configuration();
+
+ private static DataFile writeFile(String filename, List<Record> records,
String location, Schema schema)
+ throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for
file: %s", filename);
FileAppender<Record> fileAppender =
- new GenericAppenderFactory(schema).newAppender(fromPath(path,
conf), fileFormat);
+ new GenericAppenderFactory(schema).newAppender(fromPath(path,
CONF), fileFormat);
try (FileAppender<Record> appender = fileAppender) {
appender.addAll(records);
}
- return
DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path,
conf))
+ return
DataFiles.builder(PartitionSpec.unpartitioned()).withInputFile(HadoopInputFile.fromPath(path,
CONF))
.withMetrics(fileAppender.metrics()).build();
}
private static void setUpIcebergData() {
- Configuration conf = new Configuration();
- conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
"hdfs://127.0.0.1:31888/");
-
- Tables tables = new HadoopTables(conf);
+ Tables tables = new HadoopTables(CONF);
Schema schema =
new Schema(required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()));
- String path = "hdfs://localhost:31888/my_table/";
+ String path = TestConstants.HDFS.HDFS_ENDPOINT_DEFAULT + "/my_table/";
Table table = tables.create(schema, PartitionSpec.unpartitioned(),
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT,
FileFormat.PARQUET.name()), path);
@@ -100,8 +127,7 @@
// load test data
try {
- DataFile file =
- writeFile(FileFormat.PARQUET.addExtension("file"),
fileFirstSnapshotRecords, path, schema, conf);
+ DataFile file = writeFile(FileFormat.PARQUET.addExtension("file"),
fileFirstSnapshotRecords, path, schema);
table.newAppend().appendFile(file).commit();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -110,8 +136,52 @@
@BeforeClass
public static void setUp() throws Exception {
+ CONF.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI,
TestConstants.HDFS.HDFS_ENDPOINT_DEFAULT);
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor(),
true);
setUpIcebergData();
+ createBinaryFiles(DEFAULT_PARQUET_SRC_PATH);
+ createBinaryFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
+ createAvroFiles(DEFAULT_PARQUET_SRC_PATH);
+ createAvroFilesRecursively(EXTERNAL_FILTER_DATA_PATH);
+ createDeltaTable();
+ setUpData();
+ }
+
+ private static void setUpData() {
+ setDataPaths(JSON_DATA_PATH, CSV_DATA_PATH, TSV_DATA_PATH);
+ setUploaders(SqlppHdfsExecutionTest::loadPlaygroundData, null, null,
null, null);
+
+ PREPARE_BUCKET.run();
+ }
+
+ private static void loadPlaygroundData(String key, String content, boolean
fromFile, boolean gzipped) {
+ loadData("/playground/", key, content, fromFile, gzipped);
+ }
+
+ private static void loadData(String prefix, String key, String content,
boolean fromFile, boolean gzipped) {
+ try {
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ Path path = new Path(prefix + key);
+ if (!fromFile) {
+ try (FSDataOutputStream out = fs.create(path)) {
+ out.writeBytes(content);
+ }
+ } else {
+ if (!gzipped) {
+ try (FSDataOutputStream out = fs.create(path);
InputStream in = new FileInputStream(content)) {
+ IOUtils.copy(in, out);
+ }
+ } else {
+ try (FSDataOutputStream out = fs.create(path);
+ GZIPOutputStream gzipOutputStream = new
GZIPOutputStream(out)) {
+
gzipOutputStream.write(Files.readAllBytes(Paths.get(content)));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@AfterClass
@@ -121,6 +191,7 @@
@Parameters(name = "SqlppHdfsExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
+ PREPARE_BUCKET = ExternalDatasetTestUtils::preparePlaygroundContainer;
return LangExecutionUtil.tests("only_sqlpp_hdfs.xml",
"testsuite_sqlpp_hdfs.xml");
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
index 7207aa7..d2cc2d5 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.01.ddl.sqlpp
@@ -20,13 +20,12 @@
CREATE TYPE OpenType AS {
};
-CREATE EXTERNAL DATASET Customer(OpenType) USING S3 (
- ("accessKeyId"="dummyAccessKey"),
- ("secretAccessKey"="dummySecretKey"),
- ("region"="us-west-2"),
- ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET Customer(OpenType) USING %adapter% (
+ %template%,
("container"="playground"),
("definition"="external-filter/car/{company:string}/customer/{customer_id:int}"),
+
("path"="/playground/external-filter/car/{company:string}/customer/{customer_id:int}"),
+ ("input-format" = "text-input-format"),
("embed-filter-values" = "false"),
("format"="json")
);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
index 55916f3..cb9ead9 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.02.update.sqlpp
@@ -18,13 +18,10 @@
*/
COPY Customer c
-TO S3
-PATH ("copy-to-result", "default-namespace1")
+TO %adapter%
+PATH (%pathprefix% "copy-to-result", "default-namespace1")
WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
+ %ddltemplate%,
"container":"playground",
"format":"json"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
index edb038f..3cb7c06 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.03.update.sqlpp
@@ -18,17 +18,14 @@
*/
COPY Customer AS c
-TO S3
-PATH ("copy-to-result/default-namespace2", company, "customer", customer_id)
+TO %adapter%
+PATH (%pathprefix% "copy-to-result/default-namespace2", company, "customer",
customer_id)
OVER (
PARTITION BY c.company company,
c.customer_id customer_id
)
WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
+ %ddltemplate%,
"container":"playground",
"format":"json"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
index ee75bd7..a347d92 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.04.update.sqlpp
@@ -21,17 +21,14 @@
SELECT DISTINCT UPPERCASE(c.company) company, c.year
FROM Customer c
) AS toWriter
-TO S3
-PATH ("copy-to-result/default-namespace3", company, year)
+TO %adapter%
+PATH (%pathprefix% "copy-to-result/default-namespace3", company, year)
OVER (
PARTITION BY toWriter.company company,
toWriter.year year
)
WITH {
- "accessKeyId":"dummyAccessKey",
- "secretAccessKey":"dummySecretKey",
- "region":"us-west-2",
- "serviceEndpoint":"http://127.0.0.1:8001",
+ %ddltemplate%,
"container":"playground",
"format":"json"
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
index 4c7ba29..1dea789 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/default-namespace/default-namespace.05.ddl.sqlpp
@@ -17,35 +17,32 @@
* under the License.
*/
-CREATE EXTERNAL DATASET CustomerCopy1(OpenType) USING S3 (
- ("accessKeyId"="dummyAccessKey"),
- ("secretAccessKey"="dummySecretKey"),
- ("region"="us-west-2"),
- ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy1(OpenType) USING %adapter% (
+ %template%,
("container"="playground"),
("definition"="copy-to-result/default-namespace1"),
+ ("path"="/playground/copy-to-result/default-namespace1"),
+ ("input-format" = "text-input-format"),
("embed-filter-values" = "false"),
("format"="json")
);
-CREATE EXTERNAL DATASET CustomerCopy2(OpenType) USING S3 (
- ("accessKeyId"="dummyAccessKey"),
- ("secretAccessKey"="dummySecretKey"),
- ("region"="us-west-2"),
- ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy2(OpenType) USING %adapter% (
+ %template%,
("container"="playground"),
("definition"="copy-to-result/default-namespace2/{company:string}/customer/{customer_id:int}"),
+
("path"="/playground/copy-to-result/default-namespace2/{company:string}/customer/{customer_id:int}"),
+ ("input-format" = "text-input-format"),
("embed-filter-values" = "false"),
("format"="json")
);
-CREATE EXTERNAL DATASET CustomerCopy3(OpenType) USING S3 (
- ("accessKeyId"="dummyAccessKey"),
- ("secretAccessKey"="dummySecretKey"),
- ("region"="us-west-2"),
- ("serviceEndpoint"="http://127.0.0.1:8001"),
+CREATE EXTERNAL DATASET CustomerCopy3(OpenType) USING %adapter% (
+ %template%,
("container"="playground"),
("definition"="copy-to-result/default-namespace3/{company:string}/{year:int}"),
+
("path"="/playground/copy-to-result/default-namespace3/{company:string}/{year:int}"),
+ ("input-format" = "text-input-format"),
("embed-filter-values" = "false"),
("format"="json")
);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
index 68ae7b2..5c7fee6 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_02/hdfs_02.1.ddl.sqlpp
@@ -34,5 +34,5 @@
content : string
};
-create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
index 11b64df..3925a02 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_03/hdfs_03.1.ddl.sqlpp
@@ -36,5 +36,5 @@
content : string
};
-create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/large_text`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/large_text`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
index 45ae5df..ab03a0d 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/hdfs_shortcircuit/hdfs_shortcircuit.1.ddl.sqlpp
@@ -35,5 +35,5 @@
content : string
};
-create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`),(`local-socket-path`=`/var/lib/hadoop-hdfs/dn_socket`));
+create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`),(`local-socket-path`=`/var/lib/hadoop-hdfs/dn_socket`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
index 16bccc7..a6f14a5 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/issue_245_hdfs/issue_245_hdfs.1.ddl.sqlpp
@@ -35,5 +35,5 @@
line : string
};
-create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/asterix_info.txt`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
+create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/asterix_info.txt`),(`input-format`=`text-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
index 308d43d..ab743a5 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hdfs/parquet/parquet.1.ddl.sqlpp
@@ -36,6 +36,6 @@
CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING hdfs
(
("hdfs"="hdfs://127.0.0.1:31888"),
- ("path"="/asterix/id_age.parquet"),
+ ("include"="/asterix/id_age.parquet"),
("input-format"="parquet-input-format")
);
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
index 314a769..1f81faf 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/hints/issue_251_dataset_hint_6/issue_251_dataset_hint_6.1.ddl.sqlpp
@@ -35,5 +35,5 @@
content : string
};
-create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`))
hints (`CARDINALITY`=`10`);
+create external dataset TextDataset(LineType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/textFileS`),(`input-format`=`sequence-input-format`),(`format`=`delimited-text`),(`delimiter`=`.`))
hints (`CARDINALITY`=`10`);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
index aea6acc..c91e844 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id_2/decorrelate_with_unique_id_2.1.ddl.sqlpp
@@ -45,5 +45,5 @@
countB : bigint
};
-create external dataset TweetMessages(TweetMessageType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`path`=`/asterix/tw_for_indexleftouterjoin.adm`),(`input-format`=`text-input-format`),(`format`=`adm`));
+create external dataset TweetMessages(TweetMessageType) using
`hdfs`((`hdfs`=`hdfs://127.0.0.1:31888`),(`include`=`/asterix/tw_for_indexleftouterjoin.adm`),(`input-format`=`text-input-format`),(`format`=`adm`));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ee9dc3b..23dbdb6 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -36,6 +36,8 @@
</test-case>
<test-case FilePath="copy-to">
<compilation-unit name="default-namespace">
+ <placeholder name="adapter" value="S3" />
+ <placeholder name="pathprefix" value="" />
<output-dir compare="Text">default-namespace</output-dir>
</compilation-unit>
</test-case>
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
index d14746a..ddfe73c 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml
@@ -92,5 +92,12 @@
<output-dir compare="Text">parquet-empty-array</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="copy-to">
+ <compilation-unit name="default-namespace">
+ <placeholder name="adapter" value="hdfs" />
+ <placeholder name="pathprefix" value='"/playground", ' />
+ <output-dir compare="Text">default-namespace</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index a27e325..616a601 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -22,8 +22,10 @@
import static
org.apache.asterix.external.util.ExternalDataConstants.FORMAT_PARQUET;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -33,6 +35,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
@@ -40,6 +43,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetFileRecordReader;
import
org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
@@ -47,8 +51,14 @@
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import
org.apache.asterix.external.provider.context.ExternalStreamRuntimeDataContext;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -66,11 +76,14 @@
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>,
IExternalDataSourceFactory {
private static final long serialVersionUID = 1L;
private static final List<String> recordReaderNames =
Collections.singletonList("hdfs");
+ private static final Logger LOGGER = LogManager.getLogger();
protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
protected transient IServiceContext serviceCtx;
@@ -108,9 +121,45 @@
} catch (IOException ex) {
throw HyracksDataException.create(ex);
}
+
+ extractRequiredFiles(serviceCtx, configuration, warningCollector,
filterEvaluatorFactory, hdfsConf);
configureHdfsConf(hdfsConf, configuration);
}
+ private void extractRequiredFiles(IServiceContext serviceCtx, Map<String,
String> configuration,
+ IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory,
+ JobConf hdfsConf) throws HyracksDataException, AlgebricksException
{
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher =
+ ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
+ IExternalFilterEvaluator evaluator =
filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new
ExternalDataPrefix(configuration);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME,
externalDataPrefix.getRoot());
+ try (FileSystem fs = ugi == null ? FileSystem.get(hdfsConf)
+ : ugi.doAs((PrivilegedExceptionAction<FileSystem>) () ->
FileSystem.get(hdfsConf))) {
+ List<Path> reqFiles = new ArrayList<>();
+ RemoteIterator<LocatedFileStatus> files =
+ fs.listFiles(new
Path(configuration.get(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME)), true);
+ while (files.hasNext()) {
+ LocatedFileStatus file = files.next();
+ if
(ExternalDataUtils.evaluate(file.getPath().toUri().getPath(),
includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(),
externalDataPrefix, evaluator, warningCollector)) {
+ reqFiles.add(file.getPath());
+ }
+ }
+ if (reqFiles.isEmpty()) {
+ LOGGER.warn("The provided external dataset configuration
returned no files from the external source");
+ hdfsConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "");
+ } else {
+ FileInputFormat.setInputPaths(hdfsConf, reqFiles.toArray(new
Path[0]));
+ }
+ } catch (FileNotFoundException e) {
+ hdfsConf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "");
+ } catch (InterruptedException | IOException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ }
+
protected JobConf prepareHDFSConf(IServiceContext serviceCtx, Map<String,
String> configuration,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
HyracksDataException {
this.serviceCtx = serviceCtx;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 0ed0622..79e98f6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -75,7 +75,9 @@
public ExternalDataPrefix(Map<String, String> configuration) throws
AlgebricksException {
String prefix = ExternalDataUtils.getDefinitionOrPath(configuration);
- this.original = prefix != null ? prefix : "";
+ this.original = prefix != null ? prefix
+ :
configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+ .equals(ExternalDataConstants.KEY_HDFS_URL) ? "/" : "";
this.endsWithSlash = this.original.endsWith("/");
protocolContainerPair =
ExternalDataUtils.getProtocolContainerPair(configuration);
segments = extractPrefixSegments(original);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6ba618d..0bccc47 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -48,6 +48,7 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
@@ -1090,6 +1091,10 @@
}
public static String getDefinitionOrPath(Map<String, String>
configuration) {
+ if
(Objects.equals(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE),
+ ExternalDataConstants.KEY_HDFS_URL)) {
+ return configuration.get(KEY_PATH);
+ }
return configuration.getOrDefault(DEFINITION_FIELD_NAME,
configuration.get(KEY_PATH));
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 2984954..9eb5ff4 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -233,7 +233,11 @@
ExternalDataConstants.CLASS_NAME_HDFS_FILESYSTEM);
conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, url);
}
- conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR,
configuration.get(ExternalDataConstants.KEY_PATH).trim());
+
+ String path = configuration.get(ExternalDataConstants.KEY_PATH);
+ if (path != null) {
+ conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, path.trim());
+ }
conf.setClassLoader(HDFSInputStream.class.getClassLoader());
conf.set(ExternalDataConstants.KEY_HADOOP_INPUT_FORMAT,
formatClassName);
@@ -563,5 +567,10 @@
}
validateIncludeExclude(configuration);
+ try {
+ new ExternalDataPrefix(configuration);
+ } catch (AlgebricksException ex) {
+ throw new
CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19066
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I1bdbcd44c059f64f2da436a40ac3f59293442cf2
Gerrit-Change-Number: 19066
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange