FileBasedSink: improve parallelism in GCS copy/remove

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f08f21cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f08f21cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f08f21cd

Branch: refs/heads/master
Commit: f08f21cdf4c067745a10b31a6481ed470f97dadc
Parents: 064796d
Author: Dan Halperin <dhalp...@google.com>
Authored: Sun Aug 14 23:08:21 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Aug 23 09:13:40 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 241 +++++++++----------
 .../org/apache/beam/sdk/util/GcsUtilTest.java   |  69 ++++++
 2 files changed, 178 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 4e9ee6e..06685e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -30,11 +30,9 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
 import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.StorageRequest;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -47,6 +45,11 @@ import com.google.cloud.hadoop.util.ResilientOperation;
 import com.google.cloud.hadoop.util.RetryDeterminer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,12 +62,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
 
 /**
  * Provides operations on GCS.
@@ -110,7 +117,11 @@ public class GcsUtil {
   /**
    * Maximum number of requests permitted in a GCS batch request.
    */
-  private static final int MAX_REQUESTS_PER_BATCH = 1000;
+  // Intentionally lower than the previous value of 1000 (the GCS limit the Javadoc above
+  // describes): smaller batches let copy/remove spread their requests over more batches, which
+  // executeBatches runs concurrently.
+  private static final int MAX_REQUESTS_PER_BATCH = 100;
+  /**
+   * Maximum number of concurrent batches of requests executing on GCS.
+   *
+   * <p>Used as the thread-pool size in executeBatches.
+   */
+  private static final int MAX_CONCURRENT_BATCHES = 256;
 
   /////////////////////////////////////////////////////////////////////////////
 
@@ -125,7 +136,6 @@ public class GcsUtil {
   // Exposed for testing.
   final ExecutorService executorService;
 
-  private final BatchHelper batchHelper;
   /**
    * Returns true if the given GCS pattern is supported otherwise fails with an
    * exception.
@@ -145,8 +155,6 @@ public class GcsUtil {
     this.storageClient = storageClient;
     this.uploadBufferSizeBytes = uploadBufferSizeBytes;
     this.executorService = executorService;
-    this.batchHelper = new BatchHelper(
-        storageClient.getRequestFactory().getInitializer(), storageClient, 
MAX_REQUESTS_PER_BATCH);
   }
 
   // Use this only for testing purposes.
@@ -372,154 +380,123 @@ public class GcsUtil {
      }
   }
 
+  /**
+   * Executes {@code batches} concurrently, at most {@link #MAX_CONCURRENT_BATCHES} at a time, and
+   * blocks until every batch has finished.
+   *
+   * <p>The thread pool is created for this call only and is always shut down before returning.
+   *
+   * @throws IOException if any batch fails (the cause is that batch's own exception) or if the
+   *     calling thread is interrupted; batches that have not started by then are abandoned
+   */
+  private static void executeBatches(List<BatchRequest> batches) throws IOException {
+    // Deliberately NOT wrapped in MoreExecutors.getExitingExecutorService: that registers a new
+    // JVM shutdown hook (holding a reference to the pool) on every call and never removes it, so
+    // each copy()/remove() would leak one. The pool is shut down in the finally block instead.
+    // ThreadPoolExecutor starts core threads lazily, so at most one thread per batch is created.
+    ListeningExecutorService executor = MoreExecutors.listeningDecorator(
+        new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>()));
+
+    try {
+      List<ListenableFuture<Void>> futures = new LinkedList<>();
+      for (final BatchRequest batch : batches) {
+        futures.add(executor.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            batch.execute();
+            return null;
+          }
+        }));
+      }
+      Futures.allAsList(futures).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while executing batch GCS request", e);
+    } catch (ExecutionException e) {
+      // Surface the failing batch's own exception rather than the ExecutionException wrapper.
+      throw new IOException("Error executing batch GCS request", e.getCause());
+    } finally {
+      // After success every task has completed, so this behaves like shutdown(). After a failure
+      // it discards batches that have not started (and interrupts running ones) instead of letting
+      // them keep modifying GCS after the caller has already been handed an exception.
+      executor.shutdownNow();
+    }
+  }
+
   public void copy(List<String> srcFilenames, List<String> destFilenames) 
throws IOException {
+    executeBatches(makeCopyBatches(srcFilenames, destFilenames));
+  }
+
+  /**
+   * Builds GCS batch requests that copy {@code srcFilenames.get(i)} to
+   * {@code destFilenames.get(i)} for every index i, putting at most
+   * {@link #MAX_REQUESTS_PER_BATCH} copies in each batch and never returning an empty batch.
+   * Nothing is sent to GCS here; the caller executes the batches. Package-private so that tests
+   * can inspect the batching.
+   *
+   * @throws IllegalArgumentException if the two lists have different sizes
+   */
+  List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> 
destFilenames)
+      throws IOException {
     checkArgument(
         srcFilenames.size() == destFilenames.size(),
         "Number of source files %s must equal number of destination files %s",
         srcFilenames.size(),
         destFilenames.size());
+
+    List<BatchRequest> batches = new LinkedList<>();
+    BatchRequest batch = storageClient.batch();
     for (int i = 0; i < srcFilenames.size(); i++) {
       final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
       final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
-      LOG.debug("Copying {} to {}", sourcePath, destPath);
-      Storage.Objects.Copy copyObject = 
storageClient.objects().copy(sourcePath.getBucket(),
-          sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), 
null);
-      batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
-        @Override
-        public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-          LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
-        }
-
-        @Override
-        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
-          // Do nothing on item not found.
-          if (!errorExtractor.itemNotFound(e)) {
-            throw new IOException(e.toString());
-          }
-          LOG.debug("{} does not exist.", sourcePath);
-        }
-      });
+      enqueueCopy(sourcePath, destPath, batch);
+      // Seal the batch as soon as it is full; the following copies go into a fresh one.
+      if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
+        batches.add(batch);
+        batch = storageClient.batch();
+      }
     }
-    batchHelper.flush();
-  }
-
-  public void remove(Collection<String> filenames) throws IOException {
-    for (String filename : filenames) {
-      final GcsPath path = GcsPath.fromUri(filename);
-      LOG.debug("Removing: " + path);
-      Storage.Objects.Delete deleteObject =
-          storageClient.objects().delete(path.getBucket(), path.getObject());
-      batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
-        @Override
-        public void onSuccess(Void obj, HttpHeaders responseHeaders) throws 
IOException {
-          LOG.debug("Successfully removed {}", path);
-        }
-
-        @Override
-        public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
-          // Do nothing on item not found.
-          if (!errorExtractor.itemNotFound(e)) {
-            throw new IOException(e.toString());
-          }
-          LOG.debug("{} does not exist.", path);
-        }
-      });
+    // Keep the trailing partial batch, but skip it when it is empty (no files at all, or a
+    // file count that is an exact multiple of MAX_REQUESTS_PER_BATCH).
+    if (batch.size() > 0) {
+      batches.add(batch);
     }
-    batchHelper.flush();
+    return batches;
   }
 
