Repository: metamodel Updated Branches: refs/heads/master 75abac663 -> 2a4b85410
METAMODEL-219: Fixed Fixes #78 Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/2a4b8541 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/2a4b8541 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/2a4b8541 Branch: refs/heads/master Commit: 2a4b85410b39ef7c1bcc958be2d3bbbc294a9c47 Parents: 75abac6 Author: Kasper Sørensen <i.am.kasper.soren...@gmail.com> Authored: Sat Dec 12 22:48:32 2015 +0100 Committer: Kasper Sørensen <i.am.kasper.soren...@gmail.com> Committed: Sat Dec 12 22:48:32 2015 +0100 ---------------------------------------------------------------------- CHANGES.md | 1 + .../org/apache/metamodel/util/FileHelper.java | 36 +-- .../util/HdfsDirectoryInputStream.java | 74 ++++++ .../metamodel/util/HdfsFileInputStream.java | 88 +++++++ .../metamodel/util/HdfsFileOutputStream.java | 68 +++++ .../org/apache/metamodel/util/HdfsResource.java | 263 ++++++++----------- 6 files changed, 338 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/CHANGES.md ---------------------------------------------------------------------- diff --git a/CHANGES.md b/CHANGES.md index 13629d0..2474450 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ * [METAMODEL-215] - Improved the capability of NumberComparator to support Integer, Long, Double, BigInteger and other built-in Number classes. * [METAMODEL-218] - Fixed conversion of STRING and NUMBER types to database-specific types in JDBC module. * [METAMODEL-205] - Added validation of Excel sheet name before attempting to create table (sheet). + * [METAMODEL-219] - Made HdfsResource capable of incorporating Hadoop configuration files core-site.xml and hdfs-site.xml ### Apache MetaModel 4.4.1 http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/core/src/main/java/org/apache/metamodel/util/FileHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/util/FileHelper.java b/core/src/main/java/org/apache/metamodel/util/FileHelper.java index ae02373..7465fd0 100644 --- a/core/src/main/java/org/apache/metamodel/util/FileHelper.java +++ b/core/src/main/java/org/apache/metamodel/util/FileHelper.java @@ -23,7 +23,6 @@ import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; -import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -39,9 +38,6 @@ import java.io.Reader; import java.io.Writer; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.Statement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,40 +259,16 @@ public final class FileHelper { } } - if (obj instanceof Closeable) { + if (obj instanceof AutoCloseable) { try { - ((Closeable) obj).close(); - } catch (IOException e) { - if (debugEnabled) { - logger.debug("Closing Closeable failed", e); - } - } - } else if (obj instanceof Connection) { - try { - ((Connection) obj).close(); - } catch (Exception e) { - if (debugEnabled) { - logger.debug("Closing Connection failed", e); - } - } - } else if (obj instanceof Statement) { - try { - ((Statement) obj).close(); - } catch (Exception e) { - if (debugEnabled) { - logger.debug("Closing Statement failed", e); - } - } - } else if (obj instanceof ResultSet) { - try { - ((ResultSet) obj).close(); + ((AutoCloseable) obj).close(); } catch (Exception e) { if (debugEnabled) { - logger.debug("Closing ResultSet failed", e); + logger.debug("Closing AutoCloseable failed", e); } } } else { - logger.info("obj was neither Closeable, Connection, Statement or ResultSet."); + logger.info("obj was not AutoCloseable, trying to find close() method via reflection."); try { Method method = obj.getClass().getMethod("close", new Class[0]); http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/hadoop/src/main/java/org/apache/metamodel/util/HdfsDirectoryInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsDirectoryInputStream.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsDirectoryInputStream.java new file mode 100644 index 0000000..f02f2ff --- /dev/null +++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsDirectoryInputStream.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.util; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * An {@link InputStream} that represents all the data found in a directory on + * HDFS. This {@link InputStream} is used by {@link HdfsResource#read()} when + * pointed to a directory. + */ +class HdfsDirectoryInputStream extends AbstractDirectoryInputStream<FileStatus> { + + private final Path _hadoopPath; + private final FileSystem _fs; + + public HdfsDirectoryInputStream(final Path hadoopPath, final FileSystem fs) { + _hadoopPath = hadoopPath; + _fs = fs; + FileStatus[] fileStatuses; + try { + fileStatuses = _fs.listStatus(_hadoopPath, new PathFilter() { + @Override + public boolean accept(final Path path) { + try { + return _fs.isFile(path); + } catch (IOException e) { + return false; + } + } + }); + // Natural ordering is the URL + Arrays.sort(fileStatuses); + } catch (IOException e) { + fileStatuses = new FileStatus[0]; + } + _files = fileStatuses; + } + + @Override + public InputStream openStream(final int index) throws IOException { + final Path nextPath = _files[index].getPath(); + return _fs.open(nextPath); + } + + @Override + public void close() throws IOException { + super.close(); + FileHelper.safeClose(_fs); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileInputStream.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileInputStream.java new file mode 100644 index 0000000..f793257 --- /dev/null +++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileInputStream.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.util; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.fs.FileSystem; + +/** + * A managed {@link InputStream} for a file on HDFS. + * + * The "purpose in life" for this class is to ensure that the {@link FileSystem} + * is closed when the stream is closed. + */ +class HdfsFileInputStream extends InputStream { + + private final InputStream _in; + private final FileSystem _fs; + + public HdfsFileInputStream(final InputStream in, final FileSystem fs) { + _in = in; + _fs = fs; + } + + @Override + public int read() throws IOException { + return _in.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return _in.read(b, off, len); + } + + @Override + public int read(byte[] b) throws IOException { + return _in.read(b); + } + + @Override + public boolean markSupported() { + return _in.markSupported(); + } + + @Override + public synchronized void mark(int readLimit) { + _in.mark(readLimit); + } + + @Override + public int available() throws IOException { + return _in.available(); + } + + @Override + public synchronized void reset() throws IOException { + _in.reset(); + } + + @Override + public long skip(long n) throws IOException { + return _in.skip(n); + } + + @Override + public void close() throws IOException { + super.close(); + // need to close 'fs' when input stream is closed + FileHelper.safeClose(_fs); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileOutputStream.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileOutputStream.java new file mode 100644 index 0000000..b7abaf6 --- /dev/null +++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsFileOutputStream.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.util; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.fs.FileSystem; + +/** + * A managed {@link OutputStream} for a file on HDFS. + * + * The "purpose in life" for this class is to ensure that the {@link FileSystem} + * is closed when the stream is closed. + */ +class HdfsFileOutputStream extends OutputStream { + + private final OutputStream _out; + private final FileSystem _fs; + + public HdfsFileOutputStream(final OutputStream out, final FileSystem fs) { + _out = out; + _fs = fs; + } + + @Override + public void write(int b) throws IOException { + _out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + _out.write(b, off, len); + } + + @Override + public void write(byte[] b) throws IOException { + _out.write(b); + } + + @Override + public void flush() throws IOException { + _out.flush(); + } + + @Override + public void close() throws IOException { + super.close(); + // need to close 'fs' when output stream is closed + FileHelper.safeClose(_fs); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/2a4b8541/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java index b481f95..9523b23 100644 --- a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java +++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java @@ -18,168 +18,36 @@ */ package org.apache.metamodel.util; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.util.Arrays; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.metamodel.MetaModelException; +import com.google.common.base.Strings; + /** * A {@link Resource} implementation that connects to Apache Hadoop's HDFS * distributed file system. */ public class HdfsResource extends AbstractResource implements Serializable { - private static class HdfsFileInputStream extends InputStream { - - private final InputStream _in; - private final FileSystem _fs; - - public HdfsFileInputStream(final InputStream in, final FileSystem fs) { - _in = in; - _fs = fs; - } - - @Override - public int read() throws IOException { - return _in.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return _in.read(b, off, len); - } - - @Override - public int read(byte[] b) throws IOException { - return _in.read(b); - } - - @Override - public boolean markSupported() { - return _in.markSupported(); - } - - @Override - public synchronized void mark(int readLimit) { - _in.mark(readLimit); - } - - @Override - public int available() throws IOException { - return _in.available(); - } - - @Override - public synchronized void reset() throws IOException { - _in.reset(); - } - - @Override - public long skip(long n) throws IOException { - return _in.skip(n); - } - - @Override - public void close() throws IOException { - super.close(); - // need to close 'fs' when input stream is closed - FileHelper.safeClose(_fs); - } - } - - private static class HdfsFileOutputStream extends OutputStream { - - private final OutputStream _out; - private final FileSystem _fs; - - public HdfsFileOutputStream(final OutputStream out, final FileSystem fs) { - _out = out; - _fs = fs; - } - - @Override - public void write(int b) throws IOException { - _out.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - _out.write(b, off, len); - } - - @Override - public void write(byte[] b) throws IOException { - _out.write(b); - } - - @Override - public void flush() throws IOException { - _out.flush(); - } - - @Override - public void close() throws IOException { - super.close(); - // need to close 'fs' when output stream is closed - FileHelper.safeClose(_fs); - } - } - - private static class HdfsDirectoryInputStream extends AbstractDirectoryInputStream<FileStatus> { - private final Path _hadoopPath; - private final FileSystem _fs; - - public HdfsDirectoryInputStream(final Path hadoopPath, final FileSystem fs) { - _hadoopPath = hadoopPath; - _fs = fs; - FileStatus[] fileStatuses; - try { - fileStatuses = _fs.listStatus(_hadoopPath, new PathFilter() { - @Override - public boolean accept(final Path path) { - try { - return _fs.isFile(path); - } catch (IOException e) { - return false; - } - } - }); - // Natural ordering is the URL - Arrays.sort(fileStatuses); - } catch (IOException e) { - fileStatuses = new FileStatus[0]; - } - _files = fileStatuses; - } - - @Override - public InputStream openStream(final int index) throws IOException { - final Path nextPath = _files[index].getPath(); - return _fs.open(nextPath); - } - - @Override - public void close() throws IOException { - super.close(); - FileHelper.safeClose(_fs); - } - } - private static final long serialVersionUID = 1L; + public static final String SYSTEM_PROPERTY_HADOOP_CONF_DIR_ENABLED = "metamodel.hadoop.use_hadoop_conf_dir"; + private static final Pattern URL_PATTERN = Pattern.compile("hdfs://(.+):([0-9]+)/(.*)"); + private final String _hadoopConfDir; private final String _hostname; private final int _port; private final String _filepath; @@ -192,6 +60,19 @@ public class HdfsResource extends AbstractResource implements Serializable { * a URL of the form: hdfs://hostname:port/path/to/file */ public HdfsResource(String url) { + this(url, null); + } + + /** + * Creates a {@link HdfsResource} + * + * @param url + * a URL of the form: hdfs://hostname:port/path/to/file + * @param hadoopConfDir + * the path of a directory containing the Hadoop and HDFS + * configuration file(s). + */ + public HdfsResource(String url, String hadoopConfDir) { if (url == null) { throw new IllegalArgumentException("Url cannot be null"); } @@ -203,6 +84,7 @@ public class HdfsResource extends AbstractResource implements Serializable { _hostname = matcher.group(1); _port = Integer.parseInt(matcher.group(2)); _filepath = '/' + matcher.group(3); + _hadoopConfDir = hadoopConfDir; } /** @@ -216,9 +98,27 @@ public class HdfsResource extends AbstractResource implements Serializable { * the path on HDFS to the file, starting with slash ('/') */ public HdfsResource(String hostname, int port, String filepath) { + this(hostname, port, filepath, null); + } + + /** + * Creates a {@link HdfsResource} + * + * @param hostname + * the HDFS (namenode) hostname + * @param port + * the HDFS (namenode) port number + * @param filepath + * the path on HDFS to the file, starting with slash ('/') + * @param hadoopConfDir + * the path of a directory containing the Hadoop and HDFS + * configuration file(s). + */ + public HdfsResource(String hostname, int port, String filepath, String hadoopConfDir) { _hostname = hostname; _port = port; _filepath = filepath; + _hadoopConfDir = hadoopConfDir; } public String getFilepath() { @@ -233,6 +133,10 @@ public class HdfsResource extends AbstractResource implements Serializable { return _port; } + public String getHadoopConfDir() { + return _hadoopConfDir; + } + @Override public String getName() { final int lastSlash = _filepath.lastIndexOf('/'); @@ -272,7 +176,7 @@ public class HdfsResource extends AbstractResource implements Serializable { if (fs.isFile(getHadoopPath())) { return fs.getFileStatus(getHadoopPath()).getLen(); } else { - return fs.getContentSummary(getHadoopPath()).getLength(); + return fs.getContentSummary(getHadoopPath()).getLength(); } } catch (Exception e) { throw wrapException(e); @@ -349,9 +253,59 @@ public class HdfsResource extends AbstractResource implements Serializable { public Configuration getHadoopConfiguration() { final Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://" + _hostname + ":" + _port); + + final File hadoopConfigurationDirectory = getHadoopConfigurationDirectoryToUse(); + if (hadoopConfigurationDirectory != null) { + addResourceIfExists(conf, hadoopConfigurationDirectory, "core-site.xml"); + addResourceIfExists(conf, hadoopConfigurationDirectory, "hdfs-site.xml"); + } + return conf; } + private void addResourceIfExists(Configuration conf, File hadoopConfigurationDirectory, String filename) { + final File file = new File(hadoopConfigurationDirectory, filename); + if (file.exists()) { + final InputStream inputStream = FileHelper.getInputStream(file); + conf.addResource(inputStream, filename); + } + } + + private File getHadoopConfigurationDirectoryToUse() { + File candidate = getDirectoryIfExists(null, _hadoopConfDir); + if ("true".equals(System.getProperty(SYSTEM_PROPERTY_HADOOP_CONF_DIR_ENABLED))) { + candidate = getDirectoryIfExists(candidate, System.getProperty("YARN_CONF_DIR")); + candidate = getDirectoryIfExists(candidate, System.getProperty("HADOOP_CONF_DIR")); + candidate = getDirectoryIfExists(candidate, System.getenv("YARN_CONF_DIR")); + candidate = getDirectoryIfExists(candidate, System.getenv("HADOOP_CONF_DIR")); + } + return candidate; + } + + /** + * Gets a candidate directory based on a file path, if it exists, and if it + * another candidate hasn't already been resolved. + * + * @param existingCandidate + * an existing candidate directory. If this is non-null, it will + * be returned immediately. + * @param path + * the path of a directory + * @return a candidate directory, or null if none was resolved. + */ + private File getDirectoryIfExists(File existingCandidate, String path) { + if (existingCandidate != null) { + return existingCandidate; + } + if (!Strings.isNullOrEmpty(path)) { + final File directory = new File(path); + if (directory.exists() && directory.isDirectory()) { + return directory; + } + } + return null; + } + public FileSystem getHadoopFileSystem() { try { return FileSystem.newInstance(getHadoopConfiguration()); @@ -369,30 +323,19 @@ public class HdfsResource extends AbstractResource implements Serializable { @Override public int hashCode() { - return Arrays.hashCode(new Object[] { _filepath, _hostname, _port }); + return Objects.hash(_filepath, _hostname, _port, _hadoopConfDir); } @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HdfsResource other = (HdfsResource) obj; - if (_filepath == null) { - if (other._filepath != null) - return false; - } else if (!_filepath.equals(other._filepath)) - return false; - if (_hostname == null) { - if (other._hostname != null) - return false; - } else if (!_hostname.equals(other._hostname)) - return false; - if (_port != other._port) - return false; - return true; + } + if (obj instanceof HdfsResource) { + final HdfsResource other = (HdfsResource) obj; + return Objects.equals(_filepath, other._filepath) && Objects.equals(_hostname, other._hostname) + && Objects.equals(_port, other._port) && Objects.equals(_hadoopConfDir, other._hadoopConfDir); + } + return false; } }