>From Savyasach Reddy <[email protected]>:

Savyasach Reddy has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18767 )


Change subject: Write to HDFS
......................................................................

Write to HDFS

Change-Id: Ifc4ed9fb17a7540cf444a2f8bef590756b909988
---
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
5 files changed, 281 insertions(+), 1 deletion(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/67/18767/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 02c2070..4999208 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -157,6 +157,7 @@
     public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
     public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = 
"AZUREDATALAKE";
     public static final String KEY_ADAPTER_NAME_GCS = "GCS";
+    public static final String KEY_ADAPTER_NAME_HDFS = "HDFS";

     /**
      * HDFS class names
@@ -174,6 +175,10 @@
     public static final String INPUT_FORMAT_TEXT = "text-input-format";
     public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
     public static final String INPUT_FORMAT_PARQUET = "parquet-input-format";
+
+    // Keys of the optional HDFS writer properties. HDFSUtils.configureHDFSwrite looks them up in the writer
+    // configuration and falls back to the HDFS client defaults when they are absent.
+    public static final String HDFS_BLOCKSIZE = "blocksize";
+    public static final String HDFS_REPLICATION = "replication";
+
     /**
      * Builtin streams
      */
@@ -341,7 +346,7 @@
     static {
         WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, 
FORMAT_PARQUET);
         WRITER_SUPPORTED_ADAPTERS = 
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), 
KEY_ADAPTER_NAME_AWS_S3.toLowerCase(),
-                KEY_ADAPTER_NAME_GCS.toLowerCase());
+                KEY_ADAPTER_NAME_GCS.toLowerCase(), 
KEY_ADAPTER_NAME_HDFS.toLowerCase());
         TEXTUAL_WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
         PARQUET_WRITER_SUPPORTED_COMPRESSION =
                 Set.of(KEY_COMPRESSION_GZIP, KEY_COMPRESSION_SNAPPY, 
KEY_COMPRESSION_ZSTD);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 6bc013a..dde8e5d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -20,6 +20,11 @@

 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -197,6 +202,7 @@
         String localShortCircuitSocketPath = 
configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
         String formatClassName = 
HDFSUtils.getInputFormatClassName(configuration);
         String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+        conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");

         //Allow hdfs adapter to read from local-files. However, this only 
works in a single-node configuration.
         if (url != null && url.trim().startsWith("hdfs")) {
@@ -226,6 +232,25 @@
         return conf;
     }

