From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130 )
Change subject: [NO ISSUE]: Improve error handling ...................................................................... [NO ISSUE]: Improve error handling - user model changes: no - storage format changes: no - interface changes: no details: - Throw errors relevant to the issue instead of an internal error - Add valueEmbedder to HDFSInputStream - Add a codec to read .gzip files as GzipCodec Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java 12 files changed, 97 insertions(+), 38 deletions(-) Approvals: Murtadha Hubail: Looks good to 
me, approved Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp index b68c38b..a384802 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.01.ddl.sqlpp @@ -24,13 +24,10 @@ CREATE TYPE OpenType AS { }; -CREATE EXTERNAL DATASET Customer(OpenType) USING S3 ( - ("accessKeyId"="dummyAccessKey"), - ("secretAccessKey"="dummySecretKey"), - ("region"="us-west-2"), - ("serviceEndpoint"="http://127.0.0.1:8001"), - ("container"="playground"), - ("definition"="external-filter/car/{company:string}/customer/{customer_id:int}"), +CREATE EXTERNAL DATASET Customer(OpenType) USING %adapter% ( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%external-filter/car/{company:string}/customer/{customer_id:int}"), ("embed-filter-values" = "false"), ("format"="json") ); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp index f1a22d0..a9a7a8f 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.02.update.sqlpp @@ -20,18 +20,15 @@ USE test; COPY Customer c -TO S3 -PATH ("copy-to-result", "car", company, "customer", customer_id) +TO %adapter% +PATH (%pathprefix% "copy-to-result", "car", company, "customer", customer_id) OVER ( PARTITION BY c.company company, c.customer_id customer_id ) WITH { - "accessKeyId":"dummyAccessKey", - "secretAccessKey":"dummySecretKey", - 
"region":"us-west-2", - "serviceEndpoint":"http://127.0.0.1:8001", - "container":"playground", + %template_colons%, + %additionalProperties% "format":"json", "compression":"gzip" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp index 14d1d92..f46ddf9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/partition/partition.03.ddl.sqlpp @@ -19,13 +19,10 @@ USE test; -CREATE EXTERNAL DATASET CustomerCopy(OpenType) USING S3 ( - ("accessKeyId"="dummyAccessKey"), - ("secretAccessKey"="dummySecretKey"), - ("region"="us-west-2"), - ("serviceEndpoint"="http://127.0.0.1:8001"), - ("container"="playground"), - ("definition"="copy-to-result/car/{company:string}/customer/{customer_id:int}"), +CREATE EXTERNAL DATASET CustomerCopy(OpenType) USING %adapter% ( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%copy-to-result/car/{company:string}/customer/{customer_id:int}"), ("embed-filter-values" = "false"), ("format"="json") ); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 5ae2fc5..c8a6785 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -21,6 +21,11 @@ <test-group name="copy-to"> <test-case FilePath="copy-to"> <compilation-unit name="partition"> + <placeholder name="adapter" value="S3" /> + <placeholder name="pathprefix" value="" /> + <placeholder name="path_prefix" value="" /> + <placeholder name="additionalProperties" value='"container":"playground",' /> + 
<placeholder name="additional_Properties" value='("container"="playground")' /> <output-dir compare="Text">partition</output-dir> </compilation-unit> </test-case> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml index a5af248..6851433 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp_hdfs.xml @@ -93,6 +93,16 @@ </compilation-unit> </test-case> <test-case FilePath="copy-to"> + <compilation-unit name="partition"> + <placeholder name="adapter" value="HDFS" /> + <placeholder name="pathprefix" value='"/playground", ' /> + <placeholder name="path_prefix" value="/playground/" /> + <placeholder name="additionalProperties" value="" /> + <placeholder name="additional_Properties" value='("input-format" = "text-input-format")' /> + <output-dir compare="Text">partition</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="copy-to"> <compilation-unit name="simple-write"> <placeholder name="adapter" value="HDFS" /> <placeholder name="pathprefix" value='"/playground", ' /> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java index 135b93f..934ba1d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java @@ -73,6 +73,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.util.ExceptionUtils; import 
org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.hdfs.dataflow.ConfFactory; import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory; @@ -160,8 +161,10 @@ } } catch (FileNotFoundException ex) { throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - } catch (InterruptedException | IOException ex) { + } catch (InterruptedException ex) { throw HyracksDataException.create(ex); + } catch (IOException ex) { + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(ex)); } } @@ -229,10 +232,11 @@ * 1. when target files are not null, it generates a file aware input stream that validate * against the files * 2. if the data is binary, it returns a generic reader */ - public AsterixInputStream createInputStream(IHyracksTaskContext ctx) throws HyracksDataException { + public AsterixInputStream createInputStream(IHyracksTaskContext ctx, IExternalDataRuntimeContext context) + throws HyracksDataException { try { restoreConfig(ctx); - return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, ugi); + return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, ugi, context); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -307,7 +311,7 @@ try { if (recordReaderClazz != null) { StreamRecordReader streamReader = (StreamRecordReader) recordReaderClazz.getConstructor().newInstance(); - streamReader.configure(ctx, createInputStream(ctx), configuration); + streamReader.configure(ctx, createInputStream(ctx, context), configuration); return streamReader; } restoreConfig(ctx); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java index d508336..c1bad6c 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStream.java @@ -24,9 +24,12 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; import org.apache.asterix.external.input.record.reader.hdfs.EmptyRecordReader; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -49,12 +52,14 @@ private JobConf conf; private int pos = 0; private UserGroupInformation ugi; + private IExternalFilterValueEmbedder valueEmbedder; @SuppressWarnings("unchecked") public HDFSInputStream(boolean read[], InputSplit[] inputSplits, String[] readSchedule, String nodeName, - JobConf conf, Map<String, String> configuration, UserGroupInformation ugi) - throws IOException, AsterixException { + JobConf conf, Map<String, String> configuration, UserGroupInformation ugi, + IExternalDataRuntimeContext context) throws IOException, AsterixException { this.ugi = ugi; + this.valueEmbedder = context.getValueEmbedder(); this.read = read; this.inputSplits = inputSplits; this.readSchedule = readSchedule; @@ -169,6 +174,7 @@ } private RecordReader<Object, Text> getRecordReader(int splitIndex) throws IOException { + valueEmbedder.setPath(getPath(inputSplits[splitIndex])); try { reader = ugi == null ? 
getReader(splitIndex) : ugi.doAs((PrivilegedExceptionAction<RecordReader<Object, Text>>) () -> getReader(splitIndex)); @@ -187,4 +193,12 @@ private RecordReader<Object, Text> getReader(int splitIndex) throws IOException { return (RecordReader<Object, Text>) inputFormat.getRecordReader(inputSplits[splitIndex], conf, Reporter.NULL); } + + private String getPath(InputSplit split) { + if (split instanceof FileSplit) { + return ((FileSplit) split).getPath().toString(); + } else { + return split.toString(); + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 1de2cd2..d487e68 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -179,6 +179,7 @@ public static final String CLASS_NAME_HDFS_FILESYSTEM = "org.apache.hadoop.hdfs.DistributedFileSystem"; public static final String S3A_CHANGE_DETECTION_REQUIRED = "requireVersionChangeDetection"; public static final String S3A_CHANGE_DETECTION_REQUIRED_CONFIG_KEY = "fs.s3a.change.detection.version.required"; + public static final String HDFS_IO_COMPRESSION_CODECS_KEY = "io.compression.codecs"; /** * input formats aliases */ diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index f06638d..4d093f5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -1134,8 +1134,7 @@ protocol = nodePathPair[0]; break; case 
ExternalDataConstants.KEY_ADAPTER_NAME_HDFS: - protocol = ExternalDataConstants.KEY_HDFS_URL; - break; + return configurations.get(ExternalDataConstants.KEY_HDFS_URL).replaceAll("/+$", ""); default: return ""; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index 75d68ba..35f2a94 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -262,6 +263,7 @@ if (useDatanodeHostname != null) { conf.set(ExternalDataConstants.KEY_HDFS_USE_DATANODE_HOSTNAME, useDatanodeHostname); } + conf.set(ExternalDataConstants.HDFS_IO_COMPRESSION_CODECS_KEY, AliasGzipCodec.class.getName()); return conf; } @@ -547,6 +549,9 @@ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { + if (configuration.get(ExternalDataConstants.KEY_HDFS_URL) == null) { + throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_HDFS_URL); + } if (configuration.get(ExternalDataConstants.KEY_INPUT_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_INPUT_FORMAT); @@ -590,4 +595,11 @@ return ExternalDataConstants.KEY_ADAPTER_NAME_HDFS .equalsIgnoreCase(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)); } + + public static class AliasGzipCodec extends GzipCodec { + @Override + public String 
getDefaultExtension() { + return "." + ExternalDataConstants.KEY_COMPRESSION_GZIP; + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java index 9203c8f..f06b8e9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.writer; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -79,7 +81,7 @@ } } } catch (IOException ex) { - throw HyracksDataException.create(ex); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } } } @@ -94,8 +96,8 @@ printer.newStream(outputStream); } catch (FileAlreadyExistsException e) { return false; - } catch (IOException e) { - throw HyracksDataException.create(e); + } catch (IOException ex) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } return true; } @@ -113,7 +115,7 @@ fs.delete(path, false); } } catch (IOException ex) { - throw HyracksDataException.create(ex); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } } @@ -125,7 +127,7 @@ fs.rename(path, new Path(path.getParent(), path.getName().substring(1))); } } catch (IOException ex) { - throw HyracksDataException.create(ex); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java index a4113d1..7bb07bc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.external.writer; -import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; @@ -89,7 +88,7 @@ } catch (InterruptedException ex) { throw HyracksDataException.create(ex); } catch (IOException ex) { - throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } } @@ -135,7 +134,7 @@ doValidate(testFs); } } catch (IOException ex) { - throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex)); + throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex)); } } @@ -171,7 +170,7 @@ outputStream.write(0); } } catch (IOException ex) { - throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); + throw new CompilationException(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex)); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19130 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Icdb0b51cf7444de4074e22b78f02d3b29b07414f Gerrit-Change-Number: 19130 Gerrit-PatchSet: 4 Gerrit-Owner: Savyasach Reddy <[email protected]> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
