http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java deleted file mode 100644 index c25d0fb..0000000 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java +++ /dev/null @@ -1,410 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.event.ProgressEvent; -import com.amazonaws.event.ProgressListener; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.util.Progressable; -import org.slf4j.Logger; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; - -import static org.apache.hadoop.fs.s3a.S3AUtils.*; -import static org.apache.hadoop.fs.s3a.Statistic.*; - -/** - * Upload files/parts asap directly from a memory buffer (instead of buffering - * to a file). - * <p> - * Uploads are managed low-level rather than through the AWS TransferManager. - * This allows for uploading each part of a multi-part upload as soon as - * the bytes are in memory, rather than waiting until the file is closed. 
- * <p> - * Unstable: statistics and error handling might evolve - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class S3AFastOutputStream extends OutputStream { - - private static final Logger LOG = S3AFileSystem.LOG; - private final String key; - private final String bucket; - private final AmazonS3 client; - private final int partSize; - private final int multiPartThreshold; - private final S3AFileSystem fs; - private final CannedAccessControlList cannedACL; - private final ProgressListener progressListener; - private final ListeningExecutorService executorService; - private MultiPartUpload multiPartUpload; - private boolean closed; - private ByteArrayOutputStream buffer; - private int bufferLimit; - - - /** - * Creates a fast OutputStream that uploads to S3 from memory. - * For MultiPartUploads, as soon as sufficient bytes have been written to - * the stream a part is uploaded immediately (by using the low-level - * multi-part upload API on the AmazonS3Client). - * - * @param client AmazonS3Client used for S3 calls - * @param fs S3AFilesystem - * @param bucket S3 bucket name - * @param key S3 key name - * @param progress report progress in order to prevent timeouts - * @param cannedACL used CannedAccessControlList - * @param partSize size of a single part in a multi-part upload (except - * last part) - * @param multiPartThreshold files at least this size use multi-part upload - * @param threadPoolExecutor thread factory - * @throws IOException on any problem - */ - public S3AFastOutputStream(AmazonS3 client, - S3AFileSystem fs, - String bucket, - String key, - Progressable progress, - CannedAccessControlList cannedACL, - long partSize, - long multiPartThreshold, - ExecutorService threadPoolExecutor) - throws IOException { - this.bucket = bucket; - this.key = key; - this.client = client; - this.fs = fs; - this.cannedACL = cannedACL; - //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE - if (partSize > Integer.MAX_VALUE) { - this.partSize = Integer.MAX_VALUE; - LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " + - "when using 'FAST_UPLOAD = true')"); - } else { - this.partSize = (int) partSize; - } - if (multiPartThreshold > Integer.MAX_VALUE) { - this.multiPartThreshold = Integer.MAX_VALUE; - LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " + - "allowed size when using 'FAST_UPLOAD = true')"); - } else { - this.multiPartThreshold = (int) multiPartThreshold; - } - this.bufferLimit = this.multiPartThreshold; - this.closed = false; - int initialBufferSize = this.fs.getConf() - .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE); - if (initialBufferSize < 0) { - LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " + - "default value"); - initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE; - } else if (initialBufferSize > this.bufferLimit) { - LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " + - "exceed MIN_MULTIPART_THRESHOLD"); - initialBufferSize = this.bufferLimit; - } - this.buffer = new ByteArrayOutputStream(initialBufferSize); - this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); - this.multiPartUpload = null; - this.progressListener = new ProgressableListener(progress); - LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'", - bucket, key); - } - - /** - * Writes a byte to the memory buffer. If this causes the buffer to reach - * its limit, the actual upload is submitted to the threadpool. 
- * @param b the int of which the lowest byte is written - * @throws IOException on any problem - */ - @Override - public synchronized void write(int b) throws IOException { - buffer.write(b); - if (buffer.size() == bufferLimit) { - uploadBuffer(); - } - } - - /** - * Writes a range of bytes from to the memory buffer. If this causes the - * buffer to reach its limit, the actual upload is submitted to the - * threadpool and the remainder of the array is written to memory - * (recursively). - * @param b byte array containing - * @param off offset in array where to start - * @param len number of bytes to be written - * @throws IOException on any problem - */ - @Override - public synchronized void write(byte[] b, int off, int len) - throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - if (buffer.size() + len < bufferLimit) { - buffer.write(b, off, len); - } else { - int firstPart = bufferLimit - buffer.size(); - buffer.write(b, off, firstPart); - uploadBuffer(); - this.write(b, off + firstPart, len - firstPart); - } - } - - private synchronized void uploadBuffer() throws IOException { - if (multiPartUpload == null) { - multiPartUpload = initiateMultiPartUpload(); - /* Upload the existing buffer if it exceeds partSize. This possibly - requires multiple parts! */ - final byte[] allBytes = buffer.toByteArray(); - buffer = null; //earlier gc? - LOG.debug("Total length of initial buffer: {}", allBytes.length); - int processedPos = 0; - while ((multiPartThreshold - processedPos) >= partSize) { - LOG.debug("Initial buffer: processing from byte {} to byte {}", - processedPos, (processedPos + partSize - 1)); - multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes, - processedPos, partSize), partSize); - processedPos += partSize; - } - //resize and reset stream - bufferLimit = partSize; - buffer = new ByteArrayOutputStream(bufferLimit); - buffer.write(allBytes, processedPos, multiPartThreshold - processedPos); - } else { - //upload next part - multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer - .toByteArray()), partSize); - buffer.reset(); - } - } - - /** - * Close the stream. This will not return until the upload is complete - * or the attempt to perform the upload has failed. - * Exceptions raised in this method are indicative that the write has - * failed and data is at risk of being lost. - * @throws IOException on any failure. - */ - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - try { - if (multiPartUpload == null) { - putObject(); - } else { - int size = buffer.size(); - if (size > 0) { - fs.incrementPutStartStatistics(size); - //send last part - multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer - .toByteArray()), size); - } - final List<PartETag> partETags = multiPartUpload - .waitForAllPartUploads(); - multiPartUpload.complete(partETags); - } - // This will delete unnecessary fake parent directories - fs.finishedWrite(key); - LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key); - } finally { - buffer = null; - super.close(); - } - } - - /** - * Create the default metadata for a multipart upload operation. - * @return the metadata to use/extend. 
- */ - private ObjectMetadata createDefaultMetadata() { - return fs.newObjectMetadata(); - } - - private MultiPartUpload initiateMultiPartUpload() throws IOException { - final InitiateMultipartUploadRequest initiateMPURequest = - new InitiateMultipartUploadRequest(bucket, - key, - createDefaultMetadata()); - initiateMPURequest.setCannedACL(cannedACL); - try { - return new MultiPartUpload( - client.initiateMultipartUpload(initiateMPURequest).getUploadId()); - } catch (AmazonClientException ace) { - throw translateException("initiate MultiPartUpload", key, ace); - } - } - - private void putObject() throws IOException { - LOG.debug("Executing regular upload for bucket '{}' key '{}'", - bucket, key); - final ObjectMetadata om = createDefaultMetadata(); - final int size = buffer.size(); - om.setContentLength(size); - final PutObjectRequest putObjectRequest = - fs.newPutObjectRequest(key, - om, - new ByteArrayInputStream(buffer.toByteArray())); - putObjectRequest.setGeneralProgressListener(progressListener); - ListenableFuture<PutObjectResult> putObjectResult = - executorService.submit(new Callable<PutObjectResult>() { - @Override - public PutObjectResult call() throws Exception { - fs.incrementPutStartStatistics(size); - return client.putObject(putObjectRequest); - } - }); - //wait for completion - try { - putObjectResult.get(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted object upload: {}", ie, ie); - Thread.currentThread().interrupt(); - } catch (ExecutionException ee) { - throw extractException("regular upload", key, ee); - } - } - - - private class MultiPartUpload { - private final String uploadId; - private final List<ListenableFuture<PartETag>> partETagsFutures; - - public MultiPartUpload(String uploadId) { - this.uploadId = uploadId; - this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>(); - LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " + - "id '{}'", bucket, key, uploadId); - } - - private void uploadPartAsync(ByteArrayInputStream inputStream, - int partSize) { - final int currentPartNumber = partETagsFutures.size() + 1; - final UploadPartRequest request = - new UploadPartRequest().withBucketName(bucket).withKey(key) - .withUploadId(uploadId).withInputStream(inputStream) - .withPartNumber(currentPartNumber).withPartSize(partSize); - request.setGeneralProgressListener(progressListener); - ListenableFuture<PartETag> partETagFuture = - executorService.submit(new Callable<PartETag>() { - @Override - public PartETag call() throws Exception { - LOG.debug("Uploading part {} for id '{}'", currentPartNumber, - uploadId); - return fs.uploadPart(request).getPartETag(); - } - }); - partETagsFutures.add(partETagFuture); - } - - private List<PartETag> waitForAllPartUploads() throws IOException { - try { - return Futures.allAsList(partETagsFutures).get(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted partUpload: {}", ie, ie); - Thread.currentThread().interrupt(); - return null; - } catch (ExecutionException ee) { - //there is no way of recovering so abort - //cancel all partUploads - for (ListenableFuture<PartETag> future : partETagsFutures) { - future.cancel(true); - } - //abort multipartupload - this.abort(); - throw extractException("Multi-part upload with id '" + uploadId + "'", - key, ee); - } - } - - private void complete(List<PartETag> partETags) throws IOException { - try { - LOG.debug("Completing multi-part upload for key '{}', id '{}'", - key, uploadId); - client.completeMultipartUpload( - new 
CompleteMultipartUploadRequest(bucket, - key, - uploadId, - partETags)); - } catch (AmazonClientException e) { - throw translateException("Completing multi-part upload", key, e); - } - } - - public void abort() { - LOG.warn("Aborting multi-part upload with id '{}'", uploadId); - try { - fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED); - client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, - key, uploadId)); - } catch (Exception e2) { - LOG.warn("Unable to abort multipart upload, you may need to purge " + - "uploaded parts: {}", e2, e2); - } - } - } - - private static class ProgressableListener implements ProgressListener { - private final Progressable progress; - - public ProgressableListener(Progressable progress) { - this.progress = progress; - } - - public void progressChanged(ProgressEvent progressEvent) { - if (progress != null) { - progress.progress(); - } - } - } -}
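[Aside: the stream deleted above and its replacement, S3ABlockOutputStream (wired up in the S3AFileSystem change that follows), share the same low-level idea: initiate a multipart upload, hand each part to a thread pool the moment its bytes are buffered, then complete the upload — or abort it — when the stream closes. Below is a minimal, self-contained sketch of that pattern using the same AWS SDK calls that appear in the diff; the method shape, pool size, and the three equal-sized parts are illustrative assumptions, not the committed code.]

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IncrementalMultipartSketch {

  /** Upload three equal-sized parts, submitting each as soon as it is "full". */
  static void upload(AmazonS3 s3, String bucket, String key) throws Exception {
    final int partSize = 5 * 1024 * 1024;       // S3's minimum part size
    ExecutorService pool = Executors.newFixedThreadPool(4);
    String uploadId = s3.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    List<Future<PartETag>> parts = new ArrayList<>();
    try {
      for (int partNumber = 1; partNumber <= 3; partNumber++) {
        byte[] block = new byte[partSize];      // stands in for the memory buffer
        final UploadPartRequest request = new UploadPartRequest()
            .withBucketName(bucket).withKey(key)
            .withUploadId(uploadId)
            .withInputStream(new ByteArrayInputStream(block))
            .withPartNumber(partNumber)         // must be 1..10000 inclusive
            .withPartSize(partSize);
        // Upload starts now, while the caller keeps writing the next block.
        parts.add(pool.submit(() -> s3.uploadPart(request).getPartETag()));
      }
      List<PartETag> etags = new ArrayList<>();
      for (Future<PartETag> part : parts) {
        etags.add(part.get());                  // block until every part is done
      }
      s3.completeMultipartUpload(
          new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
    } catch (Exception e) {
      // Nothing is recoverable at this point: abort so the partial upload
      // does not linger (and keep incurring storage charges) on S3.
      s3.abortMultipartUpload(
          new AbortMultipartUploadRequest(bucket, key, uploadId));
      throw e;
    } finally {
      pool.shutdown();
    }
  }
}

The production stream layers onto this what the sketch omits: bounded queueing of blocks, canned ACLs, per-stream statistics, and translation of AmazonClientException into IOException via translateException()/extractException().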
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 15bd23a..1532cde 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,21 +30,26 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; @@ -55,6 +60,8 @@ import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressEvent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -68,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -118,21 +126,26 @@ public class S3AFileSystem extends FileSystem { private long partSize; private boolean enableMultiObjectsDelete; private TransferManager transfers; - private ExecutorService threadPoolExecutor; + private ListeningExecutorService threadPoolExecutor; private long multiPartThreshold; public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); + private static final Logger PROGRESS = + 
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"); + private LocalDirAllocator directoryAllocator; private CannedAccessControlList cannedACL; private String serverSideEncryptionAlgorithm; private S3AInstrumentation instrumentation; private S3AStorageStatistics storageStatistics; private long readAhead; private S3AInputPolicy inputPolicy; - private static final AtomicBoolean warnedOfCoreThreadDeprecation = - new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); // The maximum number of entries that can be deleted in any call to s3 private static final int MAX_ENTRIES_TO_DELETE = 1000; + private boolean blockUploadEnabled; + private String blockOutputBuffer; + private S3ADataBlocks.BlockFactory blockFactory; + private int blockOutputActiveBlocks; /** Called after a new FileSystem instance is constructed. * @param name a uri whose authority section names the host, port, etc. @@ -159,18 +172,11 @@ public class S3AFileSystem extends FileSystem { maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); listing = new Listing(this); - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - if (partSize < 5 * 1024 * 1024) { - LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); - partSize = 5 * 1024 * 1024; - } + partSize = getMultipartSizeProperty(conf, + MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + multiPartThreshold = getMultipartSizeProperty(conf, + MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); - multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - if (multiPartThreshold < 5 * 1024 * 1024) { - LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); - multiPartThreshold = 5 * 1024 * 1024; - } //check but do not store the block size longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); @@ -186,26 +192,19 @@ public class S3AFileSystem extends FileSystem { } }); - if (conf.get("fs.s3a.threads.core") != null && - warnedOfCoreThreadDeprecation.compareAndSet(false, true)) { - LoggerFactory.getLogger( - "org.apache.hadoop.conf.Configuration.deprecation") - .warn("Unsupported option \"fs.s3a.threads.core\"" + - " will be ignored {}", conf.get("fs.s3a.threads.core")); - } int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); if (maxThreads < 2) { LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); maxThreads = 2; } - int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS); - if (totalTasks < 1) { - LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1."); - totalTasks = 1; - } - long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); - threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads, - maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, + int totalTasks = intOption(conf, + MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1); + long keepAliveTime = longOption(conf, KEEPALIVE_TIME, + DEFAULT_KEEPALIVE_TIME, 0); + threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance( + maxThreads, + maxThreads + totalTasks, + keepAliveTime, TimeUnit.SECONDS, "s3a-transfer-shared"); initTransferManager(); @@ -218,8 +217,25 @@ public class S3AFileSystem extends FileSystem { serverSideEncryptionAlgorithm = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM); + LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm); inputPolicy = S3AInputPolicy.getPolicy( conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL)); 
+ + blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD); + + if (blockUploadEnabled) { + blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, + DEFAULT_FAST_UPLOAD_BUFFER); + partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); + blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); + blockOutputActiveBlocks = intOption(conf, + FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); + LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" + + " queue limit={}", + blockOutputBuffer, partSize, blockOutputActiveBlocks); + } else { + LOG.debug("Using S3AOutputStream"); + } } catch (AmazonClientException e) { throw translateException("initializing ", new Path(name), e); } @@ -346,6 +362,33 @@ public class S3AFileSystem extends FileSystem { } /** + * Demand create the directory allocator, then create a temporary file. + * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. + * @param pathStr prefix for the temporary file + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return a unique temporary file + * @throws IOException IO problems + */ + synchronized File createTmpFileForWrite(String pathStr, long size, + Configuration conf) throws IOException { + if (directoryAllocator == null) { + String bufferDir = conf.get(BUFFER_DIR) != null + ? BUFFER_DIR : "hadoop.tmp.dir"; + directoryAllocator = new LocalDirAllocator(bufferDir); + } + return directoryAllocator.createTmpFileForWrite(pathStr, size, conf); + } + + /** + * Get the bucket of this filesystem. + * @return the bucket + */ + public String getBucket() { + return bucket; + } + + /** * Change the input policy for this FS. * @param inputPolicy new policy */ @@ -469,6 +512,7 @@ public class S3AFileSystem extends FileSystem { * @see #setPermission(Path, FsPermission) */ @Override + @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { @@ -493,28 +537,33 @@ public class S3AFileSystem extends FileSystem { } instrumentation.fileCreated(); - if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) { - return new FSDataOutputStream( - new S3AFastOutputStream(s3, - this, - bucket, + FSDataOutputStream output; + if (blockUploadEnabled) { + output = new FSDataOutputStream( + new S3ABlockOutputStream(this, key, + new SemaphoredDelegatingExecutor(threadPoolExecutor, + blockOutputActiveBlocks, true), progress, - cannedACL, partSize, - multiPartThreshold, - threadPoolExecutor), - statistics); + blockFactory, + instrumentation.newOutputStreamStatistics(), + new WriteOperationHelper(key) + ), + null); + } else { + + // We pass null to FSDataOutputStream so it won't count writes that + // are being buffered to a file + output = new FSDataOutputStream( + new S3AOutputStream(getConf(), + this, + key, + progress + ), + null); } - // We pass null to FSDataOutputStream so it won't count writes that - // are being buffered to a file - return new FSDataOutputStream( - new S3AOutputStream(getConf(), - this, - key, - progress - ), - null); + return output; } /** @@ -758,6 +807,33 @@ public class S3AFileSystem extends FileSystem { } /** + * Decrement a gauge by a specific value. 
+ * @param statistic The operation to decrement + * @param count the count to decrement + */ + protected void decrementGauge(Statistic statistic, long count) { + instrumentation.decrementGauge(statistic, count); + } + + /** + * Increment a gauge by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementGauge(Statistic statistic, long count) { + instrumentation.incrementGauge(statistic, count); + } + + /** + * Get the storage statistics of this filesystem. + * @return the storage statistics + */ + @Override + public S3AStorageStatistics getStorageStatistics() { + return storageStatistics; + } + + /** * Request object metadata; increments counters in the process. * @param key key * @return the metadata @@ -904,7 +980,9 @@ public class S3AFileSystem extends FileSystem { */ public ObjectMetadata newObjectMetadata(long length) { final ObjectMetadata om = newObjectMetadata(); - om.setContentLength(length); + if (length >= 0) { + om.setContentLength(length); + } return om; } @@ -926,7 +1004,41 @@ public class S3AFileSystem extends FileSystem { len = putObjectRequest.getMetadata().getContentLength(); } incrementPutStartStatistics(len); - return transfers.upload(putObjectRequest); + try { + Upload upload = transfers.upload(putObjectRequest); + incrementPutCompletedStatistics(true, len); + return upload; + } catch (AmazonClientException e) { + incrementPutCompletedStatistics(false, len); + throw e; + } + } + + /** + * PUT an object directly (i.e. not via the transfer manager). + * Byte length is calculated from the file length, or, if there is no + * file, from the content length of the header. + * @param putObjectRequest the request + * @return the upload initiated + * @throws AmazonClientException on problems + */ + public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) + throws AmazonClientException { + long len; + if (putObjectRequest.getFile() != null) { + len = putObjectRequest.getFile().length(); + } else { + len = putObjectRequest.getMetadata().getContentLength(); + } + incrementPutStartStatistics(len); + try { + PutObjectResult result = s3.putObject(putObjectRequest); + incrementPutCompletedStatistics(true, len); + return result; + } catch (AmazonClientException e) { + incrementPutCompletedStatistics(false, len); + throw e; + } } /** @@ -934,10 +1046,20 @@ public class S3AFileSystem extends FileSystem { * Increments the write and put counters * @param request request * @return the result of the operation. 
+ * @throws AmazonClientException on problems */ - public UploadPartResult uploadPart(UploadPartRequest request) { - incrementPutStartStatistics(request.getPartSize()); - return s3.uploadPart(request); + public UploadPartResult uploadPart(UploadPartRequest request) + throws AmazonClientException { + long len = request.getPartSize(); + incrementPutStartStatistics(len); + try { + UploadPartResult uploadPartResult = s3.uploadPart(request); + incrementPutCompletedStatistics(true, len); + return uploadPartResult; + } catch (AmazonClientException e) { + incrementPutCompletedStatistics(false, len); + throw e; + } } /** @@ -950,9 +1072,28 @@ public class S3AFileSystem extends FileSystem { LOG.debug("PUT start {} bytes", bytes); incrementWriteOperations(); incrementStatistic(OBJECT_PUT_REQUESTS); + incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1); + if (bytes > 0) { + incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes); + } + } + + /** + * At the end of a put/multipart upload operation, update the + * relevant counters and gauges. + * + * @param success did the operation succeed? + * @param bytes bytes in the request. + */ + public void incrementPutCompletedStatistics(boolean success, long bytes) { + LOG.debug("PUT completed success={}; {} bytes", success, bytes); + incrementWriteOperations(); if (bytes > 0) { incrementStatistic(OBJECT_PUT_BYTES, bytes); + decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes); } + incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED); + decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1); } /** @@ -963,7 +1104,7 @@ public class S3AFileSystem extends FileSystem { * @param bytes bytes successfully uploaded. */ public void incrementPutProgressStatistics(String key, long bytes) { - LOG.debug("PUT {}: {} bytes", key, bytes); + PROGRESS.debug("PUT {}: {} bytes", key, bytes); incrementWriteOperations(); if (bytes > 0) { statistics.incrementBytesWritten(bytes); @@ -1483,7 +1624,7 @@ public class S3AFileSystem extends FileSystem { LocalFileSystem local = getLocal(getConf()); File srcfile = local.pathToFile(src); - final ObjectMetadata om = newObjectMetadata(); + final ObjectMetadata om = newObjectMetadata(srcfile.length()); PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); Upload up = putObject(putObjectRequest); ProgressableProgressListener listener = new ProgressableProgressListener( @@ -1751,6 +1892,10 @@ public class S3AFileSystem extends FileSystem { .append(serverSideEncryptionAlgorithm) .append('\''); } + if (blockFactory != null) { + sb.append(", blockFactory=").append(blockFactory); + } + sb.append(", executor=").append(threadPoolExecutor); sb.append(", statistics {") .append(statistics) .append("}"); @@ -1958,4 +2103,163 @@ public class S3AFileSystem extends FileSystem { getFileBlockLocations(status, 0, status.getLen()) : null); } + + /** + * Helper for an ongoing write operation. + * <p> + * It hides direct access to the S3 API from the output stream, + * and is a location where the object upload process can be evolved/enhanced. + * <p> + * Features + * <ul> + * <li>Methods to create and submit requests to S3, so avoiding + * all direct interaction with the AWS APIs.</li> + * <li>Some extra preflight checks of arguments, so failing fast on + * errors.</li> + * <li>Callbacks to let the FS know of events in the output stream + * upload process.</li> + * </ul> + * + * Each instance of this state is unique to a single output stream. 
+ */ + final class WriteOperationHelper { + private final String key; + + private WriteOperationHelper(String key) { + this.key = key; + } + + /** + * Create a {@link PutObjectRequest} request. + * The metadata is assumed to have been configured with the size of the + * operation. + * @param inputStream source data. + * @param length size, if known. Use -1 for not known + * @return the request + */ + PutObjectRequest newPutRequest(InputStream inputStream, long length) { + return newPutObjectRequest(key, newObjectMetadata(length), inputStream); + } + + /** + * Callback on a successful write. + */ + void writeSuccessful() { + finishedWrite(key); + } + + /** + * Callback on a write failure. + * @param e Any exception raised which triggered the failure. + */ + void writeFailed(Exception e) { + LOG.debug("Write to {} failed", this, e); + } + + /** + * Create a new object metadata instance. + * Any standard metadata headers are added here, for example: + * encryption. + * @param length size, if known. Use -1 for not known + * @return a new metadata instance + */ + public ObjectMetadata newObjectMetadata(long length) { + return S3AFileSystem.this.newObjectMetadata(length); + } + + /** + * Start the multipart upload process. + * @return the upload result containing the ID + * @throws IOException IO problem + */ + String initiateMultiPartUpload() throws IOException { + LOG.debug("Initiating Multipart upload"); + final InitiateMultipartUploadRequest initiateMPURequest = + new InitiateMultipartUploadRequest(bucket, + key, + newObjectMetadata(-1)); + initiateMPURequest.setCannedACL(cannedACL); + try { + return s3.initiateMultipartUpload(initiateMPURequest) + .getUploadId(); + } catch (AmazonClientException ace) { + throw translateException("initiate MultiPartUpload", key, ace); + } + } + + /** + * Complete a multipart upload operation. + * @param uploadId multipart operation Id + * @param partETags list of partial uploads + * @return the result + * @throws AmazonClientException on problems. + */ + CompleteMultipartUploadResult completeMultipartUpload(String uploadId, + List<PartETag> partETags) throws AmazonClientException { + Preconditions.checkNotNull(uploadId); + Preconditions.checkNotNull(partETags); + Preconditions.checkArgument(!partETags.isEmpty(), + "No partitions have been uploaded"); + return s3.completeMultipartUpload( + new CompleteMultipartUploadRequest(bucket, + key, + uploadId, + partETags)); + } + + /** + * Abort a multipart upload operation. + * @param uploadId multipart operation Id + * @return the result + * @throws AmazonClientException on problems. + */ + void abortMultipartUpload(String uploadId) throws AmazonClientException { + s3.abortMultipartUpload( + new AbortMultipartUploadRequest(bucket, key, uploadId)); + } + + /** + * Create and initialize a part request of a multipart upload. + * @param uploadId ID of ongoing upload + * @param uploadStream source of data to upload + * @param partNumber current part number of the upload + * @param size amount of data + * @return the request. 
+ */ + UploadPartRequest newUploadPartRequest(String uploadId, + InputStream uploadStream, + int partNumber, + int size) { + Preconditions.checkNotNull(uploadId); + Preconditions.checkNotNull(uploadStream); + Preconditions.checkArgument(size > 0, "Invalid partition size %s", size); + Preconditions.checkArgument(partNumber> 0 && partNumber <=10000, + "partNumber must be between 1 and 10000 inclusive, but is %s", + partNumber); + + LOG.debug("Creating part upload request for {} #{} size {}", + uploadId, partNumber, size); + return new UploadPartRequest() + .withBucketName(bucket) + .withKey(key) + .withUploadId(uploadId) + .withInputStream(uploadStream) + .withPartNumber(partNumber) + .withPartSize(size); + } + + /** + * The toString method is intended to be used in logging/toString calls. + * @return a string description. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "{bucket=").append(bucket); + sb.append(", key='").append(key).append('\''); + sb.append('}'); + return sb.toString(); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 26b5b51..963c53f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -18,7 +18,9 @@ package org.apache.hadoop.fs.s3a; -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.MetricStringBuilder; @@ -29,10 +31,12 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableMetric; +import java.io.Closeable; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -50,6 +54,9 @@ import static org.apache.hadoop.fs.s3a.Statistic.*; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AInstrumentation { + private static final Logger LOG = LoggerFactory.getLogger( + S3AInstrumentation.class); + public static final String CONTEXT = "S3AFileSystem"; private final MetricsRegistry registry = new MetricsRegistry("S3AFileSystem").setContext(CONTEXT); @@ -100,7 +107,23 @@ public class S3AInstrumentation { OBJECT_METADATA_REQUESTS, OBJECT_MULTIPART_UPLOAD_ABORTED, OBJECT_PUT_BYTES, - OBJECT_PUT_REQUESTS + OBJECT_PUT_REQUESTS, + OBJECT_PUT_REQUESTS_COMPLETED, + STREAM_WRITE_FAILURES, + STREAM_WRITE_BLOCK_UPLOADS, + STREAM_WRITE_BLOCK_UPLOADS_COMMITTED, + STREAM_WRITE_BLOCK_UPLOADS_ABORTED, + STREAM_WRITE_TOTAL_TIME, + STREAM_WRITE_TOTAL_DATA, + }; + + + private static final Statistic[] GAUGES_TO_CREATE = { + OBJECT_PUT_REQUESTS_ACTIVE, + OBJECT_PUT_BYTES_PENDING, + STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, + STREAM_WRITE_BLOCK_UPLOADS_PENDING, + STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, }; public S3AInstrumentation(URI name) { @@ -143,6 +166,9 @@ public class 
S3AInstrumentation { for (Statistic statistic : COUNTERS_TO_CREATE) { counter(statistic); } + for (Statistic statistic : GAUGES_TO_CREATE) { + gauge(statistic.getSymbol(), statistic.getDescription()); + } } /** @@ -254,13 +280,13 @@ public class S3AInstrumentation { * Lookup a counter by name. Return null if it is not known. * @param name counter name * @return the counter + * @throws IllegalStateException if the metric is not a counter */ private MutableCounterLong lookupCounter(String name) { MutableMetric metric = lookupMetric(name); if (metric == null) { return null; } - Preconditions.checkNotNull(metric, "not found: " + name); if (!(metric instanceof MutableCounterLong)) { throw new IllegalStateException("Metric " + name + " is not a MutableCounterLong: " + metric); @@ -269,6 +295,20 @@ public class S3AInstrumentation { } /** + * Look up a gauge. + * @param name gauge name + * @return the gauge or null + * @throws ClassCastException if the metric is not a Gauge. + */ + public MutableGaugeLong lookupGauge(String name) { + MutableMetric metric = lookupMetric(name); + if (metric == null) { + LOG.debug("No gauge {}", name); + } + return (MutableGaugeLong) metric; + } + + /** * Look up a metric from both the registered set and the lighter weight * stream entries. * @param name metric name @@ -349,6 +389,47 @@ public class S3AInstrumentation { counter.incr(count); } } + /** + * Increment a specific counter. + * No-op if not defined. + * @param op operation + * @param count atomic long containing value + */ + public void incrementCounter(Statistic op, AtomicLong count) { + incrementCounter(op, count.get()); + } + + /** + * Increment a specific gauge. + * No-op if not defined. + * @param op operation + * @param count increment value + * @throws ClassCastException if the metric is of the wrong type + */ + public void incrementGauge(Statistic op, long count) { + MutableGaugeLong gauge = lookupGauge(op.getSymbol()); + if (gauge != null) { + gauge.incr(count); + } else { + LOG.debug("No Gauge: "+ op); + } + } + + /** + * Decrement a specific gauge. + * No-op if not defined. + * @param op operation + * @param count increment value + * @throws ClassCastException if the metric is of the wrong type + */ + public void decrementGauge(Statistic op, long count) { + MutableGaugeLong gauge = lookupGauge(op.getSymbol()); + if (gauge != null) { + gauge.decr(count); + } else { + LOG.debug("No Gauge: " + op); + } + } /** * Create a stream input statistics instance. @@ -553,4 +634,165 @@ public class S3AInstrumentation { return sb.toString(); } } + + /** + * Create a stream output statistics instance. + * @return the new instance + */ + + OutputStreamStatistics newOutputStreamStatistics() { + return new OutputStreamStatistics(); + } + + /** + * Merge in the statistics of a single output stream into + * the filesystem-wide statistics. + * @param statistics stream statistics + */ + private void mergeOutputStreamStatistics(OutputStreamStatistics statistics) { + incrementCounter(STREAM_WRITE_TOTAL_TIME, statistics.totalUploadDuration()); + incrementCounter(STREAM_WRITE_QUEUE_DURATION, statistics.queueDuration); + incrementCounter(STREAM_WRITE_TOTAL_DATA, statistics.bytesUploaded); + incrementCounter(STREAM_WRITE_BLOCK_UPLOADS, + statistics.blockUploadsCompleted); + } + + /** + * Statistics updated by an output stream during its actual operation. + * Some of these stats may be relayed. 
However, as a block upload + * spans multiple threads, the statistics are only merged back into + * the filesystem statistics when the stream is closed. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public final class OutputStreamStatistics implements Closeable { + private final AtomicLong blocksSubmitted = new AtomicLong(0); + private final AtomicLong blocksInQueue = new AtomicLong(0); + private final AtomicLong blocksActive = new AtomicLong(0); + private final AtomicLong blockUploadsCompleted = new AtomicLong(0); + private final AtomicLong blockUploadsFailed = new AtomicLong(0); + private final AtomicLong bytesPendingUpload = new AtomicLong(0); + + private final AtomicLong bytesUploaded = new AtomicLong(0); + private final AtomicLong transferDuration = new AtomicLong(0); + private final AtomicLong queueDuration = new AtomicLong(0); + private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0); + + /** + * Block is queued for upload. + */ + void blockUploadQueued(int blockSize) { + blocksSubmitted.incrementAndGet(); + blocksInQueue.incrementAndGet(); + bytesPendingUpload.addAndGet(blockSize); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, blockSize); + } + + /** Queued block has been scheduled for upload. */ + void blockUploadStarted(long duration, int blockSize) { + queueDuration.addAndGet(duration); + blocksInQueue.decrementAndGet(); + blocksActive.incrementAndGet(); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1); + } + + /** A block upload has completed. */ + void blockUploadCompleted(long duration, int blockSize) { + this.transferDuration.addAndGet(duration); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); + blocksActive.decrementAndGet(); + blockUploadsCompleted.incrementAndGet(); + } + + /** + * A block upload has failed. + * A final transfer completed event is still expected, so this + * does not decrement the active block counter. + */ + void blockUploadFailed(long duration, int blockSize) { + blockUploadsFailed.incrementAndGet(); + } + + /** Intermediate report of bytes uploaded. */ + void bytesTransferred(long byteCount) { + bytesUploaded.addAndGet(byteCount); + bytesPendingUpload.addAndGet(-byteCount); + incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount); + } + + /** + * Note an exception in a multipart complete. + */ + void exceptionInMultipartComplete() { + exceptionsInMultipartFinalize.incrementAndGet(); + } + + /** + * Note an exception in a multipart abort. + */ + void exceptionInMultipartAbort() { + exceptionsInMultipartFinalize.incrementAndGet(); + } + + /** + * Get the number of bytes pending upload. + * @return the number of bytes in the pending upload state. + */ + public long getBytesPendingUpload() { + return bytesPendingUpload.get(); + } + + /** + * Output stream has closed. + * Trigger merge in of all statistics not updated during operation. + */ + @Override + public void close() { + if (bytesPendingUpload.get() > 0) { + LOG.warn("Closing output stream statistics while data is still marked" + + " as pending upload in {}", this); + } + mergeOutputStreamStatistics(this); + } + + long averageQueueTime() { + return blocksSubmitted.get() > 0 ? + (queueDuration.get() / blocksSubmitted.get()) : 0; + } + + double effectiveBandwidth() { + double duration = totalUploadDuration() / 1000.0; + return duration > 0 ? 
+ (bytesUploaded.get() / duration) : 0; + } + + long totalUploadDuration() { + return queueDuration.get() + transferDuration.get(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "OutputStreamStatistics{"); + sb.append("blocksSubmitted=").append(blocksSubmitted); + sb.append(", blocksInQueue=").append(blocksInQueue); + sb.append(", blocksActive=").append(blocksActive); + sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted); + sb.append(", blockUploadsFailed=").append(blockUploadsFailed); + sb.append(", bytesPendingUpload=").append(bytesPendingUpload); + sb.append(", bytesUploaded=").append(bytesUploaded); + sb.append(", exceptionsInMultipartFinalize=").append( + exceptionsInMultipartFinalize); + sb.append(", transferDuration=").append(transferDuration).append(" ms"); + sb.append(", queueDuration=").append(queueDuration).append(" ms"); + sb.append(", averageQueueTime=").append(averageQueueTime()).append(" ms"); + sb.append(", totalUploadDuration=").append(totalUploadDuration()) + .append(" ms"); + sb.append(", effectiveBandwidth=").append(effectiveBandwidth()) + .append(" bytes/s"); + sb.append('}'); + return sb.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index 23ba682..6ebc9e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -35,8 +35,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** @@ -45,37 +45,27 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*; @InterfaceAudience.Private @InterfaceStability.Evolving public class S3AOutputStream extends OutputStream { - private OutputStream backupStream; - private File backupFile; - private boolean closed; - private String key; - private Progressable progress; - private long partSize; - private long partSizeThreshold; - private S3AFileSystem fs; - private LocalDirAllocator lDirAlloc; + private final OutputStream backupStream; + private final File backupFile; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final String key; + private final Progressable progress; + private final S3AFileSystem fs; public static final Logger LOG = S3AFileSystem.LOG; public S3AOutputStream(Configuration conf, - S3AFileSystem fs, String key, Progressable progress) + S3AFileSystem fs, + String key, + Progressable progress) throws IOException { this.key = key; this.progress = progress; this.fs = fs; - partSize = fs.getPartitionSize(); - partSizeThreshold = fs.getMultiPartThreshold(); - - if (conf.get(BUFFER_DIR, null) != null) { - lDirAlloc = new LocalDirAllocator(BUFFER_DIR); - } else { - lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a"); - } - backupFile = lDirAlloc.createTmpFileForWrite("output-", + backupFile = fs.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); - closed = false; LOG.debug("OutputStream for key '{}' writing to tempfile: {}", key, backupFile); @@ -84,25 +74,33 @@ public class S3AOutputStream extends OutputStream { new FileOutputStream(backupFile)); } + /** + * Check for the filesystem being open. + * @throws IOException if the filesystem is closed. + */ + void checkOpen() throws IOException { + if (closed.get()) { + throw new IOException("Output Stream closed"); + } + } + @Override public void flush() throws IOException { + checkOpen(); backupStream.flush(); } @Override - public synchronized void close() throws IOException { - if (closed) { + public void close() throws IOException { + if (closed.getAndSet(true)) { return; } backupStream.close(); LOG.debug("OutputStream for key '{}' closed. 
Now beginning upload", key); - LOG.debug("Minimum upload part size: {} threshold {}" , partSize, - partSizeThreshold); - try { - final ObjectMetadata om = fs.newObjectMetadata(); + final ObjectMetadata om = fs.newObjectMetadata(backupFile.length()); Upload upload = fs.putObject( fs.newPutObjectRequest( key, @@ -126,18 +124,19 @@ public class S3AOutputStream extends OutputStream { LOG.warn("Could not delete temporary s3a file: {}", backupFile); } super.close(); - closed = true; } LOG.debug("OutputStream for key '{}' upload complete", key); } @Override public void write(int b) throws IOException { + checkOpen(); backupStream.write(b); } @Override public void write(byte[] b, int off, int len) throws IOException { + checkOpen(); backupStream.write(b, off, len); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 93d819b..c89f690 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -49,6 +49,7 @@ import java.util.concurrent.ExecutionException; import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY; import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER; import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY; /** @@ -460,4 +461,42 @@ public final class S3AUtils { key, v, min)); return v; } + + /** + * Get a size property from the configuration: this property must + * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}. + * If it is too small, it is rounded up to that minimum, and a warning + * printed. + * @param conf configuration + * @param property property name + * @param defVal default value + * @return the value, guaranteed to be above the minimum size + */ + public static long getMultipartSizeProperty(Configuration conf, + String property, long defVal) { + long partSize = conf.getLong(property, defVal); + if (partSize < MULTIPART_MIN_SIZE) { + LOG.warn("{} must be at least 5 MB; configured value is {}", + property, partSize); + partSize = MULTIPART_MIN_SIZE; + } + return partSize; + } + + /** + * Ensure that the long value is in the range of an integer. + * @param name property name for error messages + * @param size original size + * @return the size, guaranteed to be less than or equal to the max + * value of an integer. 
+ */ + public static int ensureOutputParameterInRange(String name, long size) { + if (size > Integer.MAX_VALUE) { + LOG.warn("s3a: {} capped to ~2.14GB" + + " (maximum allowed size with current output mechanism)", name); + return Integer.MAX_VALUE; + } else { + return (int)size; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java new file mode 100644 index 0000000..6b21912 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This ExecutorService blocks the submission of new tasks when its queue is + * already full by using a semaphore. Task submissions require permits, task + * completions release permits. + * <p> + * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code + * contains the thread pool logic, whereas this isolates the semaphore + * and submit logic for use with other thread pools and delegation models. 
+ * In particular, it <i>permits multiple per stream executors to share a + * single per-FS-instance executor; the latter to throttle overall + * load from the the FS, the others to limit the amount of load which + * a single output stream can generate.</i> + * <p> + * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java"> + * this s4 threadpool</a> + */ +@SuppressWarnings("NullableProblems") +@InterfaceAudience.Private +class SemaphoredDelegatingExecutor extends + ForwardingListeningExecutorService { + + private final Semaphore queueingPermits; + private final ListeningExecutorService executorDelegatee; + private final int permitCount; + + /** + * Instantiate. + * @param executorDelegatee Executor to delegate to + * @param permitCount number of permits into the queue permitted + * @param fair should the semaphore be "fair" + */ + SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee, + int permitCount, + boolean fair) { + this.permitCount = permitCount; + queueingPermits = new Semaphore(permitCount, fair); + this.executorDelegatee = executorDelegatee; + } + + @Override + protected ListeningExecutorService delegate() { + return executorDelegatee; + } + + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new RuntimeException("Not implemented"); + } + + @Override + public <T> ListenableFuture<T> submit(Callable<T> task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new CallableWithPermitRelease<>(task)); + } + + @Override + public <T> ListenableFuture<T> submit(Runnable task, T result) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task), result); + } + + @Override + public ListenableFuture<?> submit(Runnable task) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Futures.immediateFailedCheckedFuture(e); + } + return super.submit(new RunnableWithPermitRelease(task)); + } + + @Override + public void execute(Runnable command) { + try { + queueingPermits.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + super.execute(new RunnableWithPermitRelease(command)); + } + + /** + * Get the number of permits available; guaranteed to be + * {@code 0 <= availablePermits <= size}. + * @return the number of permits available at the time of invocation. 
+ */ + public int getAvailablePermits() { + return queueingPermits.availablePermits(); + } + + /** + * Get the number of threads waiting to acquire a permit. + * @return snapshot of the length of the queue of blocked threads. + */ + public int getWaitingCount() { + return queueingPermits.getQueueLength(); + } + + /** + * Total number of permits. + * @return the number of permits as set in the constructor + */ + public int getPermitCount() { + return permitCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "SemaphoredDelegatingExecutor{"); + sb.append("permitCount=").append(getPermitCount()); + sb.append(", available=").append(getAvailablePermits()); + sb.append(", waiting=").append(getWaitingCount()); + sb.append('}'); + return sb.toString(); + } + + /** + * Releases a permit after the task is executed. + */ + class RunnableWithPermitRelease implements Runnable { + + private Runnable delegatee; + + public RunnableWithPermitRelease(Runnable delegatee) { + this.delegatee = delegatee; + } + + @Override + public void run() { + try { + delegatee.run(); + } finally { + queueingPermits.release(); + } + + } + } + + /** + * Releases a permit after the task is completed. + */ + class CallableWithPermitRelease<T> implements Callable<T> { + + private Callable<T> delegatee; + + public CallableWithPermitRelease(Callable<T> delegatee) { + this.delegatee = delegatee; + } + + @Override + public T call() throws Exception { + try { + return delegatee.call(); + } finally { + queueingPermits.release(); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index d84a355..36ec50b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -81,10 +81,16 @@ public enum Statistic { "Object multipart upload aborted"), OBJECT_PUT_REQUESTS("object_put_requests", "Object put/multipart upload count"), + OBJECT_PUT_REQUESTS_COMPLETED("object_put_requests_completed", + "Object put/multipart upload completed count"), + OBJECT_PUT_REQUESTS_ACTIVE("object_put_requests_active", + "Current number of active put requests"), OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"), + OBJECT_PUT_BYTES_PENDING("object_put_bytes_pending", + "number of bytes queued for upload/being actively uploaded"), STREAM_ABORTED("stream_aborted", "Count of times the TCP stream was aborted"), - STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations", + STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_operations", "Number of executed seek operations which went backwards in a stream"), STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"), STREAM_CLOSE_OPERATIONS("stream_close_operations", @@ -112,7 +118,29 @@ public enum Statistic { STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close", "Count of bytes read when closing streams during seek operations."), STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort", - "Count of bytes discarded by aborting the stream"); + "Count of bytes discarded by aborting the stream"), + STREAM_WRITE_FAILURES("stream_write_failures", + "Count of stream write 
failures reported"), + STREAM_WRITE_BLOCK_UPLOADS("stream_write_block_uploads", + "Count of block/partition uploads completed"), + STREAM_WRITE_BLOCK_UPLOADS_ACTIVE("stream_write_block_uploads_active", + "Count of block/partition uploads completed"), + STREAM_WRITE_BLOCK_UPLOADS_COMMITTED("stream_write_block_uploads_committed", + "Count of number of block uploads committed"), + STREAM_WRITE_BLOCK_UPLOADS_ABORTED("stream_write_block_uploads_aborted", + "Count of number of block uploads aborted"), + + STREAM_WRITE_BLOCK_UPLOADS_PENDING("stream_write_block_uploads_pending", + "Gauge of block/partitions uploads queued to be written"), + STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING( + "stream_write_block_uploads_data_pending", + "Gauge of block/partitions data uploads queued to be written"), + STREAM_WRITE_TOTAL_TIME("stream_write_total_time", + "Count of total time taken for uploads to complete"), + STREAM_WRITE_TOTAL_DATA("stream_write_total_data", + "Count of total data uploaded in block output"), + STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration", + "Total queue duration of all block uploads"); private static final Map<String, Statistic> SYMBOL_MAP = new HashMap<>(Statistic.values().length); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org