Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 cfd0fbf13 -> 013a3c454
HADOOP-13793. S3guard: add inconsistency injection, integration tests. Contributed by Aaron Fabbri

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/013a3c45
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/013a3c45
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/013a3c45

Branch: refs/heads/HADOOP-13345
Commit: 013a3c4540f2622fc8182a214e19cdb407f21b8b
Parents: cfd0fbf
Author: Mingliang Liu <lium...@apache.org>
Authored: Mon Dec 5 22:10:14 2016 -0800
Committer: Mingliang Liu <lium...@apache.org>
Committed: Mon Dec 5 22:53:54 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/s3a/Constants.java    |   2 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java  | 223 +++++++++++++++++++
 .../fs/s3a/InconsistentAmazonS3Client.java     | 189 ++++++++++++++++
 .../apache/hadoop/fs/s3a/S3ClientFactory.java  | 186 ----------------
 .../fs/s3a/ITestS3GuardListConsistency.java    |  79 +++++++
 .../fs/s3a/InconsistentS3ClientFactory.java    |  35 +++
 6 files changed, 527 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 518bd33..c102460 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -280,7 +280,7 @@ public final class Constants {
   @InterfaceStability.Unstable
   public static final Class<? extends S3ClientFactory>
       DEFAULT_S3_CLIENT_FACTORY_IMPL =
-          S3ClientFactory.DefaultS3ClientFactory.class;
+          DefaultS3ClientFactory.class;
 
   /**
    * Maximum number of partitions in a multipart upload: {@value}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
new file mode 100644
index 0000000..a43a746
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; +import static org.apache.hadoop.fs.s3a.S3AUtils.intOption; + +/** + * The default factory implementation, which calls the AWS SDK to configure + * and create an {@link AmazonS3Client} that communicates with the S3 service. + */ +public class DefaultS3ClientFactory extends Configured implements + S3ClientFactory { + + private static final Logger LOG = S3AFileSystem.LOG; + + @Override + public AmazonS3 createS3Client(URI name, URI uri) throws IOException { + Configuration conf = getConf(); + AWSCredentialsProvider credentials = + createAWSCredentialProviderSet(name, conf, uri); + ClientConfiguration awsConf = new ClientConfiguration(); + initConnectionSettings(conf, awsConf); + initProxySupport(conf, awsConf); + initUserAgent(conf, awsConf); + AmazonS3 s3 = newAmazonS3Client(credentials, awsConf); + return createAmazonS3Client(s3, conf, credentials, awsConf); + } + + /** + * Wrapper around constructor for {@link AmazonS3} client. Override this to + * provide an extended version of the client + * @param credentials credentials to use + * @param awsConf AWS configuration + * @return new AmazonS3 client + */ + protected AmazonS3 newAmazonS3Client( + AWSCredentialsProvider credentials, ClientConfiguration awsConf) { + return new AmazonS3Client(credentials, awsConf); + } + + /** + * Initializes all AWS SDK settings related to connection management. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initConnectionSettings(Configuration conf, + ClientConfiguration awsConf) { + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, + DEFAULT_SOCKET_SEND_BUFFER, 2048); + int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, + DEFAULT_SOCKET_RECV_BUFFER, 2048); + awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + } + + /** + * Initializes AWS SDK proxy support if configured. 
+ * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + * @throws IllegalArgumentException if misconfigured + */ + private static void initProxySupport(Configuration conf, + ClientConfiguration awsConf) throws IllegalArgumentException { + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug("Using proxy server {}:{} as user {} with password {} on " + + "domain {} as workstation {}", awsConf.getProxyHost(), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), + awsConf.getProxyWorkstation()); + } + } else if (proxyPort >= 0) { + String msg = + "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + + /** + * Initializes the User-Agent header to send in HTTP requests to the S3 + * back-end. We always include the Hadoop version number. The user also + * may set an optional custom prefix to put in front of the Hadoop version + * number. The AWS SDK interally appends its own information, which seems + * to include the AWS SDK version, OS and JVM version. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + private static void initUserAgent(Configuration conf, + ClientConfiguration awsConf) { + String userAgent = "Hadoop " + VersionInfo.getVersion(); + String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); + if (!userAgentPrefix.isEmpty()) { + userAgent = userAgentPrefix + ", " + userAgent; + } + LOG.debug("Using User-Agent: {}", userAgent); + awsConf.setUserAgentPrefix(userAgent); + } + + /** + * Creates an {@link AmazonS3Client} from the established configuration. 
+ * + * @param conf Hadoop configuration + * @param credentials AWS credentials + * @param awsConf AWS SDK configuration + * @return S3 client + * @throws IllegalArgumentException if misconfigured + */ + private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf, + AWSCredentialsProvider credentials, ClientConfiguration awsConf) + throws IllegalArgumentException { + String endPoint = conf.getTrimmed(ENDPOINT, ""); + if (!endPoint.isEmpty()) { + try { + s3.setEndpoint(endPoint); + } catch (IllegalArgumentException e) { + String msg = "Incorrect endpoint: " + e.getMessage(); + LOG.error(msg); + throw new IllegalArgumentException(msg, e); + } + } + enablePathStyleAccessIfRequired(s3, conf); + return s3; + } + + /** + * Enables path-style access to S3 buckets if configured. By default, the + * behavior is to use virtual hosted-style access with URIs of the form + * http://bucketname.s3.amazonaws.com. Enabling path-style access and a + * region-specific endpoint switches the behavior to use URIs of the form + * http://s3-eu-west-1.amazonaws.com/bucketname. + * + * @param s3 S3 client + * @param conf Hadoop configuration + */ + private static void enablePathStyleAccessIfRequired(AmazonS3 s3, + Configuration conf) { + final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); + if (pathStyleAccess) { + LOG.debug("Enabling path style access!"); + s3.setS3ClientOptions(S3ClientOptions.builder() + .setPathStyleAccess(true) + .build()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java new file mode 100644 index 0000000..ebca268 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects + * inconsistency and/or errors. Used for testing S3Guard. + * Currently only delays listing visibility, not affecting GET. + */ +public class InconsistentAmazonS3Client extends AmazonS3Client { + + /** + * Keys containing this substring will be subject to delayed visibility. + */ + public static final String DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME"; + + /** + * How many seconds affected keys will be delayed from appearing in listing. + * This should probably be a config value. + */ + public static final long DELAY_KEY_MILLIS = 5 * 1000; + + private static final Logger LOG = + LoggerFactory.getLogger(InconsistentAmazonS3Client.class); + + /** Map of key to delay -> time it was created. */ + private Map<String, Long> delayedKeys = new HashMap<>(); + + public InconsistentAmazonS3Client(AWSCredentialsProvider credentials, + ClientConfiguration clientConfiguration) { + super(credentials, clientConfiguration); + } + + /* We should only need to override this version of putObject() */ + @Override + public PutObjectResult putObject(PutObjectRequest putObjectRequest) + throws AmazonClientException, AmazonServiceException { + LOG.debug("key {}", putObjectRequest.getKey()); + registerPutObject(putObjectRequest); + return super.putObject(putObjectRequest); + } + + /* We should only need to override this version of listObjects() */ + @Override + public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) + throws AmazonClientException, AmazonServiceException { + LOG.debug("prefix {}", listObjectsRequest.getPrefix()); + ObjectListing listing = super.listObjects(listObjectsRequest); + return filterListObjects(listObjectsRequest, + listing); + } + + + private ObjectListing filterListObjects(ListObjectsRequest request, + ObjectListing rawListing) { + + // Filter object listing + List<S3ObjectSummary> outputList = new ArrayList<>(); + for (S3ObjectSummary s : rawListing.getObjectSummaries()) { + if (!isVisibilityDelayed(s.getKey())) { + outputList.add(s); + } + } + + // Filter prefixes (directories) + List<String> outputPrefixes = new ArrayList<>(); + for (String key : rawListing.getCommonPrefixes()) { + if (!isVisibilityDelayed(key)) { + outputPrefixes.add(key); + } + } + + return new CustomObjectListing(rawListing, outputList, outputPrefixes); + } + + private boolean isVisibilityDelayed(String key) { + Long createTime = delayedKeys.get(key); + if (createTime == null) { + LOG.debug("no delay for key {}", key); + return false; + } + long currentTime = System.currentTimeMillis(); + long deadline = createTime + DELAY_KEY_MILLIS; + if (currentTime >= deadline) { + delayedKeys.remove(key); + LOG.debug("{} no longer delayed", key); + return false; + } else { + LOG.info("{} delaying visibility", key); + 
return true; + } + } + + private void registerPutObject(PutObjectRequest req) { + String key = req.getKey(); + if (shouldDelay(key)) { + enqueueDelayKey(key); + } + } + + /** + * Should we delay listing visibility for this key? + * @param key key which is being put + * @return true if we should delay + */ + private boolean shouldDelay(String key) { + boolean delay = key.contains(DELAY_KEY_SUBSTRING); + LOG.debug("{} -> {}", key, delay); + return delay; + } + + /** + * Record this key as something that should not become visible in + * listObject replies for a while, to simulate eventual list consistency. + * @param key key to delay visibility of + */ + private void enqueueDelayKey(String key) { + LOG.debug("key {}", key); + delayedKeys.put(key, System.currentTimeMillis()); + } + + /** Since ObjectListing is immutable, we just override it with wrapper. */ + private static class CustomObjectListing extends ObjectListing { + + private final List<S3ObjectSummary> customListing; + private final List<String> customPrefixes; + + public CustomObjectListing(ObjectListing rawListing, + List<S3ObjectSummary> customListing, List<String> customPrefixes) { + super(); + this.customListing = customListing; + this.customPrefixes = customPrefixes; + + this.setBucketName(rawListing.getBucketName()); + this.setCommonPrefixes(rawListing.getCommonPrefixes()); + this.setDelimiter(rawListing.getDelimiter()); + this.setEncodingType(rawListing.getEncodingType()); + this.setMarker(rawListing.getMarker()); + this.setMaxKeys(rawListing.getMaxKeys()); + this.setNextMarker(rawListing.getNextMarker()); + this.setPrefix(rawListing.getPrefix()); + this.setTruncated(rawListing.isTruncated()); + } + + @Override + public List<S3ObjectSummary> getObjectSummaries() { + return customListing; + } + + @Override + public List<String> getCommonPrefixes() { + return customPrefixes; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 871322d..5169840 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -18,26 +18,13 @@ package org.apache.hadoop.fs.s3a; -import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.*; - import java.io.IOException; import java.net.URI; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ClientOptions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.VersionInfo; - -import org.slf4j.Logger; /** * Factory for creation of S3 client instances to be used by {@link S3Store}. 
@@ -58,177 +45,4 @@ interface S3ClientFactory { */ AmazonS3 createS3Client(URI name, URI uri) throws IOException; - /** - * The default factory implementation, which calls the AWS SDK to configure - * and create an {@link AmazonS3Client} that communicates with the S3 service. - */ - static class DefaultS3ClientFactory extends Configured - implements S3ClientFactory { - - private static final Logger LOG = S3AFileSystem.LOG; - - @Override - public AmazonS3 createS3Client(URI name, URI uri) throws IOException { - Configuration conf = getConf(); - AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(name, conf, uri); - ClientConfiguration awsConf = new ClientConfiguration(); - initConnectionSettings(conf, awsConf); - initProxySupport(conf, awsConf); - initUserAgent(conf, awsConf); - return createAmazonS3Client(conf, credentials, awsConf); - } - - /** - * Initializes all AWS SDK settings related to connection management. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initConnectionSettings(Configuration conf, - ClientConfiguration awsConf) { - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, - DEFAULT_SOCKET_SEND_BUFFER, 2048); - int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, - DEFAULT_SOCKET_RECV_BUFFER, 2048); - awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); - } - } - - /** - * Initializes AWS SDK proxy support if configured. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - * @throws IllegalArgumentException if misconfigured - */ - private static void initProxySupport(Configuration conf, - ClientConfiguration awsConf) throws IllegalArgumentException { - String proxyHost = conf.getTrimmed(PROXY_HOST, ""); - int proxyPort = conf.getInt(PROXY_PORT, -1); - if (!proxyHost.isEmpty()) { - awsConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - awsConf.setProxyPort(proxyPort); - } else { - if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - awsConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. 
Using HTTP default 80"); - awsConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME + " or " + - PROXY_PASSWORD + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - awsConf.setProxyUsername(proxyUsername); - awsConf.setProxyPassword(proxyPassword); - awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); - awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); - if (LOG.isDebugEnabled()) { - LOG.debug("Using proxy server {}:{} as user {} with password {} on " + - "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), - String.valueOf(awsConf.getProxyUsername()), - awsConf.getProxyPassword(), awsConf.getProxyDomain(), - awsConf.getProxyWorkstation()); - } - } else if (proxyPort >= 0) { - String msg = - "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - - /** - * Initializes the User-Agent header to send in HTTP requests to the S3 - * back-end. We always include the Hadoop version number. The user also - * may set an optional custom prefix to put in front of the Hadoop version - * number. The AWS SDK interally appends its own information, which seems - * to include the AWS SDK version, OS and JVM version. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initUserAgent(Configuration conf, - ClientConfiguration awsConf) { - String userAgent = "Hadoop " + VersionInfo.getVersion(); - String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); - if (!userAgentPrefix.isEmpty()) { - userAgent = userAgentPrefix + ", " + userAgent; - } - LOG.debug("Using User-Agent: {}", userAgent); - awsConf.setUserAgentPrefix(userAgent); - } - - /** - * Creates an {@link AmazonS3Client} from the established configuration. - * - * @param conf Hadoop configuration - * @param credentials AWS credentials - * @param awsConf AWS SDK configuration - * @return S3 client - * @throws IllegalArgumentException if misconfigured - */ - private static AmazonS3 createAmazonS3Client(Configuration conf, - AWSCredentialsProvider credentials, ClientConfiguration awsConf) - throws IllegalArgumentException { - AmazonS3 s3 = new AmazonS3Client(credentials, awsConf); - String endPoint = conf.getTrimmed(ENDPOINT, ""); - if (!endPoint.isEmpty()) { - try { - s3.setEndpoint(endPoint); - } catch (IllegalArgumentException e) { - String msg = "Incorrect endpoint: " + e.getMessage(); - LOG.error(msg); - throw new IllegalArgumentException(msg, e); - } - } - enablePathStyleAccessIfRequired(s3, conf); - return s3; - } - - /** - * Enables path-style access to S3 buckets if configured. By default, the - * behavior is to use virtual hosted-style access with URIs of the form - * http://bucketname.s3.amazonaws.com. Enabling path-style access and a - * region-specific endpoint switches the behavior to use URIs of the form - * http://s3-eu-west-1.amazonaws.com/bucketname. 
- * - * @param s3 S3 client - * @param conf Hadoop configuration - */ - private static void enablePathStyleAccessIfRequired(AmazonS3 s3, - Configuration conf) { - final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); - if (pathStyleAccess) { - LOG.debug("Enabling path style access!"); - s3.setS3ClientOptions(S3ClientOptions.builder() - .setPathStyleAccess(true) - .build()); - } - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java new file mode 100644 index 0000000..0a9ee4f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Assume; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.fs.s3a.Constants.*; + +/** + * Test S3Guard list consistency feature by injecting delayed listObjects() + * visibility via {@link InconsistentAmazonS3Client}. + */ +public class ITestS3GuardListConsistency extends AbstractS3ATestBase { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class, + S3ClientFactory.class); + return new S3AContract(conf); + } + + @Test + public void testConsistentList() throws Exception { + + S3AFileSystem fs = getFileSystem(); + + // This test will fail if NullMetadataStore (the default) is configured: + // skip it. 
+ Assume.assumeTrue(fs.isMetadataStoreConfigured()); + + // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed + // in listObjects() results via InconsistentS3Client + Path inconsistentPath = + path("a/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING); + + Path[] testDirs = {path("a/b/dir1"), + path("a/b/dir2"), + inconsistentPath}; + + for (Path path : testDirs) { + assertTrue(fs.mkdirs(path)); + } + + FileStatus[] paths = fs.listStatus(path("a/b/")); + List<Path> list = new ArrayList<>(); + for (FileStatus fileState : paths) { + list.add(fileState.getPath()); + } + assertTrue(list.contains(path("a/b/dir1"))); + assertTrue(list.contains(path("a/b/dir2"))); + // This should fail without S3Guard, and succeed with it. + assertTrue(list.contains(inconsistentPath)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java new file mode 100644 index 0000000..88a9c78 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3; + +/** + * S3 Client factory used for testing with eventual consistency fault injection. + */ +public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { + + @Override + protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, + ClientConfiguration awsConf) { + return new InconsistentAmazonS3Client(credentials, awsConf); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
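
For readers who want to try the injected inconsistency outside of the bundled test, a minimal sketch follows. It only restates what ITestS3GuardListConsistency in this patch already does: swap the S3 client factory for InconsistentS3ClientFactory, write a key containing DELAY_KEY_SUBSTRING, and list the parent directory. The class name ListingDelayDemo and the bucket URI s3a://example-bucket are placeholders, the sketch assumes the hadoop-aws test classes are on the classpath, and, as the test's Assume notes, the delayed entry is only guaranteed to show up in the listing when an S3Guard MetadataStore is configured.

package org.apache.hadoop.fs.s3a;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical demo of the fault injection; not part of the patch. */
public class ListingDelayDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Swap in the fault-injecting client factory added by this patch.
    conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
        InconsistentS3ClientFactory.class, S3ClientFactory.class);

    // "s3a://example-bucket/" is a placeholder; point it at a test bucket
    // for which credentials are already configured.
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);

    // Keys containing DELAY_KEY_SUBSTRING are hidden from listObjects()
    // results for DELAY_KEY_MILLIS after the PUT, simulating eventual
    // list consistency.
    Path delayed = new Path("/test/dir-"
        + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING);
    fs.mkdirs(delayed);

    // With the default NullMetadataStore this listing may omit the new
    // directory; with an S3Guard MetadataStore it should appear at once.
    for (FileStatus status : fs.listStatus(new Path("/test"))) {
      System.out.println(status.getPath());
    }
  }
}

With the default NullMetadataStore the listing may legitimately miss the delayed directory for roughly DELAY_KEY_MILLIS (five seconds); that is exactly the inconsistency the S3Guard listing path is expected to mask.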