http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3native;

import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * A {@link FileSystem} for reading and writing files stored on
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
 * stores files on S3 in their native form so they can be read by other
 * S3 tools.
 *
 * A note about directories. S3 of course has no "native" support for them.
 * The idiom we choose then is: for any directory created by this class,
 * we use an empty object "#{dirpath}_$folder$" as a marker.
 * Further, to interoperate with other S3 tools, we also accept the following:
 * - an object "#{dirpath}/' denoting a directory marker
 * - if there exists any objects with the prefix "#{dirpath}/", then the
 *   directory is said to exist
 * - if both a file with the name of a directory and a marker for that
 *   directory exists, then the *file masks the directory*, and the directory
 *   is never returned.
 * </p>
 * @see org.apache.hadoop.fs.s3.S3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeS3FileSystem extends FileSystem {

  public static final Logger LOG =
      LoggerFactory.getLogger(NativeS3FileSystem.class);

  /** Suffix of the empty marker object used to represent a directory. */
  private static final String FOLDER_SUFFIX = "_$folder$";
  static final String PATH_DELIMITER = Path.SEPARATOR;
  /** S3 caps a single LIST response at 1000 keys; larger listings page. */
  private static final int S3_MAX_LISTING_LENGTH = 1000;

  /**
   * Input stream over a single S3 object. On transient read failures the
   * stream is reopened at the current position and the read retried once.
   */
  static class NativeS3FsInputStream extends FSInputStream {

    private NativeFileSystemStore store;
    private Statistics statistics;
    private InputStream in;
    private final String key;
    private long pos = 0;

    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
      Preconditions.checkNotNull(in, "Null input stream");
      this.store = store;
      this.statistics = statistics;
      this.in = in;
      this.key = key;
    }

    @Override
    public synchronized int read() throws IOException {
      // Match read(byte[], int, int): reading a closed stream is an EOF
      // error, not an NPE inside the retry path.
      if (in == null) {
        throw new EOFException("Cannot read closed stream");
      }
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}', attempting to reopen",
            key);
        LOG.debug("Exception caught while reading '{}'", key, e);
        try {
          // Reopen the object at the current position and retry once.
          seek(pos);
          result = in.read();
        } catch (EOFException eof) {
          LOG.debug("EOF on input stream read: {}", eof, eof);
          result = -1;
        }
      }
      if (result != -1) {
        pos++;
      }
      if (statistics != null && result != -1) {
        statistics.incrementBytesRead(1);
      }
      return result;
    }

    @Override
    public synchronized int read(byte[] b, int off, int len)
        throws IOException {
      if (in == null) {
        throw new EOFException("Cannot read closed stream");
      }
      int result = -1;
      try {
        result = in.read(b, off, len);
      } catch (EOFException eof) {
        throw eof;
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}',"
            + " attempting to reopen.", key);
        seek(pos);
        result = in.read(b, off, len);
      }
      if (result > 0) {
        pos += result;
      }
      if (statistics != null && result > 0) {
        statistics.incrementBytesRead(result);
      }
      return result;
    }

    @Override
    public synchronized void close() throws IOException {
      closeInnerStream();
    }

    /**
     * Close the inner stream if not null. Even if an exception
     * is raised during the close, the field is set to null.
     * @throws IOException if raised by the close() operation.
     */
    private void closeInnerStream() throws IOException {
      if (in != null) {
        try {
          in.close();
        } finally {
          in = null;
        }
      }
    }

    /**
     * Update inner stream with a new stream and position.
     * @param newStream new stream - must not be null
     * @param newpos new position
     * @throws IOException IO exception on a failure to close the existing
     * stream.
     */
    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
      Preconditions.checkNotNull(newStream, "Null newstream argument");
      closeInnerStream();
      in = newStream;
      this.pos = newpos;
    }

    @Override
    public synchronized void seek(long newpos) throws IOException {
      if (newpos < 0) {
        throw new EOFException(
            FSExceptionMessages.NEGATIVE_SEEK);
      }
      if (pos != newpos) {
        // the seek is attempting to move the current position
        LOG.debug("Opening key '{}' for reading at position '{}'", key, newpos);
        InputStream newStream = store.retrieve(key, newpos);
        updateInnerStream(newStream, newpos);
      }
    }

    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      // Only one S3 endpoint serves the object; no alternate sources.
      return false;
    }
  }

  /**
   * Output stream that buffers all writes to a local temp file (with an
   * MD5 digest when available) and uploads the whole file to S3 on close().
   */
  private class NativeS3FsOutputStream extends OutputStream {

    private Configuration conf;
    private String key;
    private File backupFile;
    private OutputStream backupStream;
    private MessageDigest digest;
    private boolean closed;
    private LocalDirAllocator lDirAlloc;

    public NativeS3FsOutputStream(Configuration conf,
        NativeFileSystemStore store, String key, Progressable progress,
        int bufferSize) throws IOException {
      this.conf = conf;
      this.key = key;
      this.backupFile = newBackupFile();
      LOG.info("OutputStream for key '{}' writing to tempfile '{}'",
          key, this.backupFile);
      try {
        this.digest = MessageDigest.getInstance("MD5");
        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
            new FileOutputStream(backupFile), this.digest));
      } catch (NoSuchAlgorithmException e) {
        // MD5 is optional: without it the upload proceeds but S3 cannot
        // verify content integrity.
        LOG.warn("Cannot load MD5 digest algorithm,"
            + "skipping message integrity check.", e);
        this.backupStream = new BufferedOutputStream(
            new FileOutputStream(backupFile));
      }
    }

    private File newBackupFile() throws IOException {
      if (lDirAlloc == null) {
        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
      }
      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
      result.deleteOnExit();
      return result;
    }

    @Override
    public void flush() throws IOException {
      backupStream.flush();
    }

    @Override
    public synchronized void close() throws IOException {
      if (closed) {
        return;
      }

      backupStream.close();
      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);

      try {
        byte[] md5Hash = digest == null ? null : digest.digest();
        store.storeFile(key, backupFile, md5Hash);
      } finally {
        // Always reclaim the temp file and mark the stream closed, even
        // when the upload fails.
        if (!backupFile.delete()) {
          LOG.warn("Could not delete temporary s3n file: {}", backupFile);
        }
        super.close();
        closed = true;
      }
      LOG.info("OutputStream for key '{}' upload complete", key);
    }

    @Override
    public void write(int b) throws IOException {
      backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      backupStream.write(b, off, len);
    }
  }

  private URI uri;
  private NativeFileSystemStore store;
  private Path workingDir;

  public NativeS3FileSystem() {
    // set store in initialize()
  }

  public NativeS3FileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3n</code>
   */
  @Override
  public String getScheme() {
    return "s3n";
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
        new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
  }

  /**
   * Build the default store: a Jets3t-backed store wrapped in a retry
   * proxy so that storeFile() and rename() retry transient S3 failures.
   */
  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeFile", methodPolicy);
    methodNameToPolicyMap.put("rename", methodPolicy);

    return (NativeFileSystemStore)
        RetryProxy.create(NativeFileSystemStore.class, store,
            methodNameToPolicyMap);
  }

  /** Convert an absolute path to the S3 object key (no leading slash). */
  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like s3n://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath().substring(1); // remove initial slash
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }

  private static Path keyToPath(String key) {
    return new Path("/" + key);
  }

  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    if (exists(f) && !overwrite) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    }

    LOG.debug("Creating new file '{}' in S3", f);
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
        key, progress, bufferSize), statistics);
  }

  @Override
  public boolean delete(Path f, boolean recurse) throws IOException {
    FileStatus status;
    try {
      status = getFileStatus(f);
    } catch (FileNotFoundException e) {
      LOG.debug("Delete called for '{}' but file does not exist, so returning false", f);
      return false;
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (status.isDirectory()) {
      if (!recurse && listStatus(f).length > 0) {
        throw new IOException("Cannot delete " + f
            + " as it is a non-empty directory and recurse option is false");
      }

      createParent(f);

      LOG.debug("Deleting directory '{}'", f);
      // Page through and delete every object under the directory prefix.
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          store.delete(file.getKey());
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      try {
        store.delete(key + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
    } else {
      LOG.debug("Deleting file '{}'", f);
      createParent(f);
      store.delete(key);
    }
    return true;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() == 0) { // root always exists
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus retrieving metadata for key '{}'", key);
    FileMetadata meta = store.retrieveMetadata(key);
    if (meta != null) {
      LOG.debug("getFileStatus returning 'file' for key '{}'", key);
      return newFile(meta, absolutePath);
    }
    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
      LOG.debug("getFileStatus returning 'directory' for key '{}' as '{}{}' exists",
          key, key, FOLDER_SUFFIX);
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus listing key '{}'", key);
    // No object and no marker: the directory still "exists" if anything
    // lives under the prefix.
    PartialListing listing = store.list(key, 1);
    if (listing.getFiles().length > 0 ||
        listing.getCommonPrefixes().length > 0) {
      LOG.debug("getFileStatus returning 'directory' for key '{}' as it has contents",
          key);
      return newDirectory(absolutePath);
    }

    LOG.debug("getFileStatus could not find key '{}'", key);
    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * <p>
   * If <code>f</code> is a file, this method will make a single call to S3.
   * If <code>f</code> is a directory, this method will make a maximum of
   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
   * files and directories contained directly in <code>f</code>.
   * </p>
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() > 0) {
      FileMetadata meta = store.retrieveMetadata(key);
      if (meta != null) {
        return new FileStatus[] { newFile(meta, absolutePath) };
      }
    }

    URI pathUri = absolutePath.toUri();
    Set<FileStatus> status = new TreeSet<FileStatus>();
    String priorLastKey = null;
    do {
      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
      for (FileMetadata fileMetadata : listing.getFiles()) {
        Path subpath = keyToPath(fileMetadata.getKey());
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();

        if (fileMetadata.getKey().equals(key + "/")) {
          // this is just the directory we have been asked to list
        }
        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
          status.add(newDirectory(new Path(
              absolutePath,
              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
        }
        else {
          status.add(newFile(fileMetadata, subpath));
        }
      }
      for (String commonPrefix : listing.getCommonPrefixes()) {
        Path subpath = keyToPath(commonPrefix);
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
        status.add(newDirectory(new Path(absolutePath, relativePath)));
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    if (status.isEmpty() &&
        key.length() > 0 &&
        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }

    return status.toArray(new FileStatus[status.size()]);
  }

  private FileStatus newFile(FileMetadata meta, Path path) {
    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  private FileStatus newDirectory(Path path) {
    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(f);
    // Collect the path and all its ancestors, root first.
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path path : paths) {
      result &= mkdir(path);
    }
    return result;
  }

  private boolean mkdir(Path f) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isFile()) {
        throw new FileAlreadyExistsException(String.format(
            "Can't make directory for path '%s' since it is a file.", f));

      }
    } catch (FileNotFoundException e) {
      LOG.debug("Making dir '{}' in S3", f);
      String key = pathToKey(f) + FOLDER_SUFFIX;
      store.storeEmptyFile(key);
    }
    return true;
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
    if (fs.isDirectory()) {
      throw new FileNotFoundException("'" + f + "' is a directory");
    }
    LOG.info("Opening '{}' for reading", f);
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataInputStream(new BufferedFSInputStream(
        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
  }

  // rename() and delete() use this method to ensure that the parent directory
  // of the source does not vanish.
  private void createParent(Path path) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      String key = pathToKey(makeAbsolute(parent));
      if (key.length() > 0) {
        store.storeEmptyFile(key + FOLDER_SUFFIX);
      }
    }
  }


  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    String srcKey = pathToKey(makeAbsolute(src));
    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      LOG.debug("{}returning false as cannot rename the root of a filesystem",
          debugPreamble);
      return false;
    }

    //get status of source
    boolean srcIsFile;
    try {
      srcIsFile = getFileStatus(src).isFile();
    } catch (FileNotFoundException e) {
      //bail out fast if the source does not exist
      LOG.debug("{}returning false as src does not exist", debugPreamble);
      return false;
    }
    // Figure out the final destination
    String dstKey = pathToKey(makeAbsolute(dst));

    try {
      boolean dstIsFile = getFileStatus(dst).isFile();
      if (dstIsFile) {
        //destination is a file.
        //you can't copy a file or a directory onto an existing file
        //except for the special case of dest==src, which is a no-op
        LOG.debug("{}returning without rename as dst is an already existing file",
            debugPreamble);
        //exit, returning true iff the rename is onto self
        return srcKey.equals(dstKey);
      } else {
        //destination exists and is a directory
        LOG.debug("{}using dst as output directory", debugPreamble);
        //destination goes under the dst path, with the name of the
        //source entry
        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      }
    } catch (FileNotFoundException e) {
      //destination does not exist => the source file or directory
      //is copied over with the name of the destination
      LOG.debug("{}using dst as output destination", debugPreamble);
      try {
        if (getFileStatus(dst.getParent()).isFile()) {
          LOG.debug("{}returning false as dst parent exists and is a file",
              debugPreamble);
          return false;
        }
      } catch (FileNotFoundException ex) {
        LOG.debug("{}returning false as dst parent does not exist",
            debugPreamble);
        return false;
      }
    }

    //rename to self behavior follows Posix rules and is different
    //for directories and files -the return code is driven by src type
    if (srcKey.equals(dstKey)) {
      //fully resolved destination key matches source: fail
      LOG.debug("{}renamingToSelf; returning true", debugPreamble);
      return true;
    }
    if (srcIsFile) {
      //source is a file; COPY then DELETE
      LOG.debug("{}src is file, so doing copy then delete in S3",
          debugPreamble);
      store.copy(srcKey, dstKey);
      store.delete(srcKey);
    } else {
      //src is a directory
      LOG.debug("{}src is directory, so copying contents", debugPreamble);
      //Verify dest is not a child of the parent
      if (dstKey.startsWith(srcKey + "/")) {
        LOG.debug("{}cannot rename a directory to a subdirectory of self",
            debugPreamble);
        return false;
      }
      //create the subdir under the destination
      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);

      List<String> keysToDelete = new ArrayList<String>();
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          keysToDelete.add(file.getKey());
          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      LOG.debug("{}all files in src copied, now removing src files",
          debugPreamble);
      for (String key: keysToDelete) {
        store.delete(key);
      }

      try {
        store.delete(srcKey + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
      LOG.debug("{}done", debugPreamble);
    }

    return true;
  }

  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = newDir;
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/PartialListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/PartialListing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/PartialListing.java new file mode 100644 index 0000000..8290092 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/PartialListing.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * <p> + * Holds information on a directory listing for a + * {@link NativeFileSystemStore}. + * This includes the {@link FileMetadata files} and directories + * (their names) contained in a directory. + * </p> + * <p> + * This listing may be returned in chunks, so a <code>priorLastKey</code> + * is provided so that the next chunk may be requested. 
+ * </p> + * @see NativeFileSystemStore#list(String, int, String) + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class PartialListing { + + private final String priorLastKey; + private final FileMetadata[] files; + private final String[] commonPrefixes; + + public PartialListing(String priorLastKey, FileMetadata[] files, + String[] commonPrefixes) { + this.priorLastKey = priorLastKey; + this.files = files; + this.commonPrefixes = commonPrefixes; + } + + public FileMetadata[] getFiles() { + return files; + } + + public String[] getCommonPrefixes() { + return commonPrefixes; + } + + public String getPriorLastKey() { + return priorLastKey; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3NativeFileSystemConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3NativeFileSystemConfigKeys.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3NativeFileSystemConfigKeys.java new file mode 100644 index 0000000..75884fa --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3NativeFileSystemConfigKeys.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.CommonConfigurationKeys; + +/** + * This class contains constants for configuration keys used + * in the s3 file system. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class S3NativeFileSystemConfigKeys extends CommonConfigurationKeys { + public static final String S3_NATIVE_BLOCK_SIZE_KEY = "s3native.blocksize"; + public static final long S3_NATIVE_BLOCK_SIZE_DEFAULT = 64*1024*1024; + public static final String S3_NATIVE_REPLICATION_KEY = "s3native.replication"; + public static final short S3_NATIVE_REPLICATION_DEFAULT = 1; + public static final String S3_NATIVE_STREAM_BUFFER_SIZE_KEY = + "s3native.stream-buffer-size"; + public static final int S3_NATIVE_STREAM_BUFFER_SIZE_DEFAULT = 4096; + public static final String S3_NATIVE_BYTES_PER_CHECKSUM_KEY = + "s3native.bytes-per-checksum"; + public static final int S3_NATIVE_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String S3_NATIVE_CLIENT_WRITE_PACKET_SIZE_KEY = + "s3native.client-write-packet-size"; + public static final int S3_NATIVE_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/package.html ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/package.html b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/package.html new file mode 100644 index 0000000..24b9b1d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/package.html @@ -0,0 +1,32 @@ +<html> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<body> + +<p> +A distributed implementation of {@link +org.apache.hadoop.fs.FileSystem} for reading and writing files on +<a href="http://aws.amazon.com/s3">Amazon S3</a>. +Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem}, which is block-based, +this implementation stores +files on S3 in their native form for interoperability with other S3 tools. 
+</p> + +</body> +</html> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 0000000..3cd1d6b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.hadoop.fs.s3.S3FileSystem +org.apache.hadoop.fs.s3native.NativeS3FileSystem http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/NativeS3Contract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/NativeS3Contract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/NativeS3Contract.java new file mode 100644 index 0000000..ace6444 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/NativeS3Contract.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractBondedFSContract; + +/** + * The contract of S3N: only enabled if the test bucket is provided + */ +public class NativeS3Contract extends AbstractBondedFSContract { + + public static final String CONTRACT_XML = "contract/s3n.xml"; + + + public NativeS3Contract(Configuration conf) { + super(conf); + //insert the base features + addConfResource(CONTRACT_XML); + } + + @Override + public String getScheme() { + return "s3n"; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractCreate.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractCreate.java new file mode 100644 index 0000000..e44e2b1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractCreate.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +public class TestS3NContractCreate extends AbstractContractCreateTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } + + @Override + public void testOverwriteEmptyDirectory() throws Throwable { + ContractTestUtils.skip( + "blobstores can't distinguish empty directories from files"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractDelete.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractDelete.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractDelete.java new file mode 100644 index 0000000..1b79d27 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractDelete.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3NContractDelete extends AbstractContractDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractMkdir.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractMkdir.java new file mode 100644 index 0000000..527a31d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractMkdir.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test dir operations on S3 + */ +public class TestS3NContractMkdir extends AbstractContractMkdirTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractOpen.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractOpen.java new file mode 100644 index 0000000..2186f28 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractOpen.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractOpenTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3NContractOpen extends AbstractContractOpenTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRename.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRename.java new file mode 100644 index 0000000..d673416 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRename.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3NContractRename extends AbstractContractRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRootDir.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRootDir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRootDir.java new file mode 100644 index 0000000..94f8483 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractRootDir.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * root dir operations against an S3 bucket + */ +public class TestS3NContractRootDir extends + AbstractContractRootDirectoryTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractSeek.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractSeek.java new file mode 100644 index 0000000..6d04fff --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3n/TestS3NContractSeek.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3n; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractSeekTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestS3NContractSeek extends AbstractContractSeekTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new NativeS3Contract(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java new file mode 100644 index 0000000..2d43c8b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/InMemoryFileSystemStore.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.INode.FileType; + +/** + * A stub implementation of {@link FileSystemStore} for testing + * {@link S3FileSystem} without actually connecting to S3. 
+ */ +public class InMemoryFileSystemStore implements FileSystemStore { + + private Configuration conf; + private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>(); + private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>(); + + @Override + public void initialize(URI uri, Configuration conf) { + this.conf = conf; + inodes.put(new Path("/"), INode.DIRECTORY_INODE); + } + + @Override + public String getVersion() throws IOException { + return "0"; + } + + @Override + public void deleteINode(Path path) throws IOException { + inodes.remove(normalize(path)); + } + + @Override + public void deleteBlock(Block block) throws IOException { + blocks.remove(block.getId()); + } + + @Override + public boolean inodeExists(Path path) throws IOException { + return inodes.containsKey(normalize(path)); + } + + @Override + public boolean blockExists(long blockId) throws IOException { + return blocks.containsKey(blockId); + } + + @Override + public INode retrieveINode(Path path) throws IOException { + return inodes.get(normalize(path)); + } + + @Override + public File retrieveBlock(Block block, long byteRangeStart) throws IOException { + byte[] data = blocks.get(block.getId()); + File file = createTempFile(); + BufferedOutputStream out = null; + try { + out = new BufferedOutputStream(new FileOutputStream(file)); + out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart); + } finally { + if (out != null) { + out.close(); + } + } + return file; + } + + private File createTempFile() throws IOException { + File dir = new File(conf.get("fs.s3.buffer.dir")); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Cannot create S3 buffer directory: " + dir); + } + File result = File.createTempFile("test-", ".tmp", dir); + result.deleteOnExit(); + return result; + } + + @Override + public Set<Path> listSubPaths(Path path) throws IOException { + Path normalizedPath = normalize(path); + // This is inefficient but more than adequate for testing purposes. 
+ Set<Path> subPaths = new LinkedHashSet<Path>(); + for (Path p : inodes.tailMap(normalizedPath).keySet()) { + if (normalizedPath.equals(p.getParent())) { + subPaths.add(p); + } + } + return subPaths; + } + + @Override + public Set<Path> listDeepSubPaths(Path path) throws IOException { + Path normalizedPath = normalize(path); + String pathString = normalizedPath.toUri().getPath(); + if (!pathString.endsWith("/")) { + pathString += "/"; + } + // This is inefficient but more than adequate for testing purposes. + Set<Path> subPaths = new LinkedHashSet<Path>(); + for (Path p : inodes.tailMap(normalizedPath).keySet()) { + if (p.toUri().getPath().startsWith(pathString)) { + subPaths.add(p); + } + } + return subPaths; + } + + @Override + public void storeINode(Path path, INode inode) throws IOException { + inodes.put(normalize(path), inode); + } + + @Override + public void storeBlock(Block block, File file) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buf = new byte[8192]; + int numRead; + BufferedInputStream in = null; + try { + in = new BufferedInputStream(new FileInputStream(file)); + while ((numRead = in.read(buf)) >= 0) { + out.write(buf, 0, numRead); + } + } finally { + if (in != null) { + in.close(); + } + } + blocks.put(block.getId(), out.toByteArray()); + } + + private Path normalize(Path path) { + if (!path.isAbsolute()) { + throw new IllegalArgumentException("Path must be absolute: " + path); + } + return new Path(path.toUri().getPath()); + } + + @Override + public void purge() throws IOException { + inodes.clear(); + blocks.clear(); + } + + @Override + public void dump() throws IOException { + StringBuilder sb = new StringBuilder(getClass().getSimpleName()); + sb.append(", \n"); + for (Map.Entry<Path, INode> entry : inodes.entrySet()) { + sb.append(entry.getKey()).append("\n"); + INode inode = entry.getValue(); + sb.append("\t").append(inode.getFileType()).append("\n"); + if (inode.getFileType() == 
FileType.DIRECTORY) { + continue; + } + for (int j = 0; j < inode.getBlocks().length; j++) { + sb.append("\t").append(inode.getBlocks()[j]).append("\n"); + } + } + System.out.println(sb); + + System.out.println(inodes.keySet()); + System.out.println(blocks.keySet()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java new file mode 100644 index 0000000..53b3c03 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/Jets3tS3FileSystemContractTest.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3; + +import java.io.IOException; + +public class Jets3tS3FileSystemContractTest + extends S3FileSystemContractBaseTest { + + @Override + FileSystemStore getFileSystemStore() throws IOException { + return new Jets3tFileSystemStore(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java new file mode 100644 index 0000000..28b0507 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; + +public abstract class S3FileSystemContractBaseTest + extends FileSystemContractBaseTest { + + private FileSystemStore store; + + abstract FileSystemStore getFileSystemStore() throws IOException; + + @Override + protected void setUp() throws Exception { + Configuration conf = new Configuration(); + store = getFileSystemStore(); + fs = new S3FileSystem(store); + fs.initialize(URI.create(conf.get("test.fs.s3.name")), conf); + } + + @Override + protected void tearDown() throws Exception { + store.purge(); + super.tearDown(); + } + + public void testCanonicalName() throws Exception { + assertNull("s3 doesn't support security token and shouldn't have canonical name", + fs.getCanonicalServiceName()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3InMemoryFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3InMemoryFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3InMemoryFileSystem.java new file mode 100644 index 0000000..a4e9770 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/S3InMemoryFileSystem.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import org.apache.hadoop.fs.s3.S3FileSystem; +import org.apache.hadoop.fs.s3.InMemoryFileSystemStore; + +/** + * A helper implementation of {@link S3FileSystem} + * without actually connecting to S3 for unit testing. + */ +public class S3InMemoryFileSystem extends S3FileSystem { + public S3InMemoryFileSystem() { + super(new InMemoryFileSystemStore()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestINode.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestINode.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestINode.java new file mode 100644 index 0000000..086a43e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestINode.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3; + +import java.io.IOException; +import java.io.InputStream; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.s3.INode.FileType; + +public class TestINode extends TestCase { + + public void testSerializeFileWithSingleBlock() throws IOException { + Block[] blocks = { new Block(849282477840258181L, 128L) }; + INode inode = new INode(FileType.FILE, blocks); + + assertEquals("Length", 1L + 4 + 16, inode.getSerializedLength()); + InputStream in = inode.serialize(); + + INode deserialized = INode.deserialize(in); + + assertEquals("FileType", inode.getFileType(), deserialized.getFileType()); + Block[] deserializedBlocks = deserialized.getBlocks(); + assertEquals("Length", 1, deserializedBlocks.length); + assertEquals("Id", blocks[0].getId(), deserializedBlocks[0].getId()); + assertEquals("Length", blocks[0].getLength(), deserializedBlocks[0] + .getLength()); + + } + + public void testSerializeDirectory() throws IOException { + INode inode = INode.DIRECTORY_INODE; + assertEquals("Length", 1L, inode.getSerializedLength()); + InputStream in = inode.serialize(); + INode deserialized = INode.deserialize(in); + assertSame(INode.DIRECTORY_INODE, deserialized); + } + + public void testDeserializeNull() throws IOException { + assertNull(INode.deserialize(null)); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java new file mode 100644 index 0000000..5d66cf1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestInMemoryS3FileSystemContract.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3; + +import java.io.IOException; + +public class TestInMemoryS3FileSystemContract + extends S3FileSystemContractBaseTest { + + @Override + FileSystemStore getFileSystemStore() throws IOException { + return new InMemoryFileSystemStore(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ec7fcd9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestS3Credentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestS3Credentials.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestS3Credentials.java new file mode 100644 index 0000000..bcbf0dc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3/TestS3Credentials.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.fs.s3;

import java.net.URI;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;

/**
 * Tests for {@link S3Credentials} initialization from an S3 URI.
 */
public class TestS3Credentials extends TestCase {

  /**
   * An underscore is not legal in a hostname, so initialization must be
   * rejected with an {@link IllegalArgumentException} naming the bad URI.
   */
  public void testInvalidHostnameWithUnderscores() throws Exception {
    final URI badUri = new URI("s3://a:b@c_d");
    S3Credentials credentials = new S3Credentials();
    try {
      credentials.initialize(badUri, new Configuration());
      fail("Should throw IllegalArgumentException");
    } catch (IllegalArgumentException e) {
      assertEquals("Invalid hostname in URI s3://a:b@c_d", e.getMessage());
    }
  }
}
package org.apache.hadoop.fs.s3;

import java.io.IOException;
import java.net.URI;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;

/**
 * Tests that {@link S3FileSystem#initialize} normalizes the supplied URI
 * down to just its scheme and authority, independent of any path component
 * or trailing slash.
 */
public class TestS3FileSystem extends TestCase {

  public void testInitialization() throws IOException {
    initializationTest("s3://a:b@c", "s3://a:b@c");
    initializationTest("s3://a:b@c/", "s3://a:b@c");
    initializationTest("s3://a:b@c/path", "s3://a:b@c");
    initializationTest("s3://a@c", "s3://a@c");
    initializationTest("s3://a@c/", "s3://a@c");
    initializationTest("s3://a@c/path", "s3://a@c");
    initializationTest("s3://c", "s3://c");
    initializationTest("s3://c/", "s3://c");
    initializationTest("s3://c/path", "s3://c");
  }

  /**
   * Initializes a fresh in-memory-backed {@link S3FileSystem} with
   * {@code initializationUri} and asserts {@link S3FileSystem#getUri()}
   * reports {@code expectedUri}.
   *
   * @param initializationUri URI handed to {@code initialize}
   * @param expectedUri URI the filesystem should report afterwards
   * @throws IOException on initialization failure
   */
  private void initializationTest(String initializationUri, String expectedUri)
      throws IOException {

    S3FileSystem fs = new S3FileSystem(new InMemoryFileSystemStore());
    try {
      fs.initialize(URI.create(initializationUri), new Configuration());
      assertEquals(URI.create(expectedUri), fs.getUri());
    } finally {
      // The original test leaked one FileSystem instance per invocation;
      // close each one so repeated runs do not accumulate open resources.
      fs.close();
    }
  }
}
package org.apache.hadoop.fs.s3;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

/**
 * Round-trip read/write test for {@link S3InMemoryFileSystem}.
 */
public class TestS3InMemoryFileSystem extends TestCase {

  private static final String TEST_PATH = "s3://test/data.txt";

  private static final String TEST_DATA = "Sample data for testing.";

  private S3InMemoryFileSystem fs;

  @Override
  public void setUp() throws IOException {
    fs = new S3InMemoryFileSystem();
    fs.initialize(URI.create("s3://test/"), new Configuration());
  }

  /**
   * Writes TEST_DATA to a path and reads it back, asserting the contents
   * survive the round trip.
   */
  public void testBasicReadWriteIO() throws IOException {
    FSDataOutputStream writeStream = fs.create(new Path(TEST_PATH));
    try {
      writeStream.write(TEST_DATA.getBytes());
      writeStream.flush();
    } finally {
      writeStream.close();
    }

    StringBuilder result = new StringBuilder();
    FSDataInputStream readStream = fs.open(new Path(TEST_PATH));
    BufferedReader br = new BufferedReader(new InputStreamReader(readStream));
    try {
      String line;
      while ((line = br.readLine()) != null) {
        result.append(line);
      }
    } finally {
      br.close();
    }

    // Use a JUnit assertion, not the Java 'assert' keyword: 'assert' is a
    // no-op unless the JVM runs with -ea, so the original check could let a
    // data mismatch pass silently.
    assertEquals(TEST_DATA, result.toString());
  }

  @Override
  public void tearDown() throws IOException {
    fs.close();
  }
}
package org.apache.hadoop.fs.s3native;

import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;

/**
 * <p>
 * A stub implementation of {@link NativeFileSystemStore} for testing
 * {@link NativeS3FileSystem} without actually connecting to S3.
 * </p>
 */
public class InMemoryNativeFileSystemStore implements NativeFileSystemStore {

  private Configuration conf;

  /** Key -> metadata; sorted so listings come back in key order. */
  private SortedMap<String, FileMetadata> metadataMap =
      new TreeMap<String, FileMetadata>();
  /** Key -> file contents, kept in lock-step with {@link #metadataMap}. */
  private SortedMap<String, byte[]> dataMap = new TreeMap<String, byte[]>();

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    this.conf = conf;
  }

  @Override
  public void storeEmptyFile(String key) throws IOException {
    metadataMap.put(key, new FileMetadata(key, 0, Time.now()));
    dataMap.put(key, new byte[0]);
  }

  /**
   * Copies the local file's bytes into the in-memory map.
   * The md5Hash argument is ignored by this stub.
   */
  @Override
  public void storeFile(String key, File file, byte[] md5Hash)
      throws IOException {

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[8192];
    int numRead;
    BufferedInputStream in = null;
    try {
      in = new BufferedInputStream(new FileInputStream(file));
      while ((numRead = in.read(buf)) >= 0) {
        out.write(buf, 0, numRead);
      }
    } finally {
      if (in != null) {
        in.close();
      }
    }
    metadataMap.put(key,
        new FileMetadata(key, file.length(), Time.now()));
    dataMap.put(key, out.toByteArray());
  }

  @Override
  public InputStream retrieve(String key) throws IOException {
    return retrieve(key, 0);
  }

  /**
   * Returns a stream over the stored bytes starting at byteRangeStart,
   * routed through a temp file to mimic the real store's buffering.
   * NOTE(review): throws NullPointerException if the key is absent —
   * callers appear to check existence first; confirm before changing.
   */
  @Override
  public InputStream retrieve(String key, long byteRangeStart)
      throws IOException {

    byte[] data = dataMap.get(key);
    File file = createTempFile();
    BufferedOutputStream out = null;
    try {
      out = new BufferedOutputStream(new FileOutputStream(file));
      out.write(data, (int) byteRangeStart,
          data.length - (int) byteRangeStart);
    } finally {
      if (out != null) {
        out.close();
      }
    }
    return new FileInputStream(file);
  }

  /** Creates a self-deleting temp file under the configured S3 buffer dir. */
  private File createTempFile() throws IOException {
    File dir = new File(conf.get("fs.s3.buffer.dir"));
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Cannot create S3 buffer directory: " + dir);
    }
    File result = File.createTempFile("test-", ".tmp", dir);
    result.deleteOnExit();
    return result;
  }

  @Override
  public FileMetadata retrieveMetadata(String key) throws IOException {
    return metadataMap.get(key);
  }

  @Override
  public PartialListing list(String prefix, int maxListingLength)
      throws IOException {
    return list(prefix, maxListingLength, null, false);
  }

  @Override
  public PartialListing list(String prefix, int maxListingLength,
      String priorLastKey, boolean recursive) throws IOException {

    // A null delimiter means "recursive": keys under sub-"directories"
    // are listed flat instead of being folded into common prefixes.
    return list(prefix, recursive ? null : PATH_DELIMITER, maxListingLength,
        priorLastKey);
  }

  /**
   * Walks the sorted key set collecting at most maxListingLength entries
   * (files plus common prefixes) under the given prefix.
   *
   * @param prefix key prefix; a trailing delimiter is appended if absent
   * @param delimiter directory separator, or null for a recursive listing
   * @param maxListingLength cap on entries returned in one page
   * @param priorLastKey ignored by this stub (scan always starts at the top)
   */
  private PartialListing list(String prefix, String delimiter,
      int maxListingLength, String priorLastKey) throws IOException {

    if (prefix.length() > 0 && !prefix.endsWith(PATH_DELIMITER)) {
      prefix += PATH_DELIMITER;
    }

    List<FileMetadata> metadata = new ArrayList<FileMetadata>();
    SortedSet<String> commonPrefixes = new TreeSet<String>();
    for (String key : dataMap.keySet()) {
      if (key.startsWith(prefix)) {
        if (delimiter == null) {
          metadata.add(retrieveMetadata(key));
        } else {
          int delimIndex = key.indexOf(delimiter, prefix.length());
          if (delimIndex == -1) {
            metadata.add(retrieveMetadata(key));
          } else {
            String commonPrefix = key.substring(0, delimIndex);
            commonPrefixes.add(commonPrefix);
          }
        }
      }
      if (metadata.size() + commonPrefixes.size() == maxListingLength) {
        // Page is full: return a truncated listing whose priorLastKey tells
        // the caller where to resume. The original code constructed this
        // PartialListing but discarded it (missing 'return'), so listings
        // were never actually truncated at maxListingLength.
        return new PartialListing(key, metadata.toArray(new FileMetadata[0]),
            commonPrefixes.toArray(new String[0]));
      }
    }
    return new PartialListing(null, metadata.toArray(new FileMetadata[0]),
        commonPrefixes.toArray(new String[0]));
  }

  @Override
  public void delete(String key) throws IOException {
    metadataMap.remove(key);
    dataMap.remove(key);
  }

  @Override
  public void copy(String srcKey, String dstKey) throws IOException {
    metadataMap.put(dstKey, metadataMap.get(srcKey));
    dataMap.put(dstKey, dataMap.get(srcKey));
  }

  /** Removes every entry whose key starts with the given prefix. */
  @Override
  public void purge(String prefix) throws IOException {
    Iterator<Entry<String, FileMetadata>> i =
        metadataMap.entrySet().iterator();
    while (i.hasNext()) {
      Entry<String, FileMetadata> entry = i.next();
      if (entry.getKey().startsWith(prefix)) {
        dataMap.remove(entry.getKey());
        i.remove();  // safe removal during iteration
      }
    }
  }

  @Override
  public void dump() throws IOException {
    System.out.println(metadataMap.values());
    System.out.println(dataMap.keySet());
  }
}
package org.apache.hadoop.fs.s3native;

import java.io.IOException;

/**
 * Runs the native S3 filesystem contract tests against the real
 * JetS3t-backed store implementation.
 */
public class Jets3tNativeS3FileSystemContractTest
    extends NativeS3FileSystemContractBaseTest {

  /** Supplies the JetS3t store used by the contract base test. */
  @Override
  NativeFileSystemStore getNativeFileSystemStore() throws IOException {
    return new Jets3tNativeFileSystemStore();
  }
}