From Savyasach Reddy <[email protected]>:
Savyasach Reddy has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18767 )
Change subject: Write to HDFS
......................................................................
Write to HDFS
Change-Id: Ifc4ed9fb17a7540cf444a2f8bef590756b909988
---
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
5 files changed, 281 insertions(+), 1 deletion(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/67/18767/1
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 02c2070..4999208 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -157,6 +157,7 @@
public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = "AZUREDATALAKE";
public static final String KEY_ADAPTER_NAME_GCS = "GCS";
+ public static final String KEY_ADAPTER_NAME_HDFS = "HDFS";
/**
* HDFS class names
@@ -174,6 +175,10 @@
public static final String INPUT_FORMAT_TEXT = "text-input-format";
public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
public static final String INPUT_FORMAT_PARQUET = "parquet-input-format";
+
+ public static final String HDFS_BLOCKSIZE = "blocksize";
+ public static final String HDFS_REPLICATION = "replication";
+
/**
* Builtin streams
*/
@@ -341,7 +346,7 @@
static {
WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, FORMAT_PARQUET);
WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
- KEY_ADAPTER_NAME_GCS.toLowerCase());
+ KEY_ADAPTER_NAME_GCS.toLowerCase(), KEY_ADAPTER_NAME_HDFS.toLowerCase());
TEXTUAL_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
PARQUET_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP, KEY_COMPRESSION_SNAPPY, KEY_COMPRESSION_ZSTD);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 6bc013a..dde8e5d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -20,6 +20,11 @@
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -197,6 +202,7 @@
String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+ conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
//Allow hdfs adapter to read from local-files. However, this only works in a single-node configuration.
if (url != null && url.trim().startsWith("hdfs")) {
@@ -226,6 +232,25 @@
return conf;
}
+ public static Configuration configureHDFSwrite(Map<String, String> configuration) throws HyracksDataException {
+ Configuration conf = new Configuration();
+ String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+ String blocksize = configuration.getOrDefault(ExternalDataConstants.HDFS_BLOCKSIZE, String.valueOf(DFS_BLOCK_SIZE_DEFAULT));
+ String replication = configuration.getOrDefault(ExternalDataConstants.HDFS_REPLICATION, String.valueOf(DFS_REPLICATION_DEFAULT));
+ if (url == null || !url.trim().startsWith("hdfs")) {
+ throw new HyracksDataException("Wrong URL for HDFS");
+ }
+ conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, url);
+ conf.set(DFS_BLOCK_SIZE_KEY, blocksize);
+ conf.set(DFS_REPLICATION_KEY, replication);
+ // TODO: Is this necessary?
+ disableHadoopFileSystemCache(conf, "hdfs");
+ conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+ return conf;
+ }
+
private static void configureParquet(Map<String, String> configuration, JobConf conf) {
//Parquet configurations
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, ParquetReadSupport.class.getName());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
new file mode 100644
index 0000000..0dc0f44
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class HDFSExternalFileWriter implements IExternalFileWriter {
+
+ private final IExternalPrinter printer;
+ private final FileSystem fs;
+ private FSDataOutputStream outputStream = null;
+
+ HDFSExternalFileWriter(IExternalPrinter printer, FileSystem fs) {
+ this.printer = printer;
+ this.fs = fs;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void validate(String directory) throws HyracksDataException {
+ // TODO: What should be validated over here?
+ }
+
+ @Override
+ public boolean newFile(String directory, String fileName) throws HyracksDataException {
+ Path path = new Path(directory, fileName);
+ try {
+ outputStream = fs.create(path, false);
+ printer.newStream(outputStream);
+ } catch (FileAlreadyExistsException e) {
+ return false;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return true;
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ try {
+ printer.print(value);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ if (outputStream != null) {
+ outputStream.abort();
+ }
+ printer.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ printer.close();
+ } catch (HyracksDataException e) {
+ throw e;
+ }
+ }
+}
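
For orientation, a minimal sketch of the lifecycle the writer above is expected to follow. This is a hypothetical illustration, not part of the change: `printerFactory`, `fs`, and `record` are placeholders, and since the constructor is package-private the snippet assumes same-package code.

    // Hypothetical usage sketch -- not part of this change.
    IExternalPrinter printer = printerFactory.createPrinter();            // printerFactory: assumed IExternalPrinterFactory
    IExternalFileWriter writer = new HDFSExternalFileWriter(printer, fs); // fs: an already-configured FileSystem
    writer.open();
    if (writer.newFile("/sink/dataset", "part-0.json")) { // creates the file with overwrite=false
        writer.write(record);                             // record: some IValueReference; delegated to printer.print(...)
        writer.close();                                   // closes the printer
    } else {
        writer.abort();                                   // file already existed; abort the stream (if any) and close the printer
    }
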
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..e6110c1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
+
+public class HDFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+ private static final long serialVersionUID = 1L;
+ private static final char SEPARATOR = '/';
+ public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+ return new HDFSExternalFileWriterFactory(configuration);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
+
+ private final Map<String, String> configuration;
+ private final String staticPath;
+ private final SourceLocation pathSourceLocation;
+ private transient FileSystem fs;
+
+ private HDFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ configuration = externalConfig.getConfiguration();
+ staticPath = externalConfig.getStaticPath();
+ pathSourceLocation = externalConfig.getPathSourceLocation();
+ }
+
+ private FileSystem createFileSystem() throws CompilationException {
+ try {
+ Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
+ return FileSystem.get(conf);
+ } catch (IOException ex) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+
+ private void buildFileSystem() throws HyracksDataException {
+ try {
+ synchronized (this) {
+ if (fs == null) {
+ fs = createFileSystem();
+ }
+ }
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+ throws HyracksDataException {
+ buildFileSystem();
+ IExternalPrinter printer = printerFactory.createPrinter();
+ return new HDFSExternalFileWriter(printer, fs);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+
+ @Override
+ public void validate() throws AlgebricksException {
+ try (FileSystem testFs = createFileSystem()) {
+ Path dirPath = new Path(staticPath);
+ if (testFs.exists(dirPath)) {
+ FileStatus fileStatus = testFs.getFileStatus(dirPath);
+ if (fileStatus.isFile()) {
+ // TODO: Should a new Error Code be created for this case?
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+ if (fileStatus.isDirectory()) {
+ FileStatus[] fileStatuses = testFs.listStatus(dirPath);
+ if (fileStatuses.length != 0) {
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+ }
+ }
+ checkDirectoryWritePermission(testFs);
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ExceptionUtils.getMessageOrToString(ex));
+ }
+ }
+
+ private void checkDirectoryWritePermission(FileSystem fs) throws AlgebricksException {
+ if (!Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION,
+ Boolean.TRUE.toString()))) {
+ return;
+ }
+ Path path = new Path(staticPath, "testFile");
+ try {
+ FSDataOutputStream outputStream = fs.create(path);
+ outputStream.write(0);
+ outputStream.close();
+ fs.delete(path, false);
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 23b9f93..319df12 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.writer.HDFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -59,6 +60,7 @@
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, GCSExternalFileWriterFactory.PROVIDER);
+ addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS, HDFSExternalFileWriterFactory.PROVIDER);
}
public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
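
For completeness, a minimal end-to-end sketch of how the new write configuration could be exercised directly against the classes added in this change. The cluster URL, output path, and payload are hypothetical placeholders; HDFSUtils.configureHDFSwrite and the HDFS_BLOCKSIZE / HDFS_REPLICATION keys come from the patch above, and the FileSystem calls are the standard Hadoop API.

    // Hypothetical smoke test -- not part of this change.
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.asterix.external.util.ExternalDataConstants;
    import org.apache.asterix.external.util.HDFSUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteSmokeTest {
        public static void main(String[] args) throws Exception {
            Map<String, String> writerConfig = new HashMap<>();
            writerConfig.put(ExternalDataConstants.KEY_HDFS_URL, "hdfs://namenode:9000"); // placeholder URL
            writerConfig.put(ExternalDataConstants.HDFS_BLOCKSIZE, String.valueOf(64L * 1024 * 1024));
            writerConfig.put(ExternalDataConstants.HDFS_REPLICATION, "2");

            // Builds a Hadoop Configuration with the filesystem URI, block size, and replication set.
            Configuration conf = HDFSUtils.configureHDFSwrite(writerConfig);
            try (FileSystem fs = FileSystem.get(conf)) {
                Path out = new Path("/tmp/asterix-write-smoke", "part-0.json");
                try (FSDataOutputStream stream = fs.create(out, false)) { // fail if the file already exists
                    stream.writeBytes("{\"id\": 1}\n");
                }
            }
        }
    }
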
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18767
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ifc4ed9fb17a7540cf444a2f8bef590756b909988
Gerrit-Change-Number: 18767
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange