From Savyasach Reddy <[email protected]>:
Savyasach Reddy has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18710 )
Change subject: Write to HDFS
......................................................................
Write to HDFS
Change-Id: I067d8cf9b55adf69d78e033983351d3b53af3fed
---
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
5 files changed, 309 insertions(+), 1 deletion(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/10/18710/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 6a4b336..3b7530a 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -157,6 +157,7 @@
public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE =
"AZUREDATALAKE";
public static final String KEY_ADAPTER_NAME_GCS = "GCS";
+ public static final String KEY_ADAPTER_NAME_HDFS = "hdfs";
/**
* HDFS class names
@@ -334,7 +335,7 @@
static {
WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE,
FORMAT_PARQUET);
WRITER_SUPPORTED_ADAPTERS =
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(),
KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
- KEY_ADAPTER_NAME_GCS.toLowerCase());
+ KEY_ADAPTER_NAME_GCS.toLowerCase(),
ALIAS_HDFS_ADAPTER.toLowerCase());
TEXTUAL_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
PARQUET_WRITER_SUPPORTED_COMPRESSION =
Set.of(KEY_COMPRESSION_GZIP, KEY_COMPRESSION_SNAPPY,
KEY_COMPRESSION_ZSTD);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 6bc013a..6b1b1c7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -20,6 +20,7 @@
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -197,6 +198,7 @@
String localShortCircuitSocketPath =
configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
String formatClassName =
HDFSUtils.getInputFormatClassName(configuration);
String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+ conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
//Allow hdfs adapter to read from local-files. However, this only
works in a single-node configuration.
if (url != null && url.trim().startsWith("hdfs")) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
new file mode 100644
index 0000000..8022a3d
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class HDFSExternalFileWriter implements IExternalFileWriter {
+
+ private final IExternalPrinter printer;
+
+ private FileSystem fs = null;
+ private FSDataOutputStream outputStream = null;
+ private final URI hdfsUri;
+
+ HDFSExternalFileWriter(IExternalPrinter printer, URI hdfsUri) {
+ this.printer = printer;
+ this.hdfsUri = hdfsUri;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", String.valueOf(hdfsUri));
+ conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+ HDFSUtils.disableHadoopFileSystemCache(conf, "hdfs");
+ try {
+ fs = FileSystem.get(hdfsUri, conf);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void validate(String directory) throws HyracksDataException {
+ // Path dirPath = new Path(directory);
+ // FileStatus[] fileStatuses = null;
+ // try {
+ // fileStatuses = fs.listStatus(dirPath);
+ // } catch (FileNotFoundException e) {
+ // return;
+ // } catch (IOException e) {
+ // throw HyracksDataException.create(e);
+ // }
+ //
+ // for (FileStatus fileStatus : fileStatuses) {
+ // if (fileStatus.isFile()) {
+ // throw new
RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, directory, directory);
+ // }
+ // }
+ }
+
+ @Override
+ public boolean newFile(String directory, String fileName) throws
HyracksDataException {
+ Path path = new Path(directory, fileName);
+ try {
+ outputStream = fs.create(path, false);
+ printer.newStream(outputStream);
+ } catch (FileAlreadyExistsException e) {
+ return false;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return true;
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ try {
+ printer.print(value);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ try {
+ if (outputStream != null) {
+ outputStream.abort();
+ outputStream.close();
+ outputStream = null;
+ }
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ printer.close();
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ printer.close();
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ }
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static void main(String[] args) {
+ String hdfsUri =
"hdfs://ec2-18-118-211-217.us-east-2.compute.amazonaws.com:9000";
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", hdfsUri);
+ HDFSUtils.disableHadoopFileSystemCache(conf, "hdfs");
+ conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+
+ FileSystem fs = null;
+ try {
+ // Get the FileSystem instance
+ fs = FileSystem.get(conf);
+
+ // Define a file path in S3 and write data to it
+ long startTime = System.nanoTime();
+ Path filePath = new Path("/user/ubuntu/input/file.txt");
+ try (FSDataOutputStream outputStream = fs.create(filePath)) {
+ String data = "Hello, S3!";
+ outputStream.writeUTF(data);
+ outputStream.close();
+ System.out.println("Data written to HDFS successfully.");
+ }
+ long endTime = System.nanoTime();
+ double elapsedTimeInSeconds = (endTime - startTime) /
1_000_000_000.0;
+
+ // Print the elapsed time
+ System.out.printf("Elapsed time: %.3f seconds%n",
elapsedTimeInSeconds);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ // Clean up and close the FileSystem
+ try {
+ if (fs != null) {
+ fs.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..e685607
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+
+public class HDFSExternalFileWriterFactory implements
IExternalFileWriterFactory {
+
+ private static final char SEPARATOR = '/';
+ public static final IExternalFileWriterFactoryProvider PROVIDER = new
IExternalFileWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory
create(ExternalFileWriterConfiguration configuration) {
+ return new HDFSExternalFileWriterFactory(configuration);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
+
+ private final Map<String, String> configuration;
+ private final String staticPath;
+ private final URI hdfsUri;
+
+ private HDFSExternalFileWriterFactory(ExternalFileWriterConfiguration
externalConfig) {
+ configuration = externalConfig.getConfiguration();
+ staticPath = externalConfig.getStaticPath();
+ hdfsUri =
URI.create(configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS));
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context,
IExternalPrinterFactory printerFactory)
+ throws HyracksDataException {
+ IExternalPrinter printer = printerFactory.createPrinter();
+ return new HDFSExternalFileWriter(printer, hdfsUri);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+
+ @Override
+ public void validate() throws AlgebricksException {
+ Path dirPath = new Path(staticPath);
+ FileStatus[] fileStatuses = null;
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.defaultFS", String.valueOf(hdfsUri));
+ HDFSUtils.disableHadoopFileSystemCache(conf, "defaultFS");
+ FileSystem fs = FileSystem.get(hdfsUri, conf);
+ fileStatuses = fs.listStatus(dirPath);
+ } catch (FileNotFoundException e) {
+ return;
+ } catch (IOException e) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
ExceptionUtils.getMessageOrToString(e));
+ }
+
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isFile()) {
+ try {
+ throw new
RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, staticPath);
+ } catch (Exception e) {
+ throw
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
+ ExceptionUtils.getMessageOrToString(e));
+ }
+ }
+ }
+ }
+}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index c931a93..c5d4cda 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.writer.HDFSExternalFileWriterFactory;
import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
import
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -57,6 +58,7 @@
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS,
LocalFSExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3,
S3ExternalFileWriterFactory.PROVIDER);
addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS,
GCSExternalFileWriterFactory.PROVIDER);
+ addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS,
HDFSExternalFileWriterFactory.PROVIDER);
}
public static IExternalFileWriterFactory
createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18710
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I067d8cf9b55adf69d78e033983351d3b53af3fed
Gerrit-Change-Number: 18710
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange