From Savyasach Reddy <[email protected]>:

Savyasach Reddy has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18710 )


Change subject: Write to HDFS
......................................................................

Write to HDFS

Change-Id: I067d8cf9b55adf69d78e033983351d3b53af3fed
---
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
5 files changed, 309 insertions(+), 1 deletion(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/10/18710/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 6a4b336..3b7530a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -157,6 +157,7 @@
     public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
     public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = 
"AZUREDATALAKE";
     public static final String KEY_ADAPTER_NAME_GCS = "GCS";
+    public static final String KEY_ADAPTER_NAME_HDFS = "hdfs";

     /**
      * HDFS class names
@@ -334,7 +335,7 @@
     static {
         WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, 
FORMAT_PARQUET);
         WRITER_SUPPORTED_ADAPTERS = 
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), 
KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
-                KEY_ADAPTER_NAME_GCS.toLowerCase());
+                KEY_ADAPTER_NAME_GCS.toLowerCase(), 
ALIAS_HDFS_ADAPTER.toLowerCase());
         TEXTUAL_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
         PARQUET_WRITER_SUPPORTED_COMPRESSION =
                 Set.of(KEY_COMPRESSION_GZIP, KEY_COMPRESSION_SNAPPY, 
KEY_COMPRESSION_ZSTD);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 6bc013a..6b1b1c7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -20,6 +20,7 @@

 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -197,6 +198,7 @@
         String localShortCircuitSocketPath = 
configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
         String formatClassName = 
HDFSUtils.getInputFormatClassName(configuration);
         String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+        conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");

         //Allow hdfs adapter to read from local-files. However, this only 
works in a single-node configuration.
         if (url != null && url.trim().startsWith("hdfs")) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
new file mode 100644
index 0000000..8022a3d
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;

+public class HDFSExternalFileWriter implements IExternalFileWriter {
+
+    private final IExternalPrinter printer;
+
+    private FileSystem fs = null;
+    private FSDataOutputStream outputStream = null;
+    private final URI hdfsUri;
+
+    HDFSExternalFileWriter(IExternalPrinter printer, URI hdfsUri) {
+        this.printer = printer;
+        this.hdfsUri = hdfsUri;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.open();
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", String.valueOf(hdfsUri));
+        conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+        HDFSUtils.disableHadoopFileSystemCache(conf, "hdfs");
+        try {
+            fs = FileSystem.get(hdfsUri, conf);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void validate(String directory) throws HyracksDataException {
+        //        Path dirPath = new Path(directory);
+        //        FileStatus[] fileStatuses = null;
+        //        try {
+        //            fileStatuses = fs.listStatus(dirPath);
+        //        } catch (FileNotFoundException e) {
+        //            return;
+        //        } catch (IOException e) {
+        //            throw HyracksDataException.create(e);
+        //        }
+        //
+        //        for (FileStatus fileStatus : fileStatuses) {
+        //            if (fileStatus.isFile()) {
+        //                throw new 
RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, directory, directory);
+        //            }
+        //        }
+    }
+
+    @Override
+    public boolean newFile(String directory, String fileName) throws 
HyracksDataException {
+        Path path = new Path(directory, fileName);
+        try {
+            outputStream = fs.create(path, false);
+            printer.newStream(outputStream);
+        } catch (FileAlreadyExistsException e) {
+            return false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return true;
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        try {
+            printer.print(value);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        try {
+            if (outputStream != null) {
+                outputStream.abort();
+                outputStream.close();
+                outputStream = null;
+            }
+            if (fs != null) {
+                fs.close();
+                fs = null;
+            }
+            printer.close();
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            printer.close();
+            if (outputStream != null) {
+                outputStream.close();
+                outputStream = null;
+            }
+            if (fs != null) {
+                fs.close();
+                fs = null;
+            }
+        } catch (HyracksDataException e) {
+            throw e;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static void main(String[] args) {
+        String hdfsUri = 
"hdfs://ec2-18-118-211-217.us-east-2.compute.amazonaws.com:9000";
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", hdfsUri);
+        HDFSUtils.disableHadoopFileSystemCache(conf, "hdfs");
+        conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+
+        FileSystem fs = null;
+        try {
+            // Get the FileSystem instance
+            fs = FileSystem.get(conf);
+
+            // Define a file path in S3 and write data to it
+            long startTime = System.nanoTime();
+            Path filePath = new Path("/user/ubuntu/input/file.txt");
+            try (FSDataOutputStream outputStream = fs.create(filePath)) {
+                String data = "Hello, S3!";
+                outputStream.writeUTF(data);
+                outputStream.close();
+                System.out.println("Data written to HDFS successfully.");
+            }
+            long endTime = System.nanoTime();
+            double elapsedTimeInSeconds = (endTime - startTime) / 
1_000_000_000.0;
+
+            // Print the elapsed time
+            System.out.printf("Elapsed time: %.3f seconds%n", 
elapsedTimeInSeconds);
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            // Clean up and close the FileSystem
+            try {
+                if (fs != null) {
+                    fs.close();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..e685607
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+
+public class HDFSExternalFileWriterFactory implements 
IExternalFileWriterFactory {
+
+    private static final char SEPARATOR = '/';
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new 
IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory 
create(ExternalFileWriterConfiguration configuration) {
+            return new HDFSExternalFileWriterFactory(configuration);
+        }
+
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
+
+    private final Map<String, String> configuration;
+    private final String staticPath;
+    private final URI hdfsUri;
+
+    private HDFSExternalFileWriterFactory(ExternalFileWriterConfiguration 
externalConfig) {
+        configuration = externalConfig.getConfiguration();
+        staticPath = externalConfig.getStaticPath();
+        hdfsUri = 
URI.create(configuration.get(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS));
+    }
+
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, 
IExternalPrinterFactory printerFactory)
+            throws HyracksDataException {
+        IExternalPrinter printer = printerFactory.createPrinter();
+        return new HDFSExternalFileWriter(printer, hdfsUri);
+    }
+
+    @Override
+    public char getSeparator() {
+        return SEPARATOR;
+    }
+
+    @Override
+    public void validate() throws AlgebricksException {
+        Path dirPath = new Path(staticPath);
+        FileStatus[] fileStatuses = null;
+        try {
+            Configuration conf = new Configuration();
+            conf.set("fs.defaultFS", String.valueOf(hdfsUri));
+            HDFSUtils.disableHadoopFileSystemCache(conf, "defaultFS");
+            FileSystem fs = FileSystem.get(hdfsUri, conf);
+            fileStatuses = fs.listStatus(dirPath);
+        } catch (FileNotFoundException e) {
+            return;
+        } catch (IOException e) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, 
ExceptionUtils.getMessageOrToString(e));
+        }
+
+        for (FileStatus fileStatus : fileStatuses) {
+            if (fileStatus.isFile()) {
+                try {
+                    throw new 
RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, staticPath);
+                } catch (Exception e) {
+                    throw 
CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR,
+                            ExceptionUtils.getMessageOrToString(e));
+                }
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index c931a93..c5d4cda 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.writer.HDFSExternalFileWriterFactory;
 import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
 import 
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -57,6 +58,7 @@
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, 
LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, 
S3ExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, 
GCSExternalFileWriterFactory.PROVIDER);
+        addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS, 
HDFSExternalFileWriterFactory.PROVIDER);
     }

     public static IExternalFileWriterFactory 
createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18710
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I067d8cf9b55adf69d78e033983351d3b53af3fed
Gerrit-Change-Number: 18710
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange

Reply via email to