HADOOP-10400. Incorporate new S3A FileSystem implementation. Contributed by Jordan Mendelson and Dave Wang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/24d920b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/24d920b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/24d920b8 Branch: refs/heads/HDFS-6581 Commit: 24d920b80eb3626073925a1d0b6dcf148add8cc0 Parents: fc741b5 Author: Aaron T. Myers <a...@apache.org> Authored: Mon Sep 15 08:27:07 2014 -0700 Committer: Aaron T. Myers <a...@apache.org> Committed: Mon Sep 15 08:27:07 2014 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../src/main/conf/log4j.properties | 5 + .../src/main/resources/core-default.xml | 86 ++ hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml | 8 + hadoop-project/pom.xml | 26 +- hadoop-tools/hadoop-aws/pom.xml | 10 + .../fs/s3a/AnonymousAWSCredentialsProvider.java | 37 + .../fs/s3a/BasicAWSCredentialsProvider.java | 51 + .../org/apache/hadoop/fs/s3a/Constants.java | 90 ++ .../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 62 ++ .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 1019 ++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 207 ++++ .../apache/hadoop/fs/s3a/S3AOutputStream.java | 208 ++++ .../services/org.apache.hadoop.fs.FileSystem | 1 + .../hadoop/fs/contract/s3a/S3AContract.java | 43 + .../fs/contract/s3a/TestS3AContractCreate.java | 38 + .../fs/contract/s3a/TestS3AContractDelete.java | 31 + .../fs/contract/s3a/TestS3AContractMkdir.java | 34 + .../fs/contract/s3a/TestS3AContractOpen.java | 31 + .../fs/contract/s3a/TestS3AContractRename.java | 64 ++ .../fs/contract/s3a/TestS3AContractRootDir.java | 35 + .../fs/contract/s3a/TestS3AContractSeek.java | 31 + .../fs/s3a/S3AFileSystemContractBaseTest.java | 327 ++++++ .../src/test/resources/contract/s3a.xml | 105 ++ .../src/test/resources/contract/s3n.xml | 7 +- hadoop-tools/hadoop-azure/pom.xml | 10 +- 26 files changed, 2552 insertions(+), 17 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 051eac1..c2ae5ed 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -342,6 +342,9 @@ Release 2.6.0 - UNRELEASED HADOOP-10893. isolated classloader on the client side (Sangjin Lee via jlowe) + HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan + Mendelson and Dave Wang via atm) + IMPROVEMENTS HADOOP-10808. Remove unused native code for munlock. (cnauroth) http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties index ef9acbf..5fa21fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties +++ b/hadoop-common-project/hadoop-common/src/main/conf/log4j.properties @@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex} # Jets3t library log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR +# AWS SDK & S3A FileSystem +log4j.logger.com.amazonaws=ERROR +log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR +log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN + # # Event Counter Appender # Sends counts of logging messages at different severity levels to Hadoop Metrics. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 3cc7545..828dec2 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -690,6 +690,92 @@ for ldap providers in the same way as above does. </property> <property> + <name>fs.s3a.access.key</name> + <description>AWS access key ID. Omit for Role-based authentication.</description> +</property> + +<property> + <name>fs.s3a.secret.key</name> + <description>AWS secret key. Omit for Role-based authentication.</description> +</property> + +<property> + <name>fs.s3a.connection.maximum</name> + <value>15</value> + <description>Controls the maximum number of simultaneous connections to S3.</description> +</property> + +<property> + <name>fs.s3a.connection.ssl.enabled</name> + <value>true</value> + <description>Enables or disables SSL connections to S3.</description> +</property> + +<property> + <name>fs.s3a.attempts.maximum</name> + <value>10</value> + <description>How many times we should retry commands on transient errors.</description> +</property> + +<property> + <name>fs.s3a.connection.timeout</name> + <value>5000</value> + <description>Socket connection timeout in milliseconds.</description> +</property> + +<property> + <name>fs.s3a.paging.maximum</name> + <value>5000</value> + <description>How many keys to request from S3 when doing + directory listings at a time.</description> +</property> + +<property> + <name>fs.s3a.multipart.size</name> + <value>104857600</value> + <description>How big (in bytes) to split upload or copy operations up into.</description> +</property> + +<property> +
<name>fs.s3a.multipart.threshold</name> + <value>2147483647</value> + <description>Threshold before uploads or copies use parallel multipart operations.</description> +</property> + +<property> + <name>fs.s3a.acl.default</name> + <description>Set a canned ACL for newly created and copied objects. Value may be Private, + PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, + BucketOwnerRead, or BucketOwnerFullControl.</description> +</property> + +<property> + <name>fs.s3a.multipart.purge</name> + <value>false</value> + <description>True if you want to purge existing multipart uploads that may not have been + completed/aborted correctly</description> +</property> + +<property> + <name>fs.s3a.multipart.purge.age</name> + <value>86400</value> + <description>Minimum age in seconds of multipart uploads to purge</description> +</property> + +<property> + <name>fs.s3a.buffer.dir</name> + <value>${hadoop.tmp.dir}/s3a</value> + <description>Comma separated list of directories that will be used to buffer file + uploads to.</description> +</property> + +<property> + <name>fs.s3a.impl</name> + <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> + <description>The implementation class of the S3A Filesystem</description> +</property> + +<property> <name>io.seqfile.compress.blocksize</name> <value>1000000</value> <description>The minimum block size for compression in block compressed http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml index 24fa87b..a44f686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml @@ -131,6 +131,10 @@ <artifactId>jets3t</artifactId> </exclusion> <exclusion> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> +
</exclusion> + <exclusion> <groupId>org.eclipse.jdt</groupId> <artifactId>core</artifactId> </exclusion> @@ -170,6 +174,10 @@ <artifactId>jets3t</artifactId> </exclusion> <exclusion> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + </exclusion> + <exclusion> <groupId>org.eclipse.jdt</groupId> <artifactId>core</artifactId> </exclusion> http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ad8422f..502655f 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -61,8 +61,9 @@ <!-- jersey version --> <jersey.version>1.9</jersey.version> - <!-- jackson version --> + <!-- jackson versions --> <jackson.version>1.9.13</jackson.version> + <jackson2.version>2.2.3</jackson2.version> <!-- ProtocolBuffer version, used to verify the protoc version and --> <!-- define the protobuf JAR version --> @@ -581,13 +582,7 @@ <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> - <version>1.7.2</version> - <exclusions> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </exclusion> - </exclusions> + <version>1.7.4</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> @@ -675,6 +670,21 @@ <version>${jackson.version}</version> </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson2.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson2.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson2.version}</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> 
<artifactId>mockito-all</artifactId> <version>1.8.5</version> http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index c01a33d..61a5e84 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -100,6 +100,16 @@ <type>test-jar</type> </dependency> + <!-- see ../../hadoop-project/pom.xml for versions --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java new file mode 100644 index 0000000..2a24273 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.AWSCredentials; + +public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider { + public AWSCredentials getCredentials() { + return new AnonymousAWSCredentials(); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java new file mode 100644 index 0000000..8d45bc6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.AWSCredentials; + +public class BasicAWSCredentialsProvider implements AWSCredentialsProvider { + private String accessKey; + private String secretKey; + + public BasicAWSCredentialsProvider(String accessKey, String secretKey) { + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public AWSCredentials getCredentials() { + if (accessKey != null && secretKey != null) { + return new BasicAWSCredentials(accessKey, secretKey); + } + + throw new AmazonClientException( + "Access key or secret key is null"); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java new file mode 100644 index 0000000..9723b82 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + + +public class Constants { + // s3 access key + public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId"; + public static final String NEW_ACCESS_KEY = "fs.s3a.access.key"; + + // s3 secret key + public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey"; + public static final String NEW_SECRET_KEY = "fs.s3a.secret.key"; + + // number of simultaneous connections to s3 + public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections"; + public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; + public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; + + // connect to s3 over ssl?
+ public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections"; + public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; + public static final boolean DEFAULT_SECURE_CONNECTIONS = true; + + // number of times we should retry errors + public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries"; + public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; + public static final int DEFAULT_MAX_ERROR_RETRIES = 10; + + // milliseconds until we give up on a connection to s3 + public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout"; + public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; + public static final int DEFAULT_SOCKET_TIMEOUT = 50000; + + // number of records to get while paging through a directory listing + public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys"; + public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; + public static final int DEFAULT_MAX_PAGING_KEYS = 5000; + + // size of each of our multipart pieces in bytes + public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize"; + public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size"; + public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB + + // minimum size in bytes before we start a multipart upload or copy + public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize"; + public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; + public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; + + // comma separated list of directories + public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; + + // Private | PublicRead | PublicReadWrite | AuthenticatedRead | + // LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl + public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL"; + public static final String
NEW_CANNED_ACL = "fs.s3a.acl.default"; + public static final String DEFAULT_CANNED_ACL = ""; + + // should we try to purge old multipart uploads when starting up + public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart"; + public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; + public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false; + + // purge any multipart uploads older than this number of seconds + public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge"; + public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; + public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; + + // s3 server-side encryption + public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = + "fs.s3a.server-side-encryption-algorithm"; + + public static final String S3N_FOLDER_SUFFIX = "_$folder$"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java new file mode 100644 index 0000000..eb64492 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +public class S3AFileStatus extends FileStatus { + private boolean isEmptyDirectory; + + // Directories + public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) { + super(0, isdir, 1, 0, 0, path); + isEmptyDirectory = isemptydir; + } + + // Files + public S3AFileStatus(long length, long modification_time, Path path) { + super(length, false, 1, 0, modification_time, path); + isEmptyDirectory = false; + } + + public boolean isEmptyDirectory() { + return isEmptyDirectory; + } + + /** Compare if this object is equal to another object + * @param o the object to be compared. + * @return true if two file status has the same path name; false if not. + */ + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name. 
+ */ + @Override + public int hashCode() { + return super.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java new file mode 100644 index 0000000..a597e62 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -0,0 +1,1019 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProviderChain; + +import com.amazonaws.auth.InstanceProfileCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.Copy; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; +import com.amazonaws.services.s3.transfer.Upload; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.event.ProgressEvent; + +import org.apache.commons.lang.StringUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +public class S3AFileSystem extends FileSystem { + private URI uri; + private Path workingDir; + private AmazonS3Client s3; + private String bucket; + private int maxKeys; + private long partSize; + private int partSizeThreshold; + public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); + private CannedAccessControlList cannedACL; + private String serverSideEncryptionAlgorithm; + + + /** Called after a new FileSystem instance is constructed. + * @param name a uri whose authority section names the host, port, etc. + * for this FileSystem + * @param conf the configuration + */ + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + + + uri = URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, + this.getWorkingDirectory()); + + // Try to get our credentials or just connect anonymously + String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null)); + String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null)); + + String userInfo = name.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + + AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( + new BasicAWSCredentialsProvider(accessKey, secretKey), + new InstanceProfileCredentialsProvider(), + new AnonymousAWSCredentialsProvider() + ); + + bucket = name.getHost(); + + ClientConfiguration awsConf = new ClientConfiguration(); + awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS, + conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS))); + awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS, + conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ? 
+ Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES, + conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES))); + awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT, + conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT))); + + s3 = new AmazonS3Client(credentials, awsConf); + + maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS, + conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS)); + partSize = conf.getLong(NEW_MULTIPART_SIZE, + conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); + partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, + conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + + if (partSize < 5 * 1024 * 1024) { + LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB"); + partSize = 5 * 1024 * 1024; + } + + if (partSizeThreshold < 5 * 1024 * 1024) { + LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + partSizeThreshold = 5 * 1024 * 1024; + } + + String cannedACLName = conf.get(NEW_CANNED_ACL, + conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL)); + if (!cannedACLName.isEmpty()) { + cannedACL = CannedAccessControlList.valueOf(cannedACLName); + } else { + cannedACL = null; + } + + if (!s3.doesBucketExist(bucket)) { + throw new IOException("Bucket " + bucket + " does not exist"); + } + + boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART, + conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART)); + long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE, + conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE)); + + if (purgeExistingMultipart) { + TransferManager transferManager = new TransferManager(s3); + Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000); + + transferManager.abortMultipartUploads(bucket, purgeBefore); + transferManager.shutdownNow(false); + } + + serverSideEncryptionAlgorithm = 
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); + + setConf(conf); + } + + /** + * Return the protocol scheme for the FileSystem. + * + * @return "s3a" + */ + public String getScheme() { + return "s3a"; + } + + /** Returns a URI whose scheme and authority identify this FileSystem.*/ + public URI getUri() { + return uri; + } + + + public S3AFileSystem() { + super(); + } + + /* Turns a path (relative or otherwise) into an S3 key + */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + + if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { + return ""; + } + + return path.toUri().getPath().substring(1); + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + /** + * Opens an FSDataInputStream at the indicated Path. + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ + public FSDataInputStream open(Path f, int bufferSize) + throws IOException { + + LOG.info("Opening '" + f + "' for reading"); + final FileStatus fileStatus = getFileStatus(f); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + f + " because it is a directory"); + } + + return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), + fileStatus.getLen(), s3, statistics)); + } + + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param overwrite if a file with this name already exists, then if true, + * the file will be overwritten, and if false an error will be thrown. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. 
+ * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, + int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + String key = pathToKey(f); + + if (!overwrite && exists(f)) { + throw new FileAlreadyExistsException(f + " already exists"); + } + + // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file + return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, + bucket, key, progress, cannedACL, statistics, + serverSideEncryptionAlgorithm), null); + } + + /** + * Append to an existing file (optional operation). + * @param f the existing file to be appended. + * @param bufferSize the size of the buffer to be used. + * @param progress for reporting progress if it is not null. + * @throws IOException + */ + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Not supported"); + } + + + /** + * Renames Path src to Path dst. Can take place on local fs + * or remote DFS. + * + * Warning: S3 does not support renames. This method does a copy which can + * take S3 some time to execute with large files and directories. Since + * there is no Progressable passed in, this can time out jobs. + * + * Note: This implementation differs with other S3 drivers. Specifically: + * Fails if src is a file and dst is a directory. + * Fails if src is a directory and dst is a file. + * Fails if the parent of dst does not exist or is a file. + * Fails if dst is a directory that is not empty. 
+   *
+   * @param src path to be renamed
+   * @param dst new path after rename
+   * @throws IOException on failure
+   * @return true if rename is successful (or src and dst map to the same
+   *   key); false if either key is empty, src does not exist, the
+   *   src/dst types conflict, dst is a non-empty directory, or the parent
+   *   of dst is missing or not a directory
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    LOG.info("Rename path " + src + " to " + dst);
+
+    String srcKey = pathToKey(src);
+    String dstKey = pathToKey(dst);
+
+    if (srcKey.length() == 0 || dstKey.length() == 0) {
+      LOG.info("rename: src or dst are empty");
+      return false;
+    }
+
+    if (srcKey.equals(dstKey)) {
+      LOG.info("rename: src and dst refer to the same file");
+      return true;
+    }
+
+    S3AFileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException e) {
+      LOG.info("rename: src not found " + src);
+      return false;
+    }
+
+    S3AFileStatus dstStatus = null;
+    try {
+      dstStatus = getFileStatus(dst);
+
+      if (srcStatus.isFile() && dstStatus.isDirectory()) {
+        LOG.info("rename: src is a file and dst is a directory");
+        return false;
+      }
+
+      if (srcStatus.isDirectory() && dstStatus.isFile()) {
+        LOG.info("rename: src is a directory and dst is a file");
+        return false;
+      }
+
+      // Enforce the documented rule that a non-empty destination directory
+      // is rejected; without this check the source tree was silently
+      // merged into the existing destination.
+      if (dstStatus.isDirectory() && !dstStatus.isEmptyDirectory()) {
+        LOG.info("rename: dst is a directory that is not empty");
+        return false;
+      }
+
+    } catch (FileNotFoundException e) {
+      // dst does not exist, so its parent must exist and be a directory
+      // (a parent with an empty key is the bucket root and needs no check).
+      Path parent = dst.getParent();
+      if (!pathToKey(parent).isEmpty()) {
+        try {
+          S3AFileStatus dstParentStatus = getFileStatus(parent);
+          if (!dstParentStatus.isDirectory()) {
+            return false;
+          }
+        } catch (FileNotFoundException e2) {
+          return false;
+        }
+      }
+    }
+
+    // Ok! Time to start
+    if (srcStatus.isFile()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming file " + src + " to " + dst);
+      }
+      copyFile(srcKey, dstKey);
+      delete(src, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming directory " + src + " to " + dst);
+      }
+
+      // This is a directory to directory copy
+      if (!dstKey.endsWith("/")) {
+        dstKey = dstKey + "/";
+      }
+
+      if (!srcKey.endsWith("/")) {
+        srcKey = srcKey + "/";
+      }
+
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete =
+        new ArrayList<DeleteObjectsRequest.KeyVersion>();
+      // NOTE(review): copyFile looks up metadata for srcKey, so this branch
+      // presumably expects a "srcKey/" marker object to exist - confirm.
+      if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+        copyFile(srcKey, dstKey);
+        statistics.incrementWriteOps(1);
+        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey));
+      }
+
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(srcKey);
+      request.setMaxKeys(maxKeys);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      // Copy every object under the source prefix, page by page. The source
+      // keys are only remembered here; nothing is deleted until every copy
+      // has succeeded.
+      while (true) {
+        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+          String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+          copyFile(summary.getKey(), newDstKey);
+        }
+
+        if (objects.isTruncated()) {
+          objects = s3.listNextBatchOfObjects(objects);
+          statistics.incrementReadOps(1);
+        } else {
+          break;
+        }
+      }
+
+      // S3 multi-object delete accepts at most 1000 keys per request, so
+      // the collected keys are removed in chunks of that size. An empty
+      // list results in no request at all.
+      final int maxKeysPerDelete = 1000;
+      for (int start = 0; start < keysToDelete.size(); start += maxKeysPerDelete) {
+        int end = Math.min(start + maxKeysPerDelete, keysToDelete.size());
+        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
+        deleteRequest.setKeys(new ArrayList<DeleteObjectsRequest.KeyVersion>(
+            keysToDelete.subList(start, end)));
+        s3.deleteObjects(deleteRequest);
+        statistics.incrementWriteOps(1);
+      }
+    }
+
+    // The previous guard compared the two parent Paths with '!=', i.e. by
+    // reference; that is presumably always true for the separate objects
+    // returned by getParent(), so the marker clean-up effectively always
+    // ran. It is now run unconditionally to keep that behaviour explicit.
+    deleteUnnecessaryFakeDirectories(dst.getParent());
+    createFakeDirectoryIfNecessary(src.getParent());
+    return true;
+  }
+
+  /** Delete a file.
+   *
+   * @param f the path to delete.
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception.
In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException + */ + public boolean delete(Path f, boolean recursive) throws IOException { + LOG.info("Delete path " + f + " - recursive " + recursive); + S3AFileStatus status; + try { + status = getFileStatus(f); + } catch (FileNotFoundException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Couldn't delete " + f + " - does not exist"); + } + return false; + } + + String key = pathToKey(f); + + if (status.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("delete: Path is a directory"); + } + + if (!recursive && !status.isEmptyDirectory()) { + throw new IOException("Path is a folder: " + f + + " and it is not an empty directory"); + } + + if (!key.endsWith("/")) { + key = key + "/"; + } + + if (key.equals("/")) { + LOG.info("s3a cannot delete the root directory"); + return false; + } + + if (status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake empty directory"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Getting objects for directory prefix " + key + " to delete"); + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + // Hopefully not setting a delimiter will cause this to find everything + //request.setDelimiter("/"); + request.setMaxKeys(maxKeys); + + List<DeleteObjectsRequest.KeyVersion> keys = + new ArrayList<DeleteObjectsRequest.KeyVersion>(); + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey())); + if (LOG.isDebugEnabled()) { + LOG.debug("Got object to delete " + summary.getKey()); + } + } + + DeleteObjectsRequest deleteRequest = new 
DeleteObjectsRequest(bucket); + deleteRequest.setKeys(keys); + s3.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); + keys.clear(); + + if (objects.isTruncated()) { + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("delete: Path is a file"); + } + s3.deleteObject(bucket, key); + statistics.incrementWriteOps(1); + } + + createFakeDirectoryIfNecessary(f.getParent()); + + return true; + } + + private void createFakeDirectoryIfNecessary(Path f) throws IOException { + String key = pathToKey(f); + if (!key.isEmpty() && !exists(f)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating new fake directory at " + f); + } + createFakeDirectory(bucket, key); + } + } + + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given patch + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ + public FileStatus[] listStatus(Path f) throws FileNotFoundException, + IOException { + String key = pathToKey(f); + LOG.info("List status for path: " + f); + + final List<FileStatus> result = new ArrayList<FileStatus>(); + final FileStatus fileStatus = getFileStatus(f); + + if (fileStatus.isDirectory()) { + if (!key.isEmpty()) { + key = key + "/"; + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setPrefix(key); + request.setDelimiter("/"); + request.setMaxKeys(maxKeys); + + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + ObjectListing objects = s3.listObjects(request); + statistics.incrementReadOps(1); + + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); + // 
Skip over keys that are ourselves and old S3N _$folder$ files + if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + keyPath); + } + continue; + } + + if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) { + result.add(new S3AFileStatus(true, true, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fd: " + keyPath); + } + } else { + result.add(new S3AFileStatus(summary.getSize(), + dateToLong(summary.getLastModified()), keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + } + } + + for (String prefix : objects.getCommonPrefixes()) { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (keyPath.equals(f)) { + continue; + } + result.add(new S3AFileStatus(true, false, keyPath)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + + objects = s3.listNextBatchOfObjects(objects); + statistics.incrementReadOps(1); + } else { + break; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd (not a dir): " + f); + } + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + } + + + + /** + * Set the current working directory for the given file system. All relative + * paths will be resolved relative to it. + * + * @param new_dir + */ + public void setWorkingDirectory(Path new_dir) { + workingDir = new_dir; + } + + /** + * Get the current working directory for the given file system + * @return the directory pathname + */ + public Path getWorkingDirectory() { + return workingDir; + } + + /** + * Make the given file and all non-existent parents into + * directories. Has the semantics of Unix 'mkdir -p'. + * Existence of the directory hierarchy is not an error. 
+ * @param f path to create + * @param permission to apply to f + */ + // TODO: If we have created an empty file at /foo/bar and we then call + // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + LOG.info("Making directory: " + f); + + try { + FileStatus fileStatus = getFileStatus(f); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + f); + } + } catch (FileNotFoundException e) { + Path fPart = f; + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isFile()) { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s' since it is a file.", + fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + + String key = pathToKey(f); + createFakeDirectory(bucket, key); + return true; + } + } + + /** + * Return a file status object that represents the path. 
+   * Up to three S3 requests are made, in this order: metadata for the exact
+   * key, metadata for the key with a trailing "/", and finally a
+   * single-result listing of the key used as a prefix.
+   *
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws java.io.FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public S3AFileStatus getFileStatus(Path f) throws IOException {
+    String key = pathToKey(f);
+
+    LOG.info("Getting path status for " + f + " (" + key + ")");
+
+    // The empty key skips both metadata probes and goes straight to the
+    // listing below.
+    if (!key.isEmpty()) {
+      // Probe 1: an object stored under exactly this key.
+      try {
+        ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+        statistics.incrementReadOps(1);
+
+        if (objectRepresentsDirectory(key, meta.getContentLength())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: fake directory");
+          }
+          return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: normal file");
+          }
+          return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+              f.makeQualified(uri, workingDir));
+        }
+      } catch (AmazonServiceException e) {
+        // A 404 only means "no such object" and falls through to the next
+        // probe; every other status code is logged and rethrown.
+        if (e.getStatusCode() != 404) {
+          printAmazonServiceException(e);
+          throw e;
+        }
+      } catch (AmazonClientException e) {
+        printAmazonClientException(e);
+        throw e;
+      }
+
+      // Necessary?
+      // Probe 2: a directory marker object stored under "key/".
+      if (!key.endsWith("/")) {
+        try {
+          String newKey = key + "/";
+          ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
+          statistics.incrementReadOps(1);
+
+          if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Found file (with /): fake directory");
+            }
+            return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+          } else {
+            LOG.warn("Found file (with /): real file? should not happen: " + key);
+
+            return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+                f.makeQualified(uri, workingDir));
+          }
+        } catch (AmazonServiceException e) {
+          if (e.getStatusCode() != 404) {
+            printAmazonServiceException(e);
+            throw e;
+          }
+        } catch (AmazonClientException e) {
+          printAmazonClientException(e);
+          throw e;
+        }
+      }
+    }
+
+    // Probe 3: no object matched, so check whether anything exists under
+    // "key/". One result is enough to report a non-empty directory.
+    try {
+      if (!key.isEmpty() && !key.endsWith("/")) {
+        key = key + "/";
+      }
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(key);
+      request.setDelimiter("/");
+      request.setMaxKeys(1);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found path as directory (with /): " +
+              objects.getCommonPrefixes().size() + "/" +
+              objects.getObjectSummaries().size());
+
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
+          }
+          for (String prefix : objects.getCommonPrefixes()) {
+            LOG.debug("Prefix: " + prefix);
+          }
+        }
+
+        return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir));
+      }
+    } catch (AmazonServiceException e) {
+      if (e.getStatusCode() != 404) {
+        printAmazonServiceException(e);
+        throw e;
+      }
+    } catch (AmazonClientException e) {
+      printAmazonClientException(e);
+      throw e;
+    }
+
+    // All probes came back empty (or with 404).
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Not Found: " + f);
+    }
+    throw new FileNotFoundException("No such file or directory: " + f);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   *
+   * This version doesn't need to create a temporary file to calculate the md5.
+ * Sadly this doesn't seem to be used by the shell cp :( + * + * delSrc indicates if the source should be removed + * @param delSrc whether to delete the src + * @param overwrite whether to overwrite an existing file + * @param src path + * @param dst path + */ + @Override + public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, + Path dst) throws IOException { + String key = pathToKey(dst); + + if (!overwrite && exists(dst)) { + throw new IOException(dst + " already exists"); + } + + LOG.info("Copying local file from " + src + " to " + dst); + + // Since we have a local file, we don't need to stream into a temporary file + LocalFileSystem local = getLocal(getConf()); + File srcfile = local.pathToFile(src); + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + + TransferManager transfers = new TransferManager(s3); + transfers.setConfiguration(transferConfiguration); + + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(om); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Upload up = transfers.upload(putObjectRequest); + up.addProgressListener(progressListener); + try { + up.waitForUploadResult(); + statistics.incrementWriteOps(1); + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); + } finally { + transfers.shutdownNow(false); + } + 
+ // This will delete unnecessary fake parent directories + finishedWrite(key); + + if (delSrc) { + local.delete(src, false); + } + } + + /** + * Override getCononicalServiceName because we don't support token in S3A + */ + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; + } + + private void copyFile(String srcKey, String dstKey) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("copyFile " + srcKey + " -> " + dstKey); + } + + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMultipartCopyPartSize(partSize); + + TransferManager transfers = new TransferManager(s3); + transfers.setConfiguration(transferConfiguration); + + ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey); + final ObjectMetadata dstom = srcom.clone(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + dstom.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey); + copyObjectRequest.setCannedAccessControlList(cannedACL); + copyObjectRequest.setNewObjectMetadata(dstom); + + ProgressListener progressListener = new ProgressListener() { + public void progressChanged(ProgressEvent progressEvent) { + switch (progressEvent.getEventCode()) { + case ProgressEvent.PART_COMPLETED_EVENT_CODE: + statistics.incrementWriteOps(1); + break; + } + } + }; + + Copy copy = transfers.copy(copyObjectRequest); + copy.addProgressListener(progressListener); + try { + copy.waitForCopyResult(); + statistics.incrementWriteOps(1); + } catch (InterruptedException e) { + throw new IOException("Got interrupted, cancelling"); + } finally { + transfers.shutdownNow(false); + } + } + + private boolean objectRepresentsDirectory(final String name, final long size) { + return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; + } + + // Handles null Dates that can be 
returned by AWS + private static long dateToLong(final Date date) { + if (date == null) { + return 0L; + } + + return date.getTime(); + } + + public void finishedWrite(String key) throws IOException { + deleteUnnecessaryFakeDirectories(keyToPath(key).getParent()); + } + + private void deleteUnnecessaryFakeDirectories(Path f) throws IOException { + while (true) { + try { + String key = pathToKey(f); + if (key.isEmpty()) { + break; + } + + S3AFileStatus status = getFileStatus(f); + + if (status.isDirectory() && status.isEmptyDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting fake directory " + key + "/"); + } + s3.deleteObject(bucket, key + "/"); + statistics.incrementWriteOps(1); + } + } catch (FileNotFoundException e) { + } catch (AmazonServiceException e) {} + + if (f.isRoot()) { + break; + } + + f = f.getParent(); + } + } + + + private void createFakeDirectory(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + if (!objectName.endsWith("/")) { + createEmptyObject(bucketName, objectName + "/"); + } else { + createEmptyObject(bucketName, objectName); + } + } + + // Used to create an empty file that represents an empty directory + private void createEmptyObject(final String bucketName, final String objectName) + throws AmazonClientException, AmazonServiceException { + final InputStream im = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + + final ObjectMetadata om = new ObjectMetadata(); + om.setContentLength(0L); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om); + putObjectRequest.setCannedAcl(cannedACL); + s3.putObject(putObjectRequest); + statistics.incrementWriteOps(1); + } + + /** + * Return the number of bytes that large input files should be optimally + * be split into to 
minimize i/o time. + * @deprecated use {@link #getDefaultBlockSize(Path)} instead + */ + @Deprecated + public long getDefaultBlockSize() { + // default to 32MB: large enough to minimize the impact of seeks + return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024); + } + + private void printAmazonServiceException(AmazonServiceException ase) { + LOG.info("Caught an AmazonServiceException, which means your request made it " + + "to Amazon S3, but was rejected with an error response for some reason."); + LOG.info("Error Message: " + ase.getMessage()); + LOG.info("HTTP Status Code: " + ase.getStatusCode()); + LOG.info("AWS Error Code: " + ase.getErrorCode()); + LOG.info("Error Type: " + ase.getErrorType()); + LOG.info("Request ID: " + ase.getRequestId()); + LOG.info("Class Name: " + ase.getClass().getName()); + } + + private void printAmazonClientException(AmazonClientException ace) { + LOG.info("Caught an AmazonClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with S3, " + + "such as not being able to access the network."); + LOG.info("Error Message: " + ace.getMessage()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java new file mode 100644 index 0000000..f65a5b0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + +import org.slf4j.Logger; + +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.net.SocketException; + +public class S3AInputStream extends FSInputStream { + private long pos; + private boolean closed; + private S3ObjectInputStream wrappedStream; + private S3Object wrappedObject; + private FileSystem.Statistics stats; + private AmazonS3Client client; + private String bucket; + private String key; + private long contentLength; + public static final Logger LOG = S3AFileSystem.LOG; + + + public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client, + FileSystem.Statistics stats) { + this.bucket = bucket; + this.key = key; + this.contentLength = contentLength; + this.client = client; + this.stats = stats; + this.pos = 0; + this.closed = false; + this.wrappedObject = null; + this.wrappedStream = null; + } + + private void openIfNeeded() throws IOException { + if (wrappedObject == null) { + reopen(0); + } + } + + private synchronized void reopen(long pos) throws 
IOException { + if (wrappedStream != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Aborting old stream to open at pos " + pos); + } + wrappedStream.abort(); + } + + if (pos < 0) { + throw new EOFException("Trying to seek to a negative offset " + pos); + } + + if (contentLength > 0 && pos > contentLength-1) { + throw new EOFException("Trying to seek to an offset " + pos + + " past the end of the file"); + } + + LOG.info("Actually opening file " + key + " at pos " + pos); + + GetObjectRequest request = new GetObjectRequest(bucket, key); + request.setRange(pos, contentLength-1); + + wrappedObject = client.getObject(request); + wrappedStream = wrappedObject.getObjectContent(); + + if (wrappedStream == null) { + throw new IOException("Null IO stream"); + } + + this.pos = pos; + } + + @Override + public synchronized long getPos() throws IOException { + return pos; + } + + @Override + public synchronized void seek(long pos) throws IOException { + if (this.pos == pos) { + return; + } + + LOG.info("Reopening " + this.key + " to seek to new offset " + (pos - this.pos)); + reopen(pos); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + @Override + public synchronized int read() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + openIfNeeded(); + + int byteRead; + try { + byteRead = wrappedStream.read(); + } catch (SocketTimeoutException e) { + LOG.info("Got timeout while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(); + } catch (SocketException e) { + LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(); + } + + if (byteRead >= 0) { + pos++; + } + + if (stats != null && byteRead >= 0) { + stats.incrementBytesRead(1); + } + return byteRead; + } + + @Override + public synchronized int read(byte buf[], int off, int len) throws IOException 
{ + if (closed) { + throw new IOException("Stream closed"); + } + + openIfNeeded(); + + int byteRead; + try { + byteRead = wrappedStream.read(buf, off, len); + } catch (SocketTimeoutException e) { + LOG.info("Got timeout while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(buf, off, len); + } catch (SocketException e) { + LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); + reopen(pos); + byteRead = wrappedStream.read(buf, off, len); + } + + if (byteRead > 0) { + pos += byteRead; + } + + if (stats != null && byteRead > 0) { + stats.incrementBytesRead(byteRead); + } + + return byteRead; + } + + @Override + public synchronized void close() throws IOException { + super.close(); + closed = true; + if (wrappedObject != null) { + wrappedObject.close(); + } + } + + @Override + public synchronized int available() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + long remaining = this.contentLength - this.pos; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)remaining; + } + + @Override + public boolean markSupported() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java new file mode 100644 index 0000000..bdb723e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.util.Progressable; + +import org.slf4j.Logger; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +public class S3AOutputStream extends OutputStream { + private OutputStream backupStream; + private File backupFile; + private boolean closed; + private String key; + private String bucket; + private AmazonS3Client client; + private Progressable progress; + private long partSize; + 
private int partSizeThreshold; + private S3AFileSystem fs; + private CannedAccessControlList cannedACL; + private FileSystem.Statistics statistics; + private LocalDirAllocator lDirAlloc; + private String serverSideEncryptionAlgorithm; + + public static final Logger LOG = S3AFileSystem.LOG; + + public S3AOutputStream(Configuration conf, AmazonS3Client client, + S3AFileSystem fs, String bucket, String key, Progressable progress, + CannedAccessControlList cannedACL, FileSystem.Statistics statistics, + String serverSideEncryptionAlgorithm) + throws IOException { + this.bucket = bucket; + this.key = key; + this.client = client; + this.progress = progress; + this.fs = fs; + this.cannedACL = cannedACL; + this.statistics = statistics; + this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; + + partSize = conf.getLong(NEW_MULTIPART_SIZE, + conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); + partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, + conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + + if (conf.get(BUFFER_DIR, null) != null) { + lDirAlloc = new LocalDirAllocator(BUFFER_DIR); + } else { + lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a"); + } + + backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); + closed = false; + + LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile); + + this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); + } + + @Override + public void flush() throws IOException { + backupStream.flush(); + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + backupStream.close(); + LOG.info("OutputStream for key '" + key + "' closed. 
Now beginning upload"); + LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold); + + + try { + TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); + transferConfiguration.setMinimumUploadPartSize(partSize); + transferConfiguration.setMultipartUploadThreshold(partSizeThreshold); + + TransferManager transfers = new TransferManager(client); + transfers.setConfiguration(transferConfiguration); + + final ObjectMetadata om = new ObjectMetadata(); + if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { + om.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile); + putObjectRequest.setCannedAcl(cannedACL); + putObjectRequest.setMetadata(om); + + Upload upload = transfers.upload(putObjectRequest); + + ProgressableProgressListener listener = + new ProgressableProgressListener(upload, progress, statistics); + upload.addProgressListener(listener); + + upload.waitForUploadResult(); + + long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred(); + if (statistics != null && delta != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("S3A write delta changed after finished: " + delta + " bytes"); + } + statistics.incrementBytesWritten(delta); + } + + // This will delete unnecessary fake parent directories + fs.finishedWrite(key); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + if (!backupFile.delete()) { + LOG.warn("Could not delete temporary s3a file: " + backupFile); + } + super.close(); + closed = true; + } + + LOG.info("OutputStream for key '" + key + "' upload complete"); + } + + @Override + public void write(int b) throws IOException { + backupStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + backupStream.write(b, off, len); + } + + public static class ProgressableProgressListener implements 
ProgressListener { + private Progressable progress; + private FileSystem.Statistics statistics; + private long lastBytesTransferred; + private Upload upload; + + public ProgressableProgressListener(Upload upload, Progressable progress, + FileSystem.Statistics statistics) { + this.upload = upload; + this.progress = progress; + this.statistics = statistics; + this.lastBytesTransferred = 0; + } + + public void progressChanged(ProgressEvent progressEvent) { + if (progress != null) { + progress.progress(); + } + + // There are 3 http ops here, but this should be close enough for now + if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE || + progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) { + statistics.incrementWriteOps(1); + } + + long transferred = upload.getProgress().getBytesTransferred(); + long delta = transferred - lastBytesTransferred; + if (statistics != null && delta != 0) { + statistics.incrementBytesWritten(delta); + } + + lastBytesTransferred = transferred; + } + + public long getLastBytesTransferred() { + return lastBytesTransferred; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem index 3cd1d6b..0e3c42a 100644 --- a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem +++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -15,3 +15,4 @@ org.apache.hadoop.fs.s3.S3FileSystem org.apache.hadoop.fs.s3native.NativeS3FileSystem +org.apache.hadoop.fs.s3a.S3AFileSystem 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java new file mode 100644 index 0000000..cbdb3bd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+
+/**
+ * The contract of S3A: only enabled if the test bucket is provided
+ */
+public class S3AContract extends AbstractBondedFSContract {
+
+  /** Name of the configuration resource added by the constructor. */
+  public static final String CONTRACT_XML = "contract/s3a.xml";
+
+
+  /**
+   * Builds the contract on top of the supplied configuration and adds
+   * {@link #CONTRACT_XML} as a further configuration resource.
+   *
+   * @param conf base configuration handed to the superclass
+   */
+  public S3AContract(Configuration conf) {
+    super(conf);
+    //insert the base features
+    addConfResource(CONTRACT_XML);
+  }
+
+  /**
+   * @return the filesystem scheme of this contract, {@code "s3a"}
+   */
+  @Override
+  public String getScheme() {
+    return "s3a";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
new file mode 100644
index 0000000..1d95ddf
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+/**
+ * Runs the tests inherited from {@link AbstractContractCreateTest} against
+ * an {@link S3AContract}, skipping the empty-directory overwrite case.
+ */
+public class TestS3AContractCreate extends AbstractContractCreateTest {
+
+  /**
+   * @param conf configuration handed to the contract
+   * @return a new {@link S3AContract} built from {@code conf}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Replaces the inherited test with an unconditional skip; the skip message
+   * gives the reason.
+   */
+  @Override
+  public void testOverwriteEmptyDirectory() throws Throwable {
+    ContractTestUtils.skip(
+        "blobstores can't distinguish empty directories from files");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
new file mode 100644
index 0000000..733a517
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Runs the tests inherited from {@link AbstractContractDeleteTest} against
+ * an {@link S3AContract}.
+ */
+public class TestS3AContractDelete extends AbstractContractDeleteTest {
+
+  /**
+   * @param conf configuration handed to the contract
+   * @return a new {@link S3AContract} built from {@code conf}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
new file mode 100644
index 0000000..a312782
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test dir operations on S3: runs the tests inherited from
+ * {@link AbstractContractMkdirTest} against an {@link S3AContract}.
+ */
+public class TestS3AContractMkdir extends AbstractContractMkdirTest {
+
+  /**
+   * @param conf configuration handed to the contract
+   * @return a new {@link S3AContract} built from {@code conf}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
new file mode 100644
index 0000000..f735deb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Runs the tests inherited from {@link AbstractContractOpenTest} against
+ * an {@link S3AContract}.
+ */
+public class TestS3AContractOpen extends AbstractContractOpenTest {
+
+  /**
+   * @param conf configuration handed to the contract
+   * @return a new {@link S3AContract} built from {@code conf}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
new file mode 100644
index 0000000..88ed6d6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; + +public class TestS3AContractRename extends AbstractContractRenameTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void testRenameDirIntoExistingDir() throws Throwable { + describe("Verify renaming a dir into an existing dir puts the files" + +" from the source dir into the existing dir" + +" and leaves existing files alone"); + FileSystem fs = getFileSystem(); + String sourceSubdir = "source"; + Path srcDir = path(sourceSubdir); + Path srcFilePath = new Path(srcDir, "source-256.txt"); + byte[] srcDataset = dataset(256, 'a', 'z'); + writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false); + Path destDir = path("dest"); + + Path destFilePath = new Path(destDir, "dest-512.txt"); + byte[] destDateset = dataset(512, 'A', 'Z'); + writeDataset(fs, destFilePath, destDateset, destDateset.length, 1024, false); + assertIsFile(destFilePath); + + boolean rename = fs.rename(srcDir, 
destDir); + Path renamedSrcFilePath = new Path(destDir, "source-256.txt"); + assertIsFile(destFilePath); + assertIsFile(renamedSrcFilePath); + ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset); + assertTrue("rename returned false though the contents were copied", rename); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/24d920b8/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java new file mode 100644 index 0000000..5e2352c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * root dir operations against an S3 bucket: runs the tests inherited from
+ * {@link AbstractContractRootDirectoryTest} against an {@link S3AContract}.
+ */
+public class TestS3AContractRootDir extends
+    AbstractContractRootDirectoryTest {
+
+  /**
+   * @param conf configuration handed to the contract
+   * @return a new {@link S3AContract} built from {@code conf}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}