FileBasedSink: improve parallelism in GCS copy/remove
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f08f21cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f08f21cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f08f21cd

Branch: refs/heads/master
Commit: f08f21cdf4c067745a10b31a6481ed470f97dadc
Parents: 064796d
Author: Dan Halperin <dhalp...@google.com>
Authored: Sun Aug 14 23:08:21 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Aug 23 09:13:40 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java | 241 +++++++++----------
 .../org/apache/beam/sdk/util/GcsUtilTest.java | 69 ++++++
 2 files changed, 178 insertions(+), 132 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 4e9ee6e..06685e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -30,11 +30,9 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; -import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; -import com.google.api.services.storage.StorageRequest; import com.google.api.services.storage.model.Objects; import
com.google.api.services.storage.model.StorageObject; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; @@ -47,6 +45,11 @@ import com.google.cloud.hadoop.util.ResilientOperation; import com.google.cloud.hadoop.util.RetryDeterminer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +62,16 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; /** * Provides operations on GCS. @@ -110,7 +117,11 @@ public class GcsUtil { /** * Maximum number of requests permitted in a GCS batch request. */ - private static final int MAX_REQUESTS_PER_BATCH = 1000; + private static final int MAX_REQUESTS_PER_BATCH = 100; + /** + * Maximum number of concurrent batches of requests executing on GCS. + */ + private static final int MAX_CONCURRENT_BATCHES = 256; ///////////////////////////////////////////////////////////////////////////// @@ -125,7 +136,6 @@ public class GcsUtil { // Exposed for testing. final ExecutorService executorService; - private final BatchHelper batchHelper; /** * Returns true if the given GCS pattern is supported otherwise fails with an * exception. 
@@ -145,8 +155,6 @@ public class GcsUtil { this.storageClient = storageClient; this.uploadBufferSizeBytes = uploadBufferSizeBytes; this.executorService = executorService; - this.batchHelper = new BatchHelper( - storageClient.getRequestFactory().getInitializer(), storageClient, MAX_REQUESTS_PER_BATCH); } // Use this only for testing purposes. @@ -372,154 +380,123 @@ public class GcsUtil { } } + private static void executeBatches(List<BatchRequest> batches) throws IOException { + ListeningExecutorService executor = MoreExecutors.listeningDecorator( + MoreExecutors.getExitingExecutorService( + new ThreadPoolExecutor(MAX_CONCURRENT_BATCHES, MAX_CONCURRENT_BATCHES, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()))); + + List<ListenableFuture<Void>> futures = new LinkedList<>(); + for (final BatchRequest batch : batches) { + futures.add(executor.submit(new Callable<Void>() { + public Void call() throws IOException { + batch.execute(); + return null; + } + })); + } + + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while executing batch GCS request", e); + } catch (ExecutionException e) { + throw new IOException("Error executing batch GCS request", e); + } finally { + executor.shutdown(); + } + } + public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException { + executeBatches(makeCopyBatches(srcFilenames, destFilenames)); + } + + List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames) + throws IOException { checkArgument( srcFilenames.size() == destFilenames.size(), "Number of source files %s must equal number of destination files %s", srcFilenames.size(), destFilenames.size()); + + List<BatchRequest> batches = new LinkedList<>(); + BatchRequest batch = storageClient.batch(); for (int i = 0; i < srcFilenames.size(); i++) { final GcsPath sourcePath = 
GcsPath.fromUri(srcFilenames.get(i)); final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); - LOG.debug("Copying {} to {}", sourcePath, destPath); - Storage.Objects.Copy copyObject = storageClient.objects().copy(sourcePath.getBucket(), - sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null); - batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() { - @Override - public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { - LOG.debug("Successfully copied {} to {}", sourcePath, destPath); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - // Do nothing on item not found. - if (!errorExtractor.itemNotFound(e)) { - throw new IOException(e.toString()); - } - LOG.debug("{} does not exist.", sourcePath); - } - }); + enqueueCopy(sourcePath, destPath, batch); + if (batch.size() >= MAX_REQUESTS_PER_BATCH) { + batches.add(batch); + batch = storageClient.batch(); + } } - batchHelper.flush(); - } - - public void remove(Collection<String> filenames) throws IOException { - for (String filename : filenames) { - final GcsPath path = GcsPath.fromUri(filename); - LOG.debug("Removing: " + path); - Storage.Objects.Delete deleteObject = - storageClient.objects().delete(path.getBucket(), path.getObject()); - batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() { - @Override - public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException { - LOG.debug("Successfully removed {}", path); - } - - @Override - public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - // Do nothing on item not found. - if (!errorExtractor.itemNotFound(e)) { - throw new IOException(e.toString()); - } - LOG.debug("{} does not exist.", path); - } - }); + if (batch.size() > 0) { + batches.add(batch); } - batchHelper.flush(); + return batches; } - /** - * BatchHelper abstracts out the logic for the maximum requests per batch for GCS. 
- * - * <p>Copy of - * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java - * - * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not - * used in Dataflow. Hadoop-related dependencies will be removed from the Google Cloud Storage - * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project - * and others may use the connector without introducing unnecessary dependencies. - * - * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical - * grouping of requests. - */ - @NotThreadSafe - private static class BatchHelper { - /** - * Callback that causes a single StorageRequest to be added to the BatchRequest. - */ - protected static interface QueueRequestCallback { - void enqueue() throws IOException; + List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException { + List<BatchRequest> batches = new LinkedList<>(); + for (List<String> filesToDelete : + Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { + BatchRequest batch = storageClient.batch(); + for (String file : filesToDelete) { + enqueueDelete(GcsPath.fromUri(file), batch); + } + batches.add(batch); } + return batches; + } - private final List<QueueRequestCallback> pendingBatchEntries; - private final BatchRequest batch; - - // Number of requests that can be queued into a single actual HTTP request - // before a sub-batch is sent. - private final long maxRequestsPerBatch; - - // Flag that indicates whether there is an in-progress flush. - private boolean flushing = false; + public void remove(Collection<String> filenames) throws IOException { + executeBatches(makeRemoveBatches(filenames)); + } - /** - * Primary constructor, generally accessed only via the inner Factory class. 
- */ - public BatchHelper( - HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) { - this.pendingBatchEntries = new LinkedList<>(); - this.batch = gcs.batch(requestInitializer); - this.maxRequestsPerBatch = maxRequestsPerBatch; - } + private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) + throws IOException { + Storage.Objects.Copy copyRequest = storageClient.objects() + .copy(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null); + copyRequest.queue(batch, new JsonBatchCallback<StorageObject>() { + @Override + public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully copied {} to {}", from, to); + } - /** - * Adds an additional request to the batch, and possibly flushes the current contents of the - * batch if {@code maxRequestsPerBatch} has been reached. - */ - public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback) - throws IOException { - QueueRequestCallback queueCallback = new QueueRequestCallback() { - @Override - public void enqueue() throws IOException { - req.queue(batch, callback); + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + if (errorExtractor.itemNotFound(e)) { + // Do nothing on item not found. + LOG.debug("{} does not exist, assuming this is a retry after deletion.", from); + return; } - }; - pendingBatchEntries.add(queueCallback); - - flushIfPossibleAndRequired(); - } - - // Flush our buffer if we have more pending entries than maxRequestsPerBatch - private void flushIfPossibleAndRequired() throws IOException { - if (pendingBatchEntries.size() > maxRequestsPerBatch) { - flushIfPossible(); + throw new IOException( + String.format("Error trying to copy %s to %s: %s", from, to, e)); } - } + }); + } - // Flush our buffer if we are not already in a flush operation and we have data to flush. 
- private void flushIfPossible() throws IOException { - if (!flushing && pendingBatchEntries.size() > 0) { - flushing = true; - try { - while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) { - QueueRequestCallback head = pendingBatchEntries.remove(0); - head.enqueue(); - } + private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException { + Storage.Objects.Delete deleteRequest = storageClient.objects() + .delete(file.getBucket(), file.getObject()); + deleteRequest.queue(batch, new JsonBatchCallback<Void>() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {}", file); + } - batch.execute(); - } finally { - flushing = false; + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { + if (errorExtractor.itemNotFound(e)) { + // Do nothing on item not found. + LOG.debug("{} does not exist.", file); + return; } + throw new IOException(String.format("Error trying to delete %s: %s", file, e)); } - } - - - /** - * Sends any currently remaining requests in the batch; should be called at the end of any - * series of batched requests to ensure everything has been sent. 
- */ - public void flush() throws IOException { - flushIfPossible(); - } + }); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f08f21cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 49c7bc4..997340a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -32,6 +34,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import com.google.api.client.googleapis.batch.BatchRequest; import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpRequest; @@ -490,4 +493,70 @@ public class GcsUtilTest { HttpResponse response = request.execute(); return GoogleJsonResponseException.from(jsonFactory, response); } + + private static List<String> makeStrings(String s, int n) { + ImmutableList.Builder<String> ret = ImmutableList.builder(); + for (int i = 0; i < n; ++i) { + ret.add(String.format("gs://bucket/%s%d", s, i)); + } + return ret.build(); + } + + private static int sumBatchSizes(List<BatchRequest> batches) { + int ret = 0; + for (BatchRequest b : batches) { + ret += b.size(); + assertThat(b.size(), greaterThan(0)); + } + return ret; + } + 
+ @Test + public void testMakeCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } + + @Test + public void testInvalidCopyBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Number of source files 3"); + + gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1)); + } + + @Test + public void testMakeRemoveBatches() throws IOException { + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + // Small number of files fits in 1 batch + List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(3)); + + // 1 batch of files fits in 1 batch + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); + assertThat(batches.size(), equalTo(1)); + assertThat(sumBatchSizes(batches), equalTo(100)); + + // A little more than 5 batches of files fits in 6 batches + batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); + assertThat(batches.size(), equalTo(6)); + assertThat(sumBatchSizes(batches), equalTo(501)); + } }