Repository: incubator-beam Updated Branches: refs/heads/master 202acd1d6 -> 2ee444d15
Add default bucket scaffolding. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d6e7c71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d6e7c71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d6e7c71 Branch: refs/heads/master Commit: 9d6e7c71b6863a3a87eb3b4ad7b4a5ce75707955 Parents: ccd9bad Author: sammcveety <sam.mcve...@gmail.com> Authored: Fri Sep 30 21:24:58 2016 -0400 Committer: Luke Cwik <lc...@google.com> Committed: Mon Oct 3 08:20:12 2016 -0700 ---------------------------------------------------------------------- pom.xml | 13 +++ .../DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java | 7 +- sdks/java/core/pom.xml | 5 + .../options/CloudResourceManagerOptions.java | 40 +++++++ .../apache/beam/sdk/util/GcpProjectUtil.java | 106 ++++++++++++++++++ .../apache/beam/sdk/util/GcsPathValidator.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 67 ++++++----- .../org/apache/beam/sdk/util/Transport.java | 17 +++ .../dataflow/util/GcsPathValidatorTest.java | 4 +- .../apache/beam/sdk/util/ApiSurfaceTest.java | 1 + .../beam/sdk/util/GcpProjectUtilTest.java | 76 +++++++++++++ .../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++++++++++++++++++- 13 files changed, 413 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3cd5255..7295261 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ <!-- If updating dependencies, please update any relevant javadoc offlineLinks --> <avro.version>1.8.1</avro.version> <bigquery.version>v2-rev295-1.22.0</bigquery.version> + <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version> 
<pubsubgrpc.version>0.1.0</pubsubgrpc.version> <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version> <dataflow.version>v1b3-rev36-1.22.0</dataflow.version> @@ -483,6 +484,18 @@ <dependency> <groupId>com.google.apis</groupId> + <artifactId>google-api-services-cloudresourcemanager</artifactId> + <version>${cloudresourcemanager.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava-jdk5</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> <artifactId>google-api-services-pubsub</artifactId> <version>${pubsub.version}</version> <exclusions> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 2b7013d..98d2fb0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -155,7 +155,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { return ImmutableList.of((GcsPath) invocation.getArguments()[0]); } }); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 58a01aa..b0ee231 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -187,9 +187,9 @@ public class DataflowRunnerTest { * Build a mock {@link GcsUtil} with return values. * * @param bucketExist first return value - * @param bucketExists next return values + * @param bucketAccessible next return values */ - private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketExists) + private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... 
bucketAccessible) throws IOException { GcsUtil mockGcsUtil = mock(GcsUtil.class); when(mockGcsUtil.create(any(GcsPath.class), anyString())) @@ -209,7 +209,8 @@ public class DataflowRunnerTest { return ImmutableList.of((GcsPath) invocation.getArguments()[0]); } }); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))) + .thenReturn(bucketExist, bucketAccessible); return mockGcsUtil; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index c4d3e64..aa0ad09 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -369,6 +369,11 @@ <dependency> <groupId>com.google.apis</groupId> + <artifactId>google-api-services-cloudresourcemanager</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> <artifactId>google-api-services-pubsub</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java new file mode 100644 index 0000000..ed532db --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.beam.sdk.util.GcpProjectUtil; + +/** + * Properties needed when using CloudResourceManager with the Beam SDK. + */ +@Description("Options that are used to configure CloudResourceManager. See " + + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") +public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { + /** + * The GcpProjectUtil instance used to communicate with Google Cloud Resource Manager.
+ */ + @JsonIgnore + @Description("The GcpProjectUtil instance that should be used to communicate" + + " with Google Cloud Resource Manager.") + @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) + @Hidden + GcpProjectUtil getGcpProjectUtil(); + void setGcpProjectUtil(GcpProjectUtil value); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java new file mode 100644 index 0000000..beac4e4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides operations on Google Cloud Platform Projects. + */ +public class GcpProjectUtil { + /** + * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using + * any transport flags specified on the {@link PipelineOptions}. + */ + public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> { + /** + * Returns an instance of {@link GcpProjectUtil} based on the + * {@link PipelineOptions}. + */ + @Override + public GcpProjectUtil create(PipelineOptions options) { + LOG.debug("Creating new GcpProjectUtil"); + CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class); + return new GcpProjectUtil( + Transport.newCloudResourceManagerClient(crmOptions).build()); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class); + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + + /** Client for the CRM API. */ + private CloudResourceManager crmClient; + + private GcpProjectUtil(CloudResourceManager crmClient) { + this.crmClient = crmClient; + } + + // Use this only for testing purposes. 
+ @VisibleForTesting + void setCrmClient(CloudResourceManager crmClient) { + this.crmClient = crmClient; + } + + /** + * Returns the project number, or throws an {@link IOException} if the project + * does not exist or cannot be retrieved. + */ + long getProjectNumber(String projectId) throws IOException { + return getProjectNumber( + projectId, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number, retrying on socket errors with the given backoff, or + * throws an {@link IOException} if the project does not exist or cannot be retrieved. + */ + @VisibleForTesting + long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException { + CloudResourceManager.Projects.Get getProject = + crmClient.projects().get(projectId); + try { + Project project = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getProject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + return project.getProjectNumber(); + } catch (Exception e) { + throw new IOException("Unable to get project number", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java index 89363ce..c8da4d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java @@ -75,7 +75,7 @@ public class GcsPathValidator implements PathValidator { private void verifyPathIsAccessible(String path, String errorMessage) { GcsPath gcsPath = getGcsPath(path); try { - checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath), + checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath), errorMessage, path); } catch (IOException e) {
throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 4befb1a..ce4604b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -28,6 +28,7 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; @@ -49,6 +50,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.file.AccessDeniedException; +import java.nio.file.FileAlreadyExistsException; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; @@ -338,11 +341,10 @@ public class GcsUtil { } /** - * Returns whether the GCS bucket exists. If the bucket exists, it must - * be accessible otherwise the permissions exception will be propagated. + * Returns whether the GCS bucket exists and is accessible. */ - public boolean bucketExists(GcsPath path) throws IOException { - return bucketExists( + public boolean bucketAccessible(GcsPath path) throws IOException { + return bucketAccessible( path, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); @@ -351,24 +353,23 @@ public class GcsUtil { /** * Returns the project number of the project which owns this bucket. 
* If the bucket exists, it must be accessible otherwise the permissions - * exception will be propagated. + * exception will be propagated. If the bucket does not exist, an exception + * will be thrown. */ public long bucketOwner(GcsPath path) throws IOException { return getBucket( path, BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT).getProjectNumber(); + Sleeper.DEFAULT).getProjectNumber().longValue(); } /** - * Creates a bucket for the provided project or propagates an error. + * Creates a {@link Bucket} under the specified project in Cloud Storage or + * propagates an exception. */ - public void createBucket(GcsPath path, long projectNumber) throws IOException { - return createBucket( - path, - projectNumber, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT); + public void createBucket(String projectId, Bucket bucket) throws IOException { + createBucket( + projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); } /** @@ -376,18 +377,22 @@ public class GcsUtil { * is inaccessible due to permissions. 
*/ @VisibleForTesting - boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { - return getBucket(path, backoff, sleeper) != null; + boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + try { + return getBucket(path, backoff, sleeper) != null; + } catch (AccessDeniedException | FileNotFoundException e) { + return false; + } } @VisibleForTesting @Nullable - Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket()); try { - Storage.Bucket bucket = ResilientOperation.retry( + Bucket bucket = ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(getBucket), backoff, new RetryDeterminer<IOException>() { @@ -401,11 +406,14 @@ public class GcsUtil { }, IOException.class, sleeper); - + return bucket; } catch (GoogleJsonResponseException e) { - if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { - return null; + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(path.toString(), null, e.getMessage()); + } + if (errorExtractor.itemNotFound(e)) { + throw new FileNotFoundException(e.getMessage()); } throw e; } catch (InterruptedException e) { @@ -417,11 +425,10 @@ public class GcsUtil { } @VisibleForTesting - void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper) + void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException { Storage.Buckets.Insert insertBucket = - storageClient.buckets().insert(path.getBucket()); - insertBucket.setProject(String.valueOf(projectNumber)); + storageClient.buckets().insert(projectId, bucket); try { ResilientOperation.retry( @@ -429,8 +436,8 @@ public class GcsUtil { backoff, new RetryDeterminer<IOException>() { @Override - public boolean 
shouldRetry(IOException e) { - if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + public boolean shouldRetry(IOException e) { + if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) { return false; } return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); @@ -439,11 +446,19 @@ public class GcsUtil { IOException.class, sleeper); return; + } catch (GoogleJsonResponseException e) { + if (errorExtractor.accessDenied(e)) { + throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); + } + if (errorExtractor.itemAlreadyExists(e)) { + throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); + } + throw e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException( String.format("Error while attempting to create bucket gs://%s for rproject %s", - path.getBucket(), projectNumber), e); + bucket.getName(), projectId), e); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index d824207..1f61299 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -24,6 +24,7 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -33,6 +34,7 @@ import java.net.MalformedURLException; 
import java.net.URL; import java.security.GeneralSecurityException; import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PubsubOptions; @@ -121,6 +123,21 @@ public class Transport { } /** + * Returns a CloudResourceManager client builder using the specified + * {@link CloudResourceManagerOptions}. + */ + public static CloudResourceManager.Builder + newCloudResourceManagerClient(CloudResourceManagerOptions options) { + return new CloudResourceManager.Builder(getTransport(), getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. 
*/ public static Storage.Builder http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java index 398fa63..adf4fc2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java @@ -57,7 +57,7 @@ public class GcsPathValidatorTest { @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setRunner(FakeRunner.class); @@ -81,7 +81,7 @@ public class GcsPathValidatorTest { @Test public void testWhenBucketDoesNotExist() throws Exception { - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false); + when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage( "Could not find file gs://non-existent-bucket/location"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java index 4b76277..ea771b4 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java @@ -76,6 +76,7 @@ public class ApiSurfaceTest { inPackage("org.apache.beam"), inPackage("com.google.api.client"), inPackage("com.google.api.services.bigquery"), + inPackage("com.google.api.services.cloudresourcemanager"), inPackage("com.google.api.services.dataflow"), inPackage("com.google.api.services.pubsub"), inPackage("com.google.api.services.storage"), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java new file mode 100644 index 0000000..23f0418 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.client.util.BackOff; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import java.net.SocketTimeoutException; +import org.apache.beam.sdk.options.CloudResourceManagerOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Test case for {@link GcpProjectUtil}. */ +@RunWith(JUnit4.class) +public class GcpProjectUtilTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static CloudResourceManagerOptions crmOptionsWithTestCredential() { + CloudResourceManagerOptions pipelineOptions = + PipelineOptionsFactory.as(CloudResourceManagerOptions.class); + pipelineOptions.setGcpCredential(new TestCredential()); + return pipelineOptions; + } + + @Test + public void testGetProjectNumber() throws Exception { + CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential(); + GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil(); + + CloudResourceManager.Projects mockProjects = Mockito.mock( + CloudResourceManager.Projects.class); + CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class); + projectUtil.setCrmClient(mockCrm); + + CloudResourceManager.Projects.Get mockProjectsGet = + Mockito.mock(CloudResourceManager.Projects.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + Project project = new Project(); + project.setProjectNumber(5L); + + when(mockCrm.projects()).thenReturn(mockProjects); + 
when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet); + when(mockProjectsGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(project); + + assertEquals(5L, projectUtil.getProjectNumber( + "foo", mockBackOff, new FastNanoClockAndSleeper())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 9504b4c..df3bf6e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import com.google.api.client.googleapis.batch.BatchRequest; @@ -58,6 +59,7 @@ import java.io.IOException; import java.math.BigInteger; import java.net.SocketTimeoutException; import java.nio.channels.SeekableByteChannel; +import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -305,7 +307,7 @@ public class GcsUtilTest { GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); GoogleJsonResponseException expectedException = googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets your looking for"); + "Waves hand mysteriously", "These aren't the buckets you're looking for"); when(mockStorage.objects()).thenReturn(mockStorageObjects); when(mockStorageObjects.get(pattern.getBucket(), 
pattern.getObject())).thenReturn( @@ -380,7 +382,57 @@ public class GcsUtilTest { } @Test - public void testBucketExists() throws IOException { + public void testCreateBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testCreateBucketAccessErrors() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + GoogleJsonResponseException expectedException = + googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, + "Waves hand mysteriously", "These aren't the buckets you're looking for"); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.insert( + any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); + when(mockStorageInsert.execute()) + .thenThrow(expectedException); + + 
thrown.expect(AccessDeniedException.class); + + gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test + public void testBucketAccessible() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); @@ -398,7 +450,7 @@ public class GcsUtilTest { .thenThrow(new SocketTimeoutException("SocketException")) .thenReturn(new Bucket()); - assertTrue(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"), + assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper())); } @@ -416,14 +468,14 @@ public class GcsUtilTest { BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); GoogleJsonResponseException expectedException = googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets your looking for"); + "Waves hand mysteriously", "These aren't the buckets you're looking for"); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); when(mockStorageGet.execute()) .thenThrow(expectedException); - assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"), + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper())); } @@ -446,11 +498,59 @@ public class GcsUtilTest { .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see")); - assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"), + assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper())); + } + + @Test + public void testGetBucket() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil 
gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new Bucket()); + + assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper())); } @Test + public void testGetBucketNotExists() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); + Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); + + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); + + when(mockStorage.buckets()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); + when(mockStorageGet.execute()) + .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, + "It don't exist", "Nothing here to see")); + + thrown.expect(FileNotFoundException.class); + thrown.expectMessage("It don't exist"); + gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, new FastNanoClockAndSleeper()); + } + + @Test public void testGCSChannelCloseIdempotent() throws IOException { SeekableByteChannel channel = new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,