http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
deleted file mode 100644
index c25d0fb..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-import static org.apache.hadoop.fs.s3a.Statistic.*;
-
-/**
- * Upload files/parts asap directly from a memory buffer (instead of buffering
- * to a file).
- * <p>
- * Uploads are managed low-level rather than through the AWS TransferManager.
- * This allows for uploading each part of a multi-part upload as soon as
- * the bytes are in memory, rather than waiting until the file is closed.
- * <p>
- * Unstable: statistics and error handling might evolve
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class S3AFastOutputStream extends OutputStream {
-
-  private static final Logger LOG = S3AFileSystem.LOG;
-  private final String key;
-  private final String bucket;
-  private final AmazonS3 client;
-  private final int partSize;
-  private final int multiPartThreshold;
-  private final S3AFileSystem fs;
-  private final CannedAccessControlList cannedACL;
-  private final ProgressListener progressListener;
-  private final ListeningExecutorService executorService;
-  private MultiPartUpload multiPartUpload;
-  private boolean closed;
-  private ByteArrayOutputStream buffer;
-  private int bufferLimit;
-
-
-  /**
-   * Creates a fast OutputStream that uploads to S3 from memory.
-   * For MultiPartUploads, as soon as sufficient bytes have been written to
-   * the stream a part is uploaded immediately (by using the low-level
-   * multi-part upload API on the AmazonS3Client).
-   *
-   * @param client AmazonS3Client used for S3 calls
-   * @param fs S3AFilesystem
-   * @param bucket S3 bucket name
-   * @param key S3 key name
-   * @param progress report progress in order to prevent timeouts
-   * @param cannedACL used CannedAccessControlList
-   * @param partSize size of a single part in a multi-part upload (except
-   * last part)
-   * @param multiPartThreshold files at least this size use multi-part upload
-   * @param threadPoolExecutor thread factory
-   * @throws IOException on any problem
-   */
-  public S3AFastOutputStream(AmazonS3 client,
-      S3AFileSystem fs,
-      String bucket,
-      String key,
-      Progressable progress,
-      CannedAccessControlList cannedACL,
-      long partSize,
-      long multiPartThreshold,
-      ExecutorService threadPoolExecutor)
-      throws IOException {
-    this.bucket = bucket;
-    this.key = key;
-    this.client = client;
-    this.fs = fs;
-    this.cannedACL = cannedACL;
-    //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
-    if (partSize > Integer.MAX_VALUE) {
-      this.partSize = Integer.MAX_VALUE;
-      LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
-          "when using 'FAST_UPLOAD = true')");
-    } else {
-      this.partSize = (int) partSize;
-    }
-    if (multiPartThreshold > Integer.MAX_VALUE) {
-      this.multiPartThreshold = Integer.MAX_VALUE;
-      LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
-          "allowed size when using 'FAST_UPLOAD = true')");
-    } else {
-      this.multiPartThreshold = (int) multiPartThreshold;
-    }
-    this.bufferLimit = this.multiPartThreshold;
-    this.closed = false;
-    int initialBufferSize = this.fs.getConf()
-        .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
-    if (initialBufferSize < 0) {
-      LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
-          "default value");
-      initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
-    } else if (initialBufferSize > this.bufferLimit) {
-      LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
-          "exceed MIN_MULTIPART_THRESHOLD");
-      initialBufferSize = this.bufferLimit;
-    }
-    this.buffer = new ByteArrayOutputStream(initialBufferSize);
-    this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
-    this.multiPartUpload = null;
-    this.progressListener = new ProgressableListener(progress);
-    LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
-        bucket, key);
-  }
-
-  /**
-   * Writes a byte to the memory buffer. If this causes the buffer to reach
-   * its limit, the actual upload is submitted to the threadpool.
-   * @param b the int of which the lowest byte is written
-   * @throws IOException on any problem
-   */
-  @Override
-  public synchronized void write(int b) throws IOException {
-    buffer.write(b);
-    if (buffer.size() == bufferLimit) {
-      uploadBuffer();
-    }
-  }
-
-  /**
-   * Writes a range of bytes from to the memory buffer. If this causes the
-   * buffer to reach its limit, the actual upload is submitted to the
-   * threadpool and the remainder of the array is written to memory
-   * (recursively).
-   * @param b byte array containing
-   * @param off offset in array where to start
-   * @param len number of bytes to be written
-   * @throws IOException on any problem
-   */
-  @Override
-  public synchronized void write(byte[] b, int off, int len)
-      throws IOException {
-    if (b == null) {
-      throw new NullPointerException();
-    } else if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    } else if (len == 0) {
-      return;
-    }
-    if (buffer.size() + len < bufferLimit) {
-      buffer.write(b, off, len);
-    } else {
-      int firstPart = bufferLimit - buffer.size();
-      buffer.write(b, off, firstPart);
-      uploadBuffer();
-      this.write(b, off + firstPart, len - firstPart);
-    }
-  }
-
-  private synchronized void uploadBuffer() throws IOException {
-    if (multiPartUpload == null) {
-      multiPartUpload = initiateMultiPartUpload();
-       /* Upload the existing buffer if it exceeds partSize. This possibly
-       requires multiple parts! */
-      final byte[] allBytes = buffer.toByteArray();
-      buffer = null; //earlier gc?
-      LOG.debug("Total length of initial buffer: {}", allBytes.length);
-      int processedPos = 0;
-      while ((multiPartThreshold - processedPos) >= partSize) {
-        LOG.debug("Initial buffer: processing from byte {} to byte {}",
-            processedPos, (processedPos + partSize - 1));
-        multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
-            processedPos, partSize), partSize);
-        processedPos += partSize;
-      }
-      //resize and reset stream
-      bufferLimit = partSize;
-      buffer = new ByteArrayOutputStream(bufferLimit);
-      buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
-    } else {
-      //upload next part
-      multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
-          .toByteArray()), partSize);
-      buffer.reset();
-    }
-  }
-
-  /**
-   * Close the stream. This will not return until the upload is complete
-   * or the attempt to perform the upload has failed.
-   * Exceptions raised in this method are indicative that the write has
-   * failed and data is at risk of being lost.
-   * @throws IOException on any failure.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-    closed = true;
-    try {
-      if (multiPartUpload == null) {
-        putObject();
-      } else {
-        int size = buffer.size();
-        if (size > 0) {
-          fs.incrementPutStartStatistics(size);
-          //send last part
-          multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
-              .toByteArray()), size);
-        }
-        final List<PartETag> partETags = multiPartUpload
-            .waitForAllPartUploads();
-        multiPartUpload.complete(partETags);
-      }
-      // This will delete unnecessary fake parent directories
-      fs.finishedWrite(key);
-      LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
-    } finally {
-      buffer = null;
-      super.close();
-    }
-  }
-
-  /**
-   * Create the default metadata for a multipart upload operation.
-   * @return the metadata to use/extend.
-   */
-  private ObjectMetadata createDefaultMetadata() {
-    return fs.newObjectMetadata();
-  }
-
-  private MultiPartUpload initiateMultiPartUpload() throws IOException {
-    final InitiateMultipartUploadRequest initiateMPURequest =
-        new InitiateMultipartUploadRequest(bucket,
-            key,
-            createDefaultMetadata());
-    initiateMPURequest.setCannedACL(cannedACL);
-    try {
-      return new MultiPartUpload(
-          client.initiateMultipartUpload(initiateMPURequest).getUploadId());
-    } catch (AmazonClientException ace) {
-      throw translateException("initiate MultiPartUpload", key, ace);
-    }
-  }
-
-  private void putObject() throws IOException {
-    LOG.debug("Executing regular upload for bucket '{}' key '{}'",
-        bucket, key);
-    final ObjectMetadata om = createDefaultMetadata();
-    final int size = buffer.size();
-    om.setContentLength(size);
-    final PutObjectRequest putObjectRequest =
-        fs.newPutObjectRequest(key,
-            om,
-            new ByteArrayInputStream(buffer.toByteArray()));
-    putObjectRequest.setGeneralProgressListener(progressListener);
-    ListenableFuture<PutObjectResult> putObjectResult =
-        executorService.submit(new Callable<PutObjectResult>() {
-          @Override
-          public PutObjectResult call() throws Exception {
-            fs.incrementPutStartStatistics(size);
-            return client.putObject(putObjectRequest);
-          }
-        });
-    //wait for completion
-    try {
-      putObjectResult.get();
-    } catch (InterruptedException ie) {
-      LOG.warn("Interrupted object upload: {}", ie, ie);
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException ee) {
-      throw extractException("regular upload", key, ee);
-    }
-  }
-
-
-  private class MultiPartUpload {
-    private final String uploadId;
-    private final List<ListenableFuture<PartETag>> partETagsFutures;
-
-    public MultiPartUpload(String uploadId) {
-      this.uploadId = uploadId;
-      this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
-      LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
-          "id '{}'", bucket, key, uploadId);
-    }
-
-    private void uploadPartAsync(ByteArrayInputStream inputStream,
-        int partSize) {
-      final int currentPartNumber = partETagsFutures.size() + 1;
-      final UploadPartRequest request =
-          new UploadPartRequest().withBucketName(bucket).withKey(key)
-              .withUploadId(uploadId).withInputStream(inputStream)
-              .withPartNumber(currentPartNumber).withPartSize(partSize);
-      request.setGeneralProgressListener(progressListener);
-      ListenableFuture<PartETag> partETagFuture =
-          executorService.submit(new Callable<PartETag>() {
-            @Override
-            public PartETag call() throws Exception {
-              LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
-                  uploadId);
-              return fs.uploadPart(request).getPartETag();
-            }
-          });
-      partETagsFutures.add(partETagFuture);
-    }
-
-    private List<PartETag> waitForAllPartUploads() throws IOException {
-      try {
-        return Futures.allAsList(partETagsFutures).get();
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted partUpload: {}", ie, ie);
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (ExecutionException ee) {
-        //there is no way of recovering so abort
-        //cancel all partUploads
-        for (ListenableFuture<PartETag> future : partETagsFutures) {
-          future.cancel(true);
-        }
-        //abort multipartupload
-        this.abort();
-        throw extractException("Multi-part upload with id '" + uploadId + "'",
-            key, ee);
-      }
-    }
-
-    private void complete(List<PartETag> partETags) throws IOException {
-      try {
-        LOG.debug("Completing multi-part upload for key '{}', id '{}'",
-            key, uploadId);
-        client.completeMultipartUpload(
-            new CompleteMultipartUploadRequest(bucket,
-                key,
-                uploadId,
-                partETags));
-      } catch (AmazonClientException e) {
-        throw translateException("Completing multi-part upload", key, e);
-      }
-    }
-
-    public void abort() {
-      LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
-      try {
-        fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
-        client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
-            key, uploadId));
-      } catch (Exception e2) {
-        LOG.warn("Unable to abort multipart upload, you may need to purge  " +
-            "uploaded parts: {}", e2, e2);
-      }
-    }
-  }
-
-  private static class ProgressableListener implements ProgressListener {
-    private final Progressable progress;
-
-    public ProgressableListener(Progressable progress) {
-      this.progress = progress;
-    }
-
-    public void progressChanged(ProgressEvent progressEvent) {
-      if (progress != null) {
-        progress.progress();
-      }
-    }
-  }
-}

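The class deleted above buffered writes in memory and, once the multipart threshold was crossed, sliced the accumulated buffer into partSize chunks and submitted each slice to the executor; the S3AFileSystem changes below replace it with S3ABlockOutputStream. A simplified, self-contained sketch of that slice-and-submit idea (illustrative only, not the Hadoop code; the class and method names here are invented):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;

  class PartUploadSketch {
    // Split an in-memory buffer into fixed-size parts and submit each slice
    // for upload as soon as it is available.
    static List<Future<String>> uploadInParts(byte[] data, int partSize,
        ExecutorService executor) {
      List<Future<String>> parts = new ArrayList<>();
      int nextPart = 1;
      for (int pos = 0; pos < data.length; pos += partSize) {
        final int offset = pos;
        final int len = Math.min(partSize, data.length - pos);
        final int partNumber = nextPart++;
        parts.add(executor.submit(() ->
            // a real stream would issue an S3 UploadPartRequest here
            "part " + partNumber + ": " + len + " bytes from offset " + offset));
      }
      return parts;
    }
  }
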
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 15bd23a..1532cde 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,21 +30,26 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
@@ -55,6 +60,8 @@ import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -118,21 +126,26 @@ public class S3AFileSystem extends FileSystem {
   private long partSize;
   private boolean enableMultiObjectsDelete;
   private TransferManager transfers;
-  private ExecutorService threadPoolExecutor;
+  private ListeningExecutorService threadPoolExecutor;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+  private static final Logger PROGRESS =
+      LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
+  private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
   private S3AInputPolicy inputPolicy;
-  private static final AtomicBoolean warnedOfCoreThreadDeprecation =
-      new AtomicBoolean(false);
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
+  private boolean blockUploadEnabled;
+  private String blockOutputBuffer;
+  private S3ADataBlocks.BlockFactory blockFactory;
+  private int blockOutputActiveBlocks;
 
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
@@ -159,18 +172,11 @@ public class S3AFileSystem extends FileSystem {
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
       listing = new Listing(this);
-      partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-      if (partSize < 5 * 1024 * 1024) {
-        LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
-        partSize = 5 * 1024 * 1024;
-      }
+      partSize = getMultipartSizeProperty(conf,
+          MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+      multiPartThreshold = getMultipartSizeProperty(conf,
+          MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
 
-      multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-          DEFAULT_MIN_MULTIPART_THRESHOLD);
-      if (multiPartThreshold < 5 * 1024 * 1024) {
-        LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-        multiPartThreshold = 5 * 1024 * 1024;
-      }
       //check but do not store the block size
       longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
@@ -186,26 +192,19 @@ public class S3AFileSystem extends FileSystem {
                     }
                   });
 
-      if (conf.get("fs.s3a.threads.core") != null &&
-          warnedOfCoreThreadDeprecation.compareAndSet(false, true)) {
-        LoggerFactory.getLogger(
-            "org.apache.hadoop.conf.Configuration.deprecation")
-            .warn("Unsupported option \"fs.s3a.threads.core\"" +
-                " will be ignored {}", conf.get("fs.s3a.threads.core"));
-      }
       int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
       if (maxThreads < 2) {
         LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
         maxThreads = 2;
       }
-      int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
-      if (totalTasks < 1) {
-        LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
-        totalTasks = 1;
-      }
-      long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-      threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
-          maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+      int totalTasks = intOption(conf,
+          MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
+      long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
+          DEFAULT_KEEPALIVE_TIME, 0);
+      threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+          maxThreads,
+          maxThreads + totalTasks,
+          keepAliveTime, TimeUnit.SECONDS,
           "s3a-transfer-shared");
 
       initTransferManager();
@@ -218,8 +217,25 @@ public class S3AFileSystem extends FileSystem {
 
       serverSideEncryptionAlgorithm =
           conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+
+      blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD);
+
+      if (blockUploadEnabled) {
+        blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
+            DEFAULT_FAST_UPLOAD_BUFFER);
+        partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
+        blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
+        blockOutputActiveBlocks = intOption(conf,
+            FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+        LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
+                " queue limit={}",
+            blockOutputBuffer, partSize, blockOutputActiveBlocks);
+      } else {
+        LOG.debug("Using S3AOutputStream");
+      }
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -346,6 +362,33 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Demand create the directory allocator, then create a temporary file.
+   * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
+   *  @param pathStr prefix for the temporary file
+   *  @param size the size of the file that is going to be written
+   *  @param conf the Configuration object
+   *  @return a unique temporary file
+   *  @throws IOException IO problems
+   */
+  synchronized File createTmpFileForWrite(String pathStr, long size,
+      Configuration conf) throws IOException {
+    if (directoryAllocator == null) {
+      String bufferDir = conf.get(BUFFER_DIR) != null
+          ? BUFFER_DIR : "hadoop.tmp.dir";
+      directoryAllocator = new LocalDirAllocator(bufferDir);
+    }
+    return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
+  }
+
+  /**
+   * Get the bucket of this filesystem.
+   * @return the bucket
+   */
+  public String getBucket() {
+    return bucket;
+  }
+
+  /**
    * Change the input policy for this FS.
    * @param inputPolicy new policy
    */
@@ -469,6 +512,7 @@ public class S3AFileSystem extends FileSystem {
    * @see #setPermission(Path, FsPermission)
    */
   @Override
+  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
@@ -493,28 +537,33 @@ public class S3AFileSystem extends FileSystem {
 
     }
     instrumentation.fileCreated();
-    if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
-      return new FSDataOutputStream(
-          new S3AFastOutputStream(s3,
-              this,
-              bucket,
+    FSDataOutputStream output;
+    if (blockUploadEnabled) {
+      output = new FSDataOutputStream(
+          new S3ABlockOutputStream(this,
               key,
+              new SemaphoredDelegatingExecutor(threadPoolExecutor,
+                  blockOutputActiveBlocks, true),
               progress,
-              cannedACL,
               partSize,
-              multiPartThreshold,
-              threadPoolExecutor),
-          statistics);
+              blockFactory,
+              instrumentation.newOutputStreamStatistics(),
+              new WriteOperationHelper(key)
+          ),
+          null);
+    } else {
+
+      // We pass null to FSDataOutputStream so it won't count writes that
+      // are being buffered to a file
+      output = new FSDataOutputStream(
+          new S3AOutputStream(getConf(),
+              this,
+              key,
+              progress
+          ),
+          null);
     }
-    // We pass null to FSDataOutputStream so it won't count writes that
-    // are being buffered to a file
-    return new FSDataOutputStream(
-        new S3AOutputStream(getConf(),
-            this,
-            key,
-            progress
-        ),
-        null);
+    return output;
   }
 
   /**
@@ -758,6 +807,33 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Decrement a gauge by a specific value.
+   * @param statistic The operation to decrement
+   * @param count the count to decrement
+   */
+  protected void decrementGauge(Statistic statistic, long count) {
+    instrumentation.decrementGauge(statistic, count);
+  }
+
+  /**
+   * Increment a gauge by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementGauge(Statistic statistic, long count) {
+    instrumentation.incrementGauge(statistic, count);
+  }
+
+  /**
+   * Get the storage statistics of this filesystem.
+   * @return the storage statistics
+   */
+  @Override
+  public S3AStorageStatistics getStorageStatistics() {
+    return storageStatistics;
+  }
+
+  /**
    * Request object metadata; increments counters in the process.
    * @param key key
    * @return the metadata
@@ -904,7 +980,9 @@ public class S3AFileSystem extends FileSystem {
    */
   public ObjectMetadata newObjectMetadata(long length) {
     final ObjectMetadata om = newObjectMetadata();
-    om.setContentLength(length);
+    if (length >= 0) {
+      om.setContentLength(length);
+    }
     return om;
   }
 
@@ -926,7 +1004,41 @@ public class S3AFileSystem extends FileSystem {
       len = putObjectRequest.getMetadata().getContentLength();
     }
     incrementPutStartStatistics(len);
-    return transfers.upload(putObjectRequest);
+    try {
+      Upload upload = transfers.upload(putObjectRequest);
+      incrementPutCompletedStatistics(true, len);
+      return upload;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
+  }
+
+  /**
+   * PUT an object directly (i.e. not via the transfer manager).
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   * @throws AmazonClientException on problems
+   */
+  public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+      throws AmazonClientException {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    incrementPutStartStatistics(len);
+    try {
+      PutObjectResult result = s3.putObject(putObjectRequest);
+      incrementPutCompletedStatistics(true, len);
+      return result;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
   }
 
   /**
@@ -934,10 +1046,20 @@ public class S3AFileSystem extends FileSystem {
    * Increments the write and put counters
    * @param request request
    * @return the result of the operation.
+   * @throws AmazonClientException on problems
    */
-  public UploadPartResult uploadPart(UploadPartRequest request) {
-    incrementPutStartStatistics(request.getPartSize());
-    return s3.uploadPart(request);
+  public UploadPartResult uploadPart(UploadPartRequest request)
+      throws AmazonClientException {
+    long len = request.getPartSize();
+    incrementPutStartStatistics(len);
+    try {
+      UploadPartResult uploadPartResult = s3.uploadPart(request);
+      incrementPutCompletedStatistics(true, len);
+      return uploadPartResult;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
   }
 
   /**
@@ -950,9 +1072,28 @@ public class S3AFileSystem extends FileSystem {
     LOG.debug("PUT start {} bytes", bytes);
     incrementWriteOperations();
     incrementStatistic(OBJECT_PUT_REQUESTS);
+    incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
+    if (bytes > 0) {
+      incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
+    }
+  }
+
+  /**
+   * At the end of a put/multipart upload operation, update the
+   * relevant counters and gauges.
+   *
+   * @param success did the operation succeed?
+   * @param bytes bytes in the request.
+   */
+  public void incrementPutCompletedStatistics(boolean success, long bytes) {
+    LOG.debug("PUT completed success={}; {} bytes", success, bytes);
+    incrementWriteOperations();
     if (bytes > 0) {
       incrementStatistic(OBJECT_PUT_BYTES, bytes);
+      decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
     }
+    incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
+    decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
   }
 
   /**
@@ -963,7 +1104,7 @@ public class S3AFileSystem extends FileSystem {
    * @param bytes bytes successfully uploaded.
    */
   public void incrementPutProgressStatistics(String key, long bytes) {
-    LOG.debug("PUT {}: {} bytes", key, bytes);
+    PROGRESS.debug("PUT {}: {} bytes", key, bytes);
     incrementWriteOperations();
     if (bytes > 0) {
       statistics.incrementBytesWritten(bytes);
@@ -1483,7 +1624,7 @@ public class S3AFileSystem extends FileSystem {
     LocalFileSystem local = getLocal(getConf());
     File srcfile = local.pathToFile(src);
 
-    final ObjectMetadata om = newObjectMetadata();
+    final ObjectMetadata om = newObjectMetadata(srcfile.length());
     PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
     Upload up = putObject(putObjectRequest);
     ProgressableProgressListener listener = new ProgressableProgressListener(
@@ -1751,6 +1892,10 @@ public class S3AFileSystem extends FileSystem {
           .append(serverSideEncryptionAlgorithm)
           .append('\'');
     }
+    if (blockFactory != null) {
+      sb.append(", blockFactory=").append(blockFactory);
+    }
+    sb.append(", executor=").append(threadPoolExecutor);
     sb.append(", statistics {")
         .append(statistics)
         .append("}");
@@ -1958,4 +2103,163 @@ public class S3AFileSystem extends FileSystem {
           getFileBlockLocations(status, 0, status.getLen())
           : null);
   }
+
+  /**
+   * Helper for an ongoing write operation.
+   * <p>
+   * It hides direct access to the S3 API from the output stream,
+   * and is a location where the object upload process can be evolved/enhanced.
+   * <p>
+   * Features
+   * <ul>
+   *   <li>Methods to create and submit requests to S3, so avoiding
+   *   all direct interaction with the AWS APIs.</li>
+   *   <li>Some extra preflight checks of arguments, so failing fast on
+   *   errors.</li>
+   *   <li>Callbacks to let the FS know of events in the output stream
+   *   upload process.</li>
+   * </ul>
+   *
+   * Each instance of this state is unique to a single output stream.
+   */
+  final class WriteOperationHelper {
+    private final String key;
+
+    private WriteOperationHelper(String key) {
+      this.key = key;
+    }
+
+    /**
+     * Create a {@link PutObjectRequest} request.
+     * The metadata is assumed to have been configured with the size of the
+     * operation.
+     * @param inputStream source data.
+     * @param length size, if known. Use -1 for not known
+     * @return the request
+     */
+    PutObjectRequest newPutRequest(InputStream inputStream, long length) {
+      return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+    }
+
+    /**
+     * Callback on a successful write.
+     */
+    void writeSuccessful() {
+      finishedWrite(key);
+    }
+
+    /**
+     * Callback on a write failure.
+     * @param e Any exception raised which triggered the failure.
+     */
+    void writeFailed(Exception e) {
+      LOG.debug("Write to {} failed", this, e);
+    }
+
+    /**
+     * Create a new object metadata instance.
+     * Any standard metadata headers are added here, for example:
+     * encryption.
+     * @param length size, if known. Use -1 for not known
+     * @return a new metadata instance
+     */
+    public ObjectMetadata newObjectMetadata(long length) {
+      return S3AFileSystem.this.newObjectMetadata(length);
+    }
+
+    /**
+     * Start the multipart upload process.
+     * @return the upload result containing the ID
+     * @throws IOException IO problem
+     */
+    String initiateMultiPartUpload() throws IOException {
+      LOG.debug("Initiating Multipart upload");
+      final InitiateMultipartUploadRequest initiateMPURequest =
+          new InitiateMultipartUploadRequest(bucket,
+              key,
+              newObjectMetadata(-1));
+      initiateMPURequest.setCannedACL(cannedACL);
+      try {
+        return s3.initiateMultipartUpload(initiateMPURequest)
+            .getUploadId();
+      } catch (AmazonClientException ace) {
+        throw translateException("initiate MultiPartUpload", key, ace);
+      }
+    }
+
+    /**
+     * Complete a multipart upload operation.
+     * @param uploadId multipart operation Id
+     * @param partETags list of partial uploads
+     * @return the result
+     * @throws AmazonClientException on problems.
+     */
+    CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
+        List<PartETag> partETags) throws AmazonClientException {
+      Preconditions.checkNotNull(uploadId);
+      Preconditions.checkNotNull(partETags);
+      Preconditions.checkArgument(!partETags.isEmpty(),
+          "No partitions have been uploaded");
+      return s3.completeMultipartUpload(
+          new CompleteMultipartUploadRequest(bucket,
+              key,
+              uploadId,
+              partETags));
+    }
+
+    /**
+     * Abort a multipart upload operation.
+     * @param uploadId multipart operation Id
+     * @return the result
+     * @throws AmazonClientException on problems.
+     */
+    void abortMultipartUpload(String uploadId) throws AmazonClientException {
+      s3.abortMultipartUpload(
+          new AbortMultipartUploadRequest(bucket, key, uploadId));
+    }
+
+    /**
+     * Create and initialize a part request of a multipart upload.
+     * @param uploadId ID of ongoing upload
+     * @param uploadStream source of data to upload
+     * @param partNumber current part number of the upload
+     * @param size amount of data
+     * @return the request.
+     */
+    UploadPartRequest newUploadPartRequest(String uploadId,
+        InputStream uploadStream,
+        int partNumber,
+        int size) {
+      Preconditions.checkNotNull(uploadId);
+      Preconditions.checkNotNull(uploadStream);
+      Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
+      Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
+          "partNumber must be between 1 and 10000 inclusive, but is %s",
+          partNumber);
+
+      LOG.debug("Creating part upload request for {} #{} size {}",
+          uploadId, partNumber, size);
+      return new UploadPartRequest()
+          .withBucketName(bucket)
+          .withKey(key)
+          .withUploadId(uploadId)
+          .withInputStream(uploadStream)
+          .withPartNumber(partNumber)
+          .withPartSize(size);
+    }
+
+    /**
+     * The toString method is intended to be used in logging/toString calls.
+     * @return a string description.
+     */
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "{bucket=").append(bucket);
+      sb.append(", key='").append(key).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
 }

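The WriteOperationHelper added at the end of S3AFileSystem above gives an output stream a narrow surface over the S3 client: build PUT and part requests, start/complete/abort a multipart upload, and report the outcome back to the filesystem. A rough, illustrative fragment of the call sequence a block-based output stream could follow (error handling trimmed; `helper`, `blocks` and `fs` are placeholders, not part of the patch):

  // helper is an S3AFileSystem.WriteOperationHelper; blocks is a queue of buffered byte[]
  String uploadId = helper.initiateMultiPartUpload();
  List<PartETag> etags = new ArrayList<>();
  try {
    int partNumber = 1;
    for (byte[] block : blocks) {
      UploadPartRequest request = helper.newUploadPartRequest(uploadId,
          new ByteArrayInputStream(block), partNumber++, block.length);
      etags.add(fs.uploadPart(request).getPartETag());   // uploadPart() from the diff above
    }
    helper.completeMultipartUpload(uploadId, etags);
    helper.writeSuccessful();
  } catch (AmazonClientException e) {
    helper.abortMultipartUpload(uploadId);
    helper.writeFailed(e);
    throw e;
  }
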
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 26b5b51..963c53f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -29,10 +31,12 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableMetric;
 
+import java.io.Closeable;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 
@@ -50,6 +54,9 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AInstrumentation {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      S3AInstrumentation.class);
+
   public static final String CONTEXT = "S3AFileSystem";
   private final MetricsRegistry registry =
       new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
@@ -100,7 +107,23 @@ public class S3AInstrumentation {
       OBJECT_METADATA_REQUESTS,
       OBJECT_MULTIPART_UPLOAD_ABORTED,
       OBJECT_PUT_BYTES,
-      OBJECT_PUT_REQUESTS
+      OBJECT_PUT_REQUESTS,
+      OBJECT_PUT_REQUESTS_COMPLETED,
+      STREAM_WRITE_FAILURES,
+      STREAM_WRITE_BLOCK_UPLOADS,
+      STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
+      STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
+      STREAM_WRITE_TOTAL_TIME,
+      STREAM_WRITE_TOTAL_DATA,
+  };
+
+
+  private static final Statistic[] GAUGES_TO_CREATE = {
+      OBJECT_PUT_REQUESTS_ACTIVE,
+      OBJECT_PUT_BYTES_PENDING,
+      STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
+      STREAM_WRITE_BLOCK_UPLOADS_PENDING,
+      STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING,
   };
 
   public S3AInstrumentation(URI name) {
@@ -143,6 +166,9 @@ public class S3AInstrumentation {
     for (Statistic statistic : COUNTERS_TO_CREATE) {
       counter(statistic);
     }
+    for (Statistic statistic : GAUGES_TO_CREATE) {
+      gauge(statistic.getSymbol(), statistic.getDescription());
+    }
   }
 
   /**
@@ -254,13 +280,13 @@ public class S3AInstrumentation {
    * Lookup a counter by name. Return null if it is not known.
    * @param name counter name
    * @return the counter
+   * @throws IllegalStateException if the metric is not a counter
    */
   private MutableCounterLong lookupCounter(String name) {
     MutableMetric metric = lookupMetric(name);
     if (metric == null) {
       return null;
     }
-    Preconditions.checkNotNull(metric, "not found: " + name);
     if (!(metric instanceof MutableCounterLong)) {
       throw new IllegalStateException("Metric " + name
           + " is not a MutableCounterLong: " + metric);
@@ -269,6 +295,20 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Look up a gauge.
+   * @param name gauge name
+   * @return the gauge or null
+   * @throws ClassCastException if the metric is not a Gauge.
+   */
+  public MutableGaugeLong lookupGauge(String name) {
+    MutableMetric metric = lookupMetric(name);
+    if (metric == null) {
+      LOG.debug("No gauge {}", name);
+    }
+    return (MutableGaugeLong) metric;
+  }
+
+  /**
    * Look up a metric from both the registered set and the lighter weight
    * stream entries.
    * @param name metric name
@@ -349,6 +389,47 @@ public class S3AInstrumentation {
       counter.incr(count);
     }
   }
+  /**
+   * Increment a specific counter.
+   * No-op if not defined.
+   * @param op operation
+   * @param count atomic long containing value
+   */
+  public void incrementCounter(Statistic op, AtomicLong count) {
+    incrementCounter(op, count.get());
+  }
+
+  /**
+   * Increment a specific gauge.
+   * No-op if not defined.
+   * @param op operation
+   * @param count increment value
+   * @throws ClassCastException if the metric is of the wrong type
+   */
+  public void incrementGauge(Statistic op, long count) {
+    MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+    if (gauge != null) {
+      gauge.incr(count);
+    } else {
+      LOG.debug("No Gauge: "+ op);
+    }
+  }
+
+  /**
+   * Decrement a specific gauge.
+   * No-op if not defined.
+   * @param op operation
+   * @param count increment value
+   * @throws ClassCastException if the metric is of the wrong type
+   */
+  public void decrementGauge(Statistic op, long count) {
+    MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+    if (gauge != null) {
+      gauge.decr(count);
+    } else {
+      LOG.debug("No Gauge: " + op);
+    }
+  }
 
   /**
    * Create a stream input statistics instance.
@@ -553,4 +634,165 @@ public class S3AInstrumentation {
       return sb.toString();
     }
   }
+
+  /**
+   * Create a stream output statistics instance.
+   * @return the new instance
+   */
+
+  OutputStreamStatistics newOutputStreamStatistics() {
+    return new OutputStreamStatistics();
+  }
+
+  /**
+   * Merge in the statistics of a single output stream into
+   * the filesystem-wide statistics.
+   * @param statistics stream statistics
+   */
+  private void mergeOutputStreamStatistics(OutputStreamStatistics statistics) {
+    incrementCounter(STREAM_WRITE_TOTAL_TIME, statistics.totalUploadDuration());
+    incrementCounter(STREAM_WRITE_QUEUE_DURATION, statistics.queueDuration);
+    incrementCounter(STREAM_WRITE_TOTAL_DATA, statistics.bytesUploaded);
+    incrementCounter(STREAM_WRITE_BLOCK_UPLOADS,
+        statistics.blockUploadsCompleted);
+  }
+
+  /**
+   * Statistics updated by an output stream during its actual operation.
+   * Some of these stats may be relayed. However, as a block upload spans
+   * multiple operations, the full set is only merged back into the
+   * filesystem-wide statistics when the stream is closed.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public final class OutputStreamStatistics implements Closeable {
+    private final AtomicLong blocksSubmitted = new AtomicLong(0);
+    private final AtomicLong blocksInQueue = new AtomicLong(0);
+    private final AtomicLong blocksActive = new AtomicLong(0);
+    private final AtomicLong blockUploadsCompleted = new AtomicLong(0);
+    private final AtomicLong blockUploadsFailed = new AtomicLong(0);
+    private final AtomicLong bytesPendingUpload = new AtomicLong(0);
+
+    private final AtomicLong bytesUploaded = new AtomicLong(0);
+    private final AtomicLong transferDuration = new AtomicLong(0);
+    private final AtomicLong queueDuration = new AtomicLong(0);
+    private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+
+    /**
+     * Block is queued for upload.
+     */
+    void blockUploadQueued(int blockSize) {
+      blocksSubmitted.incrementAndGet();
+      blocksInQueue.incrementAndGet();
+      bytesPendingUpload.addAndGet(blockSize);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, blockSize);
+    }
+
+    /** Queued block has been scheduled for upload. */
+    void blockUploadStarted(long duration, int blockSize) {
+      queueDuration.addAndGet(duration);
+      blocksInQueue.decrementAndGet();
+      blocksActive.incrementAndGet();
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1);
+    }
+
+    /** A block upload has completed. */
+    void blockUploadCompleted(long duration, int blockSize) {
+      this.transferDuration.addAndGet(duration);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
+      blocksActive.decrementAndGet();
+      blockUploadsCompleted.incrementAndGet();
+    }
+
+    /**
+     *  A block upload has failed.
+     *  A final transfer completed event is still expected, so this
+     *  does not decrement the active block counter.
+     */
+    void blockUploadFailed(long duration, int blockSize) {
+      blockUploadsFailed.incrementAndGet();
+    }
+
+    /** Intermediate report of bytes uploaded. */
+    void bytesTransferred(long byteCount) {
+      bytesUploaded.addAndGet(byteCount);
+      bytesPendingUpload.addAndGet(-byteCount);
+      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
+    }
+
+    /**
+     * Note an exception in a multipart complete.
+     */
+    void exceptionInMultipartComplete() {
+      exceptionsInMultipartFinalize.incrementAndGet();
+    }
+
+    /**
+     * Note an exception in a multipart abort.
+     */
+    void exceptionInMultipartAbort() {
+      exceptionsInMultipartFinalize.incrementAndGet();
+    }
+
+    /**
+     * Get the number of bytes pending upload.
+     * @return the number of bytes in the pending upload state.
+     */
+    public long getBytesPendingUpload() {
+      return bytesPendingUpload.get();
+    }
+
+    /**
+     * Output stream has closed.
+     * Trigger merge in of all statistics not updated during operation.
+     */
+    @Override
+    public void close() {
+      if (bytesPendingUpload.get() > 0) {
+        LOG.warn("Closing output stream statistics while data is still marked" +
+            " as pending upload in {}", this);
+      }
+      mergeOutputStreamStatistics(this);
+    }
+
+    long averageQueueTime() {
+      return blocksSubmitted.get() > 0 ?
+          (queueDuration.get() / blocksSubmitted.get()) : 0;
+    }
+
+    double effectiveBandwidth() {
+      double duration = totalUploadDuration() / 1000.0;
+      return duration > 0 ?
+          (bytesUploaded.get() / duration) : 0;
+    }
+
+    long totalUploadDuration() {
+      return queueDuration.get() + transferDuration.get();
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "OutputStreamStatistics{");
+      sb.append("blocksSubmitted=").append(blocksSubmitted);
+      sb.append(", blocksInQueue=").append(blocksInQueue);
+      sb.append(", blocksActive=").append(blocksActive);
+      sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted);
+      sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
+      sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
+      sb.append(", bytesUploaded=").append(bytesUploaded);
+      sb.append(", exceptionsInMultipartFinalize=").append(
+          exceptionsInMultipartFinalize);
+      sb.append(", transferDuration=").append(transferDuration).append(" ms");
+      sb.append(", queueDuration=").append(queueDuration).append(" ms");
+      sb.append(", averageQueueTime=").append(averageQueueTime()).append(" ms");
+      sb.append(", totalUploadDuration=").append(totalUploadDuration())
+          .append(" ms");
+      sb.append(", effectiveBandwidth=").append(effectiveBandwidth())
+          .append(" bytes/s");
+      sb.append('}');
+      return sb.toString();
+    }
+  }
 }

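The per-stream OutputStreamStatistics above tracks each block through queued, started and completed (or failed) states, and close() merges the totals into the filesystem-wide metrics. A small worked example of the derived figures; the numbers are made up, only the formulas come from averageQueueTime(), totalUploadDuration() and effectiveBandwidth() above:

  // Hypothetical values for one closed stream.
  long queueDuration    = 4_000L;                 // ms blocks spent waiting in the queue
  long transferDuration = 16_000L;                // ms spent actually uploading
  long bytesUploaded    = 100L * 1024 * 1024;     // 100 MiB
  long blocksSubmitted  = 10L;

  long totalUploadDuration = queueDuration + transferDuration;    // 20 000 ms
  long averageQueueTime    = queueDuration / blocksSubmitted;     // 400 ms per block
  double effectiveBandwidth =
      bytesUploaded / (totalUploadDuration / 1000.0);             // ~5.24 MB/s
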
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 23ba682..6ebc9e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,8 +35,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
@@ -45,37 +45,27 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AOutputStream extends OutputStream {
-  private OutputStream backupStream;
-  private File backupFile;
-  private boolean closed;
-  private String key;
-  private Progressable progress;
-  private long partSize;
-  private long partSizeThreshold;
-  private S3AFileSystem fs;
-  private LocalDirAllocator lDirAlloc;
+  private final OutputStream backupStream;
+  private final File backupFile;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final String key;
+  private final Progressable progress;
+  private final S3AFileSystem fs;
 
   public static final Logger LOG = S3AFileSystem.LOG;
 
   public S3AOutputStream(Configuration conf,
-      S3AFileSystem fs, String key, Progressable progress)
+      S3AFileSystem fs,
+      String key,
+      Progressable progress)
       throws IOException {
     this.key = key;
     this.progress = progress;
     this.fs = fs;
 
-    partSize = fs.getPartitionSize();
-    partSizeThreshold = fs.getMultiPartThreshold();
-
-    if (conf.get(BUFFER_DIR, null) != null) {
-      lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
-    } else {
-      lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
-    }
 
-    backupFile = lDirAlloc.createTmpFileForWrite("output-",
+    backupFile = fs.createTmpFileForWrite("output-",
         LocalDirAllocator.SIZE_UNKNOWN, conf);
-    closed = false;
 
     LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
         key, backupFile);
@@ -84,25 +74,33 @@ public class S3AOutputStream extends OutputStream {
         new FileOutputStream(backupFile));
   }
 
+  /**
+   * Check for the filesystem being open.
+   * @throws IOException if the filesystem is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Output Stream closed");
+    }
+  }
+
   @Override
   public void flush() throws IOException {
+    checkOpen();
     backupStream.flush();
   }
 
   @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
+  public void close() throws IOException {
+    if (closed.getAndSet(true)) {
       return;
     }
 
     backupStream.close();
     LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
-    LOG.debug("Minimum upload part size: {} threshold {}" , partSize,
-        partSizeThreshold);
-
 
     try {
-      final ObjectMetadata om = fs.newObjectMetadata();
+      final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
       Upload upload = fs.putObject(
           fs.newPutObjectRequest(
               key,
@@ -126,18 +124,19 @@ public class S3AOutputStream extends OutputStream {
         LOG.warn("Could not delete temporary s3a file: {}", backupFile);
       }
       super.close();
-      closed = true;
     }
     LOG.debug("OutputStream for key '{}' upload complete", key);
   }
 
   @Override
   public void write(int b) throws IOException {
+    checkOpen();
     backupStream.write(b);
   }
 
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
+    checkOpen();
     backupStream.write(b, off, len);
   }
 

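The reworked S3AOutputStream above swaps the synchronized boolean flag for an AtomicBoolean, so write() and flush() fail fast once the stream is closed and a second close() returns without attempting another upload. A minimal sketch of that idempotent-close pattern, assuming nothing beyond the JDK (a generic stream, not the S3A class):

  import java.io.IOException;
  import java.io.OutputStream;
  import java.util.concurrent.atomic.AtomicBoolean;

  class IdempotentCloseStream extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private void checkOpen() throws IOException {
      if (closed.get()) {
        throw new IOException("Output Stream closed");
      }
    }

    @Override
    public void write(int b) throws IOException {
      checkOpen();
      // buffer the byte, e.g. to a local temp file
    }

    @Override
    public void close() throws IOException {
      // getAndSet makes close() idempotent: only the first caller runs the upload.
      if (closed.getAndSet(true)) {
        return;
      }
      // flush buffered data and start the upload here
    }
  }
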
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 93d819b..c89f690 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
 import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
 
 /**
@@ -460,4 +461,42 @@ public final class S3AUtils {
             key, v, min));
     return v;
   }
+
+  /**
+   * Get a size property from the configuration: this property must
+   * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
+   * If it is too small, it is rounded up to that minimum, and a warning
+   * printed.
+   * @param conf configuration
+   * @param property property name
+   * @param defVal default value
+   * @return the value, guaranteed to be above the minimum size
+   */
+  public static long getMultipartSizeProperty(Configuration conf,
+      String property, long defVal) {
+    long partSize = conf.getLong(property, defVal);
+    if (partSize < MULTIPART_MIN_SIZE) {
+      LOG.warn("{} must be at least 5 MB; configured value is {}",
+          property, partSize);
+      partSize = MULTIPART_MIN_SIZE;
+    }
+    return partSize;
+  }
+
+  /**
+   * Ensure that the long value is in the range of an integer.
+   * @param name property name for error messages
+   * @param size original size
+   * @return the size, guaranteed to be less than or equal to the max
+   * value of an integer.
+   */
+  public static int ensureOutputParameterInRange(String name, long size) {
+    if (size > Integer.MAX_VALUE) {
+      LOG.warn("s3a: {} capped to ~2.14GB" +
+          " (maximum allowed size with current output mechanism)", name);
+      return Integer.MAX_VALUE;
+    } else {
+      return (int)size;
+    }
+  }
 }

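Taken together, the two new helpers clamp a configured part size to the 5 MB S3 minimum and then cap it to an int for the current buffer-based output path. A short usage sketch follows; the property name and default value shown are assumptions for illustration, not quoted from Constants.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;

public class PartSizeExample {
  public static void main(String[] args) {
    // Illustrative only: property name and default are assumptions.
    Configuration conf = new Configuration();
    long partSize = S3AUtils.getMultipartSizeProperty(conf,
        "fs.s3a.multipart.size", 100L * 1024 * 1024);
    // The current output path buffers a part in memory, so the long value
    // must also fit in an int.
    int partSizeBytes = S3AUtils.ensureOutputParameterInRange(
        "fs.s3a.multipart.size", partSize);
    System.out.println("part size = " + partSizeBytes);
  }
}
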
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000..6b21912
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that
+ * code contains the thread pool logic, whereas this isolates the semaphore
+ * and submit logic for use with other thread pools and delegation models.
+ * In particular, it <i>permits multiple per-stream executors to share a
+ * single per-FS-instance executor; the latter to throttle overall
+ * load from the FS, the others to limit the amount of load which
+ * a single output stream can generate.</i>
+ * <p>
+ * This is inspired by
+ * <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@SuppressWarnings("NullableProblems")
+@InterfaceAudience.Private
+class SemaphoredDelegatingExecutor extends
+    ForwardingListeningExecutorService {
+
+  private final Semaphore queueingPermits;
+  private final ListeningExecutorService executorDelegatee;
+  private final int permitCount;
+
+  /**
+   * Instantiate.
+   * @param executorDelegatee Executor to delegate to
+   * @param permitCount number of permits into the queue permitted
+   * @param fair should the semaphore be "fair"
+   */
+  SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
+      int permitCount,
+      boolean fair) {
+    this.permitCount = permitCount;
+    queueingPermits = new Semaphore(permitCount, fair);
+    this.executorDelegatee = executorDelegatee;
+  }
+
+  @Override
+  protected ListeningExecutorService delegate() {
+    return executorDelegatee;
+  }
+
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+      long timeout, TimeUnit unit) throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+      TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new CallableWithPermitRelease<>(task));
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task), result);
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task));
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    super.execute(new RunnableWithPermitRelease(command));
+  }
+
+  /**
+   * Get the number of permits available; guaranteed to be
+   * {@code 0 <= availablePermits <= size}.
+   * @return the number of permits available at the time of invocation.
+   */
+  public int getAvailablePermits() {
+    return queueingPermits.availablePermits();
+  }
+
+  /**
+   * Get the number of threads waiting to acquire a permit.
+   * @return snapshot of the length of the queue of blocked threads.
+   */
+  public int getWaitingCount() {
+    return queueingPermits.getQueueLength();
+  }
+
+  /**
+   * Total number of permits.
+   * @return the number of permits as set in the constructor
+   */
+  public int getPermitCount() {
+    return permitCount;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "SemaphoredDelegatingExecutor{");
+    sb.append("permitCount=").append(getPermitCount());
+    sb.append(", available=").append(getAvailablePermits());
+    sb.append(", waiting=").append(getWaitingCount());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  class RunnableWithPermitRelease implements Runnable {
+
+    private Runnable delegatee;
+
+    public RunnableWithPermitRelease(Runnable delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public void run() {
+      try {
+        delegatee.run();
+      } finally {
+        queueingPermits.release();
+      }
+
+    }
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  class CallableWithPermitRelease<T> implements Callable<T> {
+
+    private Callable<T> delegatee;
+
+    public CallableWithPermitRelease(Callable<T> delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public T call() throws Exception {
+      try {
+        return delegatee.call();
+      } finally {
+        queueingPermits.release();
+      }
+    }
+
+  }
+
+}

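A brief sketch of how this class is intended to be used: wrap a shared ListeningExecutorService once per output stream, so the semaphore bounds how much work each stream may queue while all streams still share the underlying threads. Pool and permit sizes below are illustrative, and the code would have to live in org.apache.hadoop.fs.s3a since the executor class is package-private.

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThrottledUploadExample {
  public static void main(String[] args) throws Exception {
    // One shared pool per filesystem instance.
    ListeningExecutorService shared =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8));

    // One wrapper per output stream: at most 4 tasks queued or running
    // for this stream, while all streams share the 8 worker threads.
    SemaphoredDelegatingExecutor perStream =
        new SemaphoredDelegatingExecutor(shared, 4, true);

    Future<?> upload = perStream.submit(() -> {
      // upload one block/part of the stream here
    });
    upload.get();
    shared.shutdown();
  }
}
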
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index d84a355..36ec50b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -81,10 +81,16 @@ public enum Statistic {
       "Object multipart upload aborted"),
   OBJECT_PUT_REQUESTS("object_put_requests",
       "Object put/multipart upload count"),
+  OBJECT_PUT_REQUESTS_COMPLETED("object_put_requests_completed",
+      "Object put/multipart upload completed count"),
+  OBJECT_PUT_REQUESTS_ACTIVE("object_put_requests_active",
+      "Current number of active put requests"),
   OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
+  OBJECT_PUT_BYTES_PENDING("object_put_bytes_pending",
+      "number of bytes queued for upload/being actively uploaded"),
   STREAM_ABORTED("stream_aborted",
       "Count of times the TCP stream was aborted"),
-  STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations",
+  STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_operations",
       "Number of executed seek operations which went backwards in a stream"),
   STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
   STREAM_CLOSE_OPERATIONS("stream_close_operations",
@@ -112,7 +118,29 @@ public enum Statistic {
   STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close",
       "Count of bytes read when closing streams during seek operations."),
   STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort",
-      "Count of bytes discarded by aborting the stream");
+      "Count of bytes discarded by aborting the stream"),
+  STREAM_WRITE_FAILURES("stream_write_failures",
+      "Count of stream write failures reported"),
+  STREAM_WRITE_BLOCK_UPLOADS("stream_write_block_uploads",
+      "Count of block/partition uploads completed"),
+  STREAM_WRITE_BLOCK_UPLOADS_ACTIVE("stream_write_block_uploads_active",
+      "Count of block/partition uploads currently active"),
+  STREAM_WRITE_BLOCK_UPLOADS_COMMITTED("stream_write_block_uploads_committed",
+      "Count of number of block uploads committed"),
+  STREAM_WRITE_BLOCK_UPLOADS_ABORTED("stream_write_block_uploads_aborted",
+      "Count of number of block uploads aborted"),
+
+  STREAM_WRITE_BLOCK_UPLOADS_PENDING("stream_write_block_uploads_pending",
+      "Gauge of block/partition uploads queued to be written"),
+  STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING(
+      "stream_write_block_uploads_data_pending",
+      "Gauge of block/partition data uploads queued to be written"),
+  STREAM_WRITE_TOTAL_TIME("stream_write_total_time",
+      "Count of total time taken for uploads to complete"),
+  STREAM_WRITE_TOTAL_DATA("stream_write_total_data",
+      "Count of total data uploaded in block output"),
+  STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
+      "Total queue duration of all block uploads");
 
   private static final Map<String, Statistic> SYMBOL_MAP =
       new HashMap<>(Statistic.values().length);

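Several of the new entries (the *_ACTIVE and *_PENDING statistics) are gauges rather than monotonic counters: they go up when work starts and back down when it finishes. A minimal sketch of that increment/decrement pairing, using a plain AtomicLong as a hypothetical stand-in because the real S3A instrumentation class is not part of this patch:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a gauge such as OBJECT_PUT_REQUESTS_ACTIVE;
// the real instrumentation API is not shown in this diff.
class ActivePutGauge {
  private final AtomicLong activePuts = new AtomicLong();

  void putObject(Runnable upload) {
    activePuts.incrementAndGet();     // gauge +1 when the PUT starts
    try {
      upload.run();                   // perform the PUT / upload the part
    } finally {
      activePuts.decrementAndGet();   // gauge -1 when it completes or fails
    }
  }

  long active() {
    return activePuts.get();
  }
}
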