-  /**
-   * BatchHelper abstracts out the logic for the maximum requests per batch 
for GCS.
-   *
-   * <p>Copy of
-   * 
https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
-   *
-   * <p>Copied to prevent Dataflow from depending on the Hadoop-related 
dependencies that are not
-   * used in Dataflow.  Hadoop-related dependencies will be removed from the 
Google Cloud Storage
-   * Connector 
(https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this 
project
-   * and others may use the connector without introducing unnecessary 
dependencies.
-   *
-   * <p>This class is not thread-safe; create a new BatchHelper instance per 
single-threaded logical
-   * grouping of requests.
-   */
-  @NotThreadSafe
-  private static class BatchHelper {
-    /**
-     * Callback that causes a single StorageRequest to be added to the 
BatchRequest.
-     */
-    protected static interface QueueRequestCallback {
-      void enqueue() throws IOException;
+  /**
+   * Builds GCS batch requests that delete every file in {@code filenames}, putting at most
+   * {@link #MAX_REQUESTS_PER_BATCH} deletions in each batch. Nothing is sent to GCS here; the
+   * caller executes the batches. Package-private so that tests can inspect the batching.
+   */
+  List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws 
IOException {
+    List<BatchRequest> batches = new LinkedList<>();
+    // Lists.partition yields consecutive chunks of at most MAX_REQUESTS_PER_BATCH names (only the
+    // last may be shorter), so an empty input yields no batches at all.
+    for (List<String> filesToDelete :
+        Lists.partition(Lists.newArrayList(filenames), 
MAX_REQUESTS_PER_BATCH)) {
+      BatchRequest batch = storageClient.batch();
+      for (String file : filesToDelete) {
+        enqueueDelete(GcsPath.fromUri(file), batch);
+      }
+      batches.add(batch);
     }
+    return batches;
+  }
 
-    private final List<QueueRequestCallback> pendingBatchEntries;
-    private final BatchRequest batch;
-
-    // Number of requests that can be queued into a single actual HTTP request
-    // before a sub-batch is sent.
-    private final long maxRequestsPerBatch;
-
-    // Flag that indicates whether there is an in-progress flush.
-    private boolean flushing = false;
+  /**
+   * Deletes every file in {@code filenames}, sending the deletions to GCS as batch requests that
+   * run concurrently. Files that do not exist are skipped rather than reported as errors.
+   *
+   * @throws IOException if any deletion fails for a reason other than a missing file
+   */
+  public void remove(Collection<String> filenames) throws IOException {
+    executeBatches(makeRemoveBatches(filenames));
+  }
 
-    /**
-     * Primary constructor, generally accessed only via the inner Factory 
class.
-     */
-    public BatchHelper(
-        HttpRequestInitializer requestInitializer, Storage gcs, long 
maxRequestsPerBatch) {
-      this.pendingBatchEntries = new LinkedList<>();
-      this.batch = gcs.batch(requestInitializer);
-      this.maxRequestsPerBatch = maxRequestsPerBatch;
-    }
+  /**
+   * Queues on {@code batch} a request copying {@code from} to {@code to}. When the batch is
+   * executed, a success is logged at debug level, a missing source is logged and ignored, and any
+   * other error makes the callback throw an IOException naming both paths.
+   */
+  private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest 
batch)
+      throws IOException {
+    Storage.Objects.Copy copyRequest = storageClient.objects()
+        .copy(from.getBucket(), from.getObject(), to.getBucket(), 
to.getObject(), null);
+    copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() {
+      @Override
+      public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
+        LOG.debug("Successfully copied {} to {}", from, to);
+      }
 
-    /**
-     * Adds an additional request to the batch, and possibly flushes the 
current contents of the
-     * batch if {@code maxRequestsPerBatch} has been reached.
-     */
-    public <T> void queue(final StorageRequest<T> req, final 
JsonBatchCallback<T> callback)
-        throws IOException {
-      QueueRequestCallback queueCallback = new QueueRequestCallback() {
-        @Override
-        public void enqueue() throws IOException {
-          req.queue(batch, callback);
+      @Override
+      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
+        // Per the log message below, a missing source is assumed to mean this is a retry after
+        // the source was already deleted, so it is not treated as an error.
+        if (errorExtractor.itemNotFound(e)) {
+          // Do nothing on item not found.
+          LOG.debug("{} does not exist, assuming this is a retry after 
deletion.", from);
+          return;
         }
-      };
-      pendingBatchEntries.add(queueCallback);
-
-      flushIfPossibleAndRequired();
-    }
-
-    // Flush our buffer if we have more pending entries than 
maxRequestsPerBatch
-    private void flushIfPossibleAndRequired() throws IOException {
-      if (pendingBatchEntries.size() > maxRequestsPerBatch) {
-        flushIfPossible();
+        throw new IOException(
+            String.format("Error trying to copy %s to %s: %s", from, to, e));
       }
-    }
+    });
+  }
 
-    // Flush our buffer if we are not already in a flush operation and we have 
data to flush.
-    private void flushIfPossible() throws IOException {
-      if (!flushing && pendingBatchEntries.size() > 0) {
-        flushing = true;
-        try {
-          while (batch.size() < maxRequestsPerBatch && 
pendingBatchEntries.size() > 0) {
-            QueueRequestCallback head = pendingBatchEntries.remove(0);
-            head.enqueue();
-          }
+  /**
+   * Queues on {@code batch} a request deleting {@code file}. When the batch is executed, a success
+   * is logged at debug level, a missing file is logged and ignored (deleting an already-absent
+   * file is not an error), and any other error makes the callback throw an IOException naming
+   * the file.
+   */
+  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws 
IOException {
+    Storage.Objects.Delete deleteRequest = storageClient.objects()
+        .delete(file.getBucket(), file.getObject());
+    deleteRequest.queue(batch, new JsonBatchCallback<Void>() {
+      @Override
+      public void onSuccess(Void obj, HttpHeaders responseHeaders) {
+        LOG.debug("Successfully deleted {}", file);
+      }
 
-          batch.execute();
-        } finally {
-          flushing = false;
+      @Override
+      public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
+        if (errorExtractor.itemNotFound(e)) {
+          // Do nothing on item not found.
+          LOG.debug("{} does not exist.", file);
+          return;
         }
+        throw new IOException(String.format("Error trying to delete %s: %s", 
file, e));
       }
-    }
-
-
-    /**
-     * Sends any currently remaining requests in the batch; should be called 
at the end of any
-     * series of batched requests to ensure everything has been sent.
-     */
-    public void flush() throws IOException {
-      flushIfPossible();
-    }
+    });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 49c7bc4..997340a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.util;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -32,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
+import com.google.api.client.googleapis.batch.BatchRequest;
 import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.http.HttpRequest;
@@ -490,4 +493,70 @@ public class GcsUtilTest {
     HttpResponse response = request.execute();
     return GoogleJsonResponseException.from(jsonFactory, response);
   }
+
+  /** Returns {@code n} URIs {@code gs://bucket/<s><i>} for i = 0 .. n-1, in ascending order. */
+  private static List<String> makeStrings(String s, int n) {
+    ImmutableList.Builder<String> uris = ImmutableList.builder();
+    int i = 0;
+    while (i < n) {
+      uris.add(String.format("gs://bucket/%s%d", s, i));
+      i++;
+    }
+    return uris.build();
+  }
+
+  /** Returns the total number of requests in {@code batches}, asserting that none is empty. */
+  private static int sumBatchSizes(List<BatchRequest> batches) {
+    int total = 0;
+    for (BatchRequest batch : batches) {
+      int size = batch.size();
+      assertThat(size, greaterThan(0));
+      total += size;
+    }
+    return total;
+  }
+
+  /** Checks that copies are split into batches of at most 100 requests, with no empty batch. */
+  @Test
+  public void testMakeCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // No files produce no batches: the trailing batch is dropped when it is empty
+    List<BatchRequest> batches =
+        gcsUtil.makeCopyBatches(makeStrings("s", 0), makeStrings("d", 0));
+    assertThat(batches.size(), equalTo(0));
+
+    // Small number of files fits in 1 batch
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // Exactly 1 full batch of files yields 1 batch (no extra empty batch)
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
+
+  /** Mismatched source/destination list lengths must be rejected. */
+  @Test
+  public void testInvalidCopyBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    List<String> threeSources = makeStrings("s", 3);
+    List<String> oneDestination = makeStrings("d", 1);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Number of source files 3");
+    gcsUtil.makeCopyBatches(threeSources, oneDestination);
+  }
+
+  /** Checks that deletions are split into batches of at most 100 requests, with no empty batch. */
+  @Test
+  public void testMakeRemoveBatches() throws IOException {
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+
+    // No files produce no batches
+    List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 0));
+    assertThat(batches.size(), equalTo(0));
+
+    // Small number of files fits in 1 batch
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(3));
+
+    // Exactly 1 full batch of files yields 1 batch (no extra empty batch)
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
+    assertThat(batches.size(), equalTo(1));
+    assertThat(sumBatchSizes(batches), equalTo(100));
+
+    // A little more than 5 batches of files fits in 6 batches
+    batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
+    assertThat(batches.size(), equalTo(6));
+    assertThat(sumBatchSizes(batches), equalTo(501));
+  }
 }

Reply via email to