+    /**
+     * Builds the Hadoop {@link Configuration} used for writing to HDFS.
+     * <p>
+     * The block size and the replication factor are read from {@code configuration} when present; otherwise the
+     * HDFS client defaults are used.
+     *
+     * @param configuration
+     *            the writer properties; must hold a URL starting with {@code hdfs} under
+     *            {@link ExternalDataConstants#KEY_HDFS_URL}
+     * @return a new configuration that targets the provided HDFS URL
+     * @throws HyracksDataException
+     *             if the URL is missing or does not start with {@code hdfs}
+     */
+    public static Configuration configureHDFSwrite(Map<String, String> configuration) throws HyracksDataException {
+        // Validate first, and keep the trimmed value so that the URL we store is the one we validated
+        String url = configuration.get(ExternalDataConstants.KEY_HDFS_URL);
+        String trimmedUrl = url == null ? null : url.trim();
+        if (trimmedUrl == null || !trimmedUrl.startsWith("hdfs")) {
+            throw new HyracksDataException("Wrong URL for HDFS");
+        }
+        String blocksize = configuration.getOrDefault(ExternalDataConstants.HDFS_BLOCKSIZE,
+                String.valueOf(DFS_BLOCK_SIZE_DEFAULT));
+        String replication = configuration.getOrDefault(ExternalDataConstants.HDFS_REPLICATION,
+                String.valueOf(DFS_REPLICATION_DEFAULT));
+
+        Configuration conf = new Configuration();
+        conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, trimmedUrl);
+        // NOTE(review): the values are passed through as-is; a non-numeric block size or replication factor is
+        // not rejected here
+        conf.set(DFS_BLOCK_SIZE_KEY, blocksize);
+        conf.set(DFS_REPLICATION_KEY, replication);
+        // TODO: Is this necessary?
+        disableHadoopFileSystemCache(conf, "hdfs");
+        // Same client setting as the one applied on the read path
+        conf.set(DFS_CLIENT_USE_DN_HOSTNAME, "true");
+        return conf;
+    }
+
     private static void configureParquet(Map<String, String> configuration, 
JobConf conf) {
         //Parquet configurations
         conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
ParquetReadSupport.class.getName());
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
new file mode 100644
index 0000000..0dc0f44
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalFileWriter} that creates its files through a Hadoop {@link FileSystem} and delegates the
+ * actual printing of values to an {@link IExternalPrinter}.
+ */
+public class HDFSExternalFileWriter implements IExternalFileWriter {
+
+    private final IExternalPrinter printer;
+    private final FileSystem fs;
+    // Stream of the file most recently created by newFile(); null until the first file is created
+    private FSDataOutputStream outputStream;
+
+    /**
+     * @param printer
+     *            the printer that serializes the written values
+     * @param fs
+     *            the file system on which the files are created
+     */
+    HDFSExternalFileWriter(IExternalPrinter printer, FileSystem fs) {
+        this.printer = printer;
+        this.fs = fs;
+    }
+
+    /**
+     * Opens the underlying printer.
+     */
+    @Override
+    public void open() throws HyracksDataException {
+        printer.open();
+    }
+
+    /**
+     * Currently a no-op: nothing is validated for the given directory.
+     */
+    @Override
+    public void validate(String directory) throws HyracksDataException {
+        // TODO: What should be validated over here?
+    }
+
+    /**
+     * Creates {@code fileName} under {@code directory} and redirects the printer to it.
+     *
+     * @return {@code true} if the file was created; {@code false} if a file with the same path already exists
+     * @throws HyracksDataException
+     *             if the file could not be created for any other reason
+     */
+    @Override
+    public boolean newFile(String directory, String fileName) throws HyracksDataException {
+        Path path = new Path(directory, fileName);
+        try {
+            // overwrite = false: an existing file is reported to the caller instead of being replaced
+            outputStream = fs.create(path, false);
+            printer.newStream(outputStream);
+        } catch (FileAlreadyExistsException e) {
+            return false;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return true;
+    }
+
+    /**
+     * Prints {@code value} to the current file.
+     */
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        try {
+            printer.print(value);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Aborts the current stream (if any) and closes the printer.
+     */
+    @Override
+    public void abort() throws HyracksDataException {
+        try {
+            if (outputStream != null) {
+                // NOTE(review): FSDataOutputStream#abort is documented to raise UnsupportedOperationException
+                // when the wrapped stream is not Abortable -- confirm that the HDFS stream supports it
+                outputStream.abort();
+            }
+        } finally {
+            // Always release the printer, even if aborting the stream fails
+            printer.close();
+        }
+    }
+
+    /**
+     * Closes the printer.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        // NOTE(review): presumably the printer closes the stream handed to it via newStream() -- confirm
+        printer.close();
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..e6110c1
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
+
+/**
+ * Factory of {@link HDFSExternalFileWriter}s. It also validates, at compile time, that the target path is
+ * usable as a write destination.
+ */
+public class HDFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private static final char SEPARATOR = '/';
+    public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+        @Override
+        public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+            return new HDFSExternalFileWriterFactory(configuration);
+        }
+
+        @Override
+        public char getSeparator() {
+            return SEPARATOR;
+        }
+    };
+
+    private final Map<String, String> configuration;
+    private final String staticPath;
+    private final SourceLocation pathSourceLocation;
+    // Not serialized; created lazily by buildFileSystem() and shared by all writers of this factory instance.
+    // NOTE(review): this file system is never closed -- confirm whether a lifecycle hook is needed
+    private transient FileSystem fs;
+
+    private HDFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+        configuration = externalConfig.getConfiguration();
+        staticPath = externalConfig.getStaticPath();
+        pathSourceLocation = externalConfig.getPathSourceLocation();
+    }
+
+    /**
+     * Creates a new {@link FileSystem} from the writer configuration.
+     *
+     * @throws CompilationException
+     *             if the configuration is rejected or the file system cannot be obtained
+     */
+    private FileSystem createFileSystem() throws CompilationException {
+        try {
+            Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
+            return FileSystem.get(conf);
+        } catch (IOException ex) {
+            // NOTE(review): validate() reports EXTERNAL_SINK_ERROR for the same kind of failure; consider
+            // using the sink error code here as well
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Initializes {@link #fs} once; the check and the assignment are guarded by this factory's monitor.
+     */
+    private void buildFileSystem() throws HyracksDataException {
+        try {
+            synchronized (this) {
+                if (fs == null) {
+                    fs = createFileSystem();
+                }
+            }
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Creates a writer backed by this factory's shared {@link FileSystem}.
+     */
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+            throws HyracksDataException {
+        buildFileSystem();
+        IExternalPrinter printer = printerFactory.createPrinter();
+        return new HDFSExternalFileWriter(printer, fs);
+    }
+
+    @Override
+    public char getSeparator() {
+        return SEPARATOR;
+    }
+
+    /**
+     * Verifies that the static path is either absent or an empty directory and, unless disabled through
+     * {@link ExternalDataConstants#KEY_VALIDATE_WRITE_PERMISSION}, that a file can be written under it.
+     * A dedicated {@link FileSystem} is used for the checks and closed afterwards.
+     *
+     * @throws AlgebricksException
+     *             if the path is a file, a non-empty directory, not writable, or the file system fails
+     */
+    @Override
+    public void validate() throws AlgebricksException {
+        try (FileSystem testFs = createFileSystem()) {
+            Path dirPath = new Path(staticPath);
+            if (testFs.exists(dirPath)) {
+                FileStatus fileStatus = testFs.getFileStatus(dirPath);
+                if (fileStatus.isFile()) {
+                    // TODO: Should a new Error Code be created for this case?
+                    throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+                }
+                if (fileStatus.isDirectory()) {
+                    FileStatus[] fileStatuses = testFs.listStatus(dirPath);
+                    if (fileStatuses.length != 0) {
+                        throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation,
+                                staticPath);
+                    }
+                }
+            }
+            checkDirectoryWritePermission(testFs);
+        } catch (IOException ex) {
+            // Keep the cause so that the underlying failure is not lost
+            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex,
+                    ExceptionUtils.getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Writes and then deletes a one-byte probe file under the static path, unless the check is disabled in
+     * the configuration (it is enabled by default).
+     *
+     * @param targetFs
+     *            the file system on which the probe file is created
+     * @throws AlgebricksException
+     *             if the probe file cannot be written or deleted
+     */
+    private void checkDirectoryWritePermission(FileSystem targetFs) throws AlgebricksException {
+        if (!Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.KEY_VALIDATE_WRITE_PERMISSION,
+                Boolean.TRUE.toString()))) {
+            return;
+        }
+        Path path = new Path(staticPath, "testFile");
+        try {
+            // try-with-resources: the stream is closed even if the write fails
+            try (FSDataOutputStream outputStream = targetFs.create(path)) {
+                outputStream.write(0);
+            }
+            targetFs.delete(path, false);
+        } catch (IOException ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SINK_ERROR, ex, getMessageOrToString(ex));
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 23b9f93..319df12 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.writer.HDFSExternalFileWriterFactory;
 import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
 import 
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -59,6 +60,7 @@
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, 
LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, 
S3ExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_GCS, 
GCSExternalFileWriterFactory.PROVIDER);
+        addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_HDFS, 
HDFSExternalFileWriterFactory.PROVIDER);
     }

     public static IExternalFileWriterFactory 
createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18767
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ifc4ed9fb17a7540cf444a2f8bef590756b909988
Gerrit-Change-Number: 18767
Gerrit-PatchSet: 1
Gerrit-Owner: Savyasach Reddy <[email protected]>
Gerrit-MessageType: newchange

Reply via email to