http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java deleted file mode 100644 index 23f0418..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import com.google.api.client.util.BackOff; -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.cloudresourcemanager.model.Project; -import java.net.SocketTimeoutException; -import org.apache.beam.sdk.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** Test case for {@link GcpProjectUtil}. */ -@RunWith(JUnit4.class) -public class GcpProjectUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static CloudResourceManagerOptions crmOptionsWithTestCredential() { - CloudResourceManagerOptions pipelineOptions = - PipelineOptionsFactory.as(CloudResourceManagerOptions.class); - pipelineOptions.setGcpCredential(new TestCredential()); - return pipelineOptions; - } - - @Test - public void testGetProjectNumber() throws Exception { - CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential(); - GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil(); - - CloudResourceManager.Projects mockProjects = Mockito.mock( - CloudResourceManager.Projects.class); - CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class); - projectUtil.setCrmClient(mockCrm); - - CloudResourceManager.Projects.Get mockProjectsGet = - Mockito.mock(CloudResourceManager.Projects.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - Project project = new Project(); - project.setProjectNumber(5L); - - when(mockCrm.projects()).thenReturn(mockProjects); - 
when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet); - when(mockProjectsGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(project); - - assertEquals(5L, projectUtil.getProjectNumber( - "foo", mockBackOff, new FastNanoClockAndSleeper())); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java deleted file mode 100644 index a29dd45..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.fail; - -import com.google.common.collect.Lists; -import java.util.ServiceLoader; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GcsIOChannelFactoryRegistrar}. 
- */ -@RunWith(JUnit4.class) -public class GcsIOChannelFactoryRegistrarTest { - - @Test - public void testServiceLoader() { - for (IOChannelFactoryRegistrar registrar - : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) { - if (registrar instanceof GcsIOChannelFactoryRegistrar) { - return; - } - } - fail("Expected to find " + GcsIOChannelFactoryRegistrar.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java deleted file mode 100644 index 7248b38..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link GcsIOChannelFactoryTest}. */ -@RunWith(JUnit4.class) -public class GcsIOChannelFactoryTest { - private GcsIOChannelFactory factory; - - @Before - public void setUp() { - factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class)); - } - - @Test - public void testResolve() throws Exception { - assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object")); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java deleted file mode 100644 index dc36319..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** Tests for {@link GcsPathValidator}. */ -@RunWith(JUnit4.class) -public class GcsPathValidatorTest { - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Mock private GcsUtil mockGcsUtil; - private GcsPathValidator validator; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); - GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); - options.setGcpCredential(new TestCredential()); - options.setGcsUtil(mockGcsUtil); - validator = GcsPathValidator.fromOptions(options); - } - - @Test - public void testValidFilePattern() { - validator.validateInputFilePatternSupported("gs://bucket/path"); - } - - @Test - public void testInvalidFilePattern() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateInputFilePatternSupported("/local/path"); - } - - @Test - public void testWhenBucketDoesNotExist() throws Exception { - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Could not find file gs://non-existent-bucket/location"); - 
validator.validateInputFilePatternSupported("gs://non-existent-bucket/location"); - } - - @Test - public void testValidOutputPrefix() { - validator.validateOutputFilePrefixSupported("gs://bucket/path"); - } - - @Test - public void testInvalidOutputPrefix() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage( - "Expected a valid 'gs://' path but was given '/local/path'"); - validator.validateOutputFilePrefixSupported("/local/path"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java deleted file mode 100644 index 03668ce..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ /dev/null @@ -1,798 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import com.google.api.client.googleapis.batch.BatchRequest; -import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.GenericJson; -import com.google.api.client.json.Json; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.HttpTesting; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.testing.http.MockLowLevelHttpResponse; -import com.google.api.client.util.BackOff; -import com.google.api.services.storage.Storage; -import com.google.api.services.storage.model.Bucket; -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; -import com.google.cloud.hadoop.util.ClientRequestHelper; -import com.google.common.collect.ImmutableList; 
-import com.google.common.collect.Lists; -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigInteger; -import java.net.SocketTimeoutException; -import java.nio.channels.SeekableByteChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.AccessDeniedException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** Test case for {@link GcsUtil}. 
*/ -@RunWith(JUnit4.class) -public class GcsUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGlobTranslation() { - assertEquals("foo", GcsUtil.globToRegexp("foo")); - assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o")); - assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?")); - assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*")); - } - - private static GcsOptions gcsOptionsWithTestCredential() { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - pipelineOptions.setGcpCredential(new TestCredential()); - return pipelineOptions; - } - - @Test - public void testCreationWithDefaultOptions() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - assertNotNull(pipelineOptions.getGcpCredential()); - } - - @Test - public void testUploadBufferSizeDefault() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil util = pipelineOptions.getGcsUtil(); - assertNull(util.getUploadBufferSizeBytes()); - } - - @Test - public void testUploadBufferSizeUserSpecified() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - pipelineOptions.setGcsUploadBufferSizeBytes(12345); - GcsUtil util = pipelineOptions.getGcsUtil(); - assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); - } - - @Test - public void testCreationWithExecutorServiceProvided() { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - pipelineOptions.setExecutorService(Executors.newCachedThreadPool()); - assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService); - } - - @Test - public void testCreationWithGcsUtilProvided() { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - GcsUtil gcsUtil = Mockito.mock(GcsUtil.class); - pipelineOptions.setGcsUtil(gcsUtil); - assertSame(gcsUtil, pipelineOptions.getGcsUtil()); - } - - @Test - public void 
testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { - GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); - ExecutorService executorService = pipelineOptions.getExecutorService(); - - int numThreads = 100; - final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads]; - for (int i = 0; i < numThreads; i++) { - final int currentLatch = i; - countDownLatches[i] = new CountDownLatch(1); - executorService.execute( - new Runnable() { - @Override - public void run() { - // Wait for latch N and then release latch N - 1 - try { - countDownLatches[currentLatch].await(); - if (currentLatch > 0) { - countDownLatches[currentLatch - 1].countDown(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - }); - } - - // Release the last latch starting the chain reaction. - countDownLatches[countDownLatches.length - 1].countDown(); - executorService.shutdown(); - assertTrue("Expected tasks to complete", - executorService.awaitTermination(10, TimeUnit.SECONDS)); - } - - @Test - public void testGlobExpansion() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); - - Objects modelObjects = new Objects(); - List<StorageObject> items = new ArrayList<>(); - // A directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); - - // Files within the directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name")); - items.add(new 
StorageObject().setBucket("testbucket").setName("testdirectory/file2name")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile")); - - modelObjects.setItems(items); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")).thenReturn( - mockStorageGet); - when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); - when(mockStorageGet.execute()).thenReturn( - new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); - when(mockStorageList.execute()).thenReturn(modelObjects); - - // Test a single file. - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"); - List<GcsPath> expectedFiles = - ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - // Test patterns. 
- { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); - List<GcsPath> expectedFiles = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); - - assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); - } - } - - // Patterns that contain recursive wildcards ('**') are not supported. 
- @Test - public void testRecursiveGlobExpansionFails() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**"); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unsupported wildcard usage"); - gcsUtil.expand(pattern); - } - - // GCSUtil.expand() should fail when matching a single object when that object does not exist. - // We should return the empty result since GCS get object is strongly consistent. - @Test - public void testNonExistentObjectReturnsEmptyResult() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile"); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see"); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( - mockStorageGet); - when(mockStorageGet.execute()).thenThrow(expectedException); - - assertEquals(Collections.EMPTY_LIST, gcsUtil.expand(pattern)); - } - - // GCSUtil.expand() should fail for other errors such as access denied. 
- @Test - public void testAccessDeniedObjectThrowsIOException() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn( - mockStorageGet); - when(mockStorageGet.execute()).thenThrow(expectedException); - - thrown.expect(IOException.class); - thrown.expectMessage("Unable to get the file object for path"); - gcsUtil.expand(pattern); - } - - @Test - public void testFileSizeNonBatch() throws Exception { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()).thenReturn( - new StorageObject().setSize(BigInteger.valueOf(1000))); - - assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { - MockLowLevelHttpResponse 
notFoundResponse = new MockLowLevelHttpResponse(); - notFoundResponse.setContent(""); - notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); - - MockHttpTransport mockTransport = - new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); - - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - - thrown.expect(FileNotFoundException.class); - gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); - } - - @Test - public void testRetryFileSizeNonBatch() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); - Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); - - when(mockStorage.objects()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); - - assertEquals(1000, - gcsUtil.getObject( - GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, - new FastNanoClockAndSleeper()).getSize().longValue()); - assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); - } - - @Test - public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { - JsonFactory jsonFactory = new JacksonFactory(); - - String contentBoundary = "batch_foobarbaz"; - String contentBoundaryLine = "--" + contentBoundary; - String 
endOfContentBoundaryLine = "--" + contentBoundary + "--"; - - GenericJson error = new GenericJson() - .set("error", new GenericJson().set("code", 404)); - error.setFactory(jsonFactory); - - String content = contentBoundaryLine + "\n" - + "Content-Type: application/http\n" - + "\n" - + "HTTP/1.1 404 Not Found\n" - + "Content-Length: -1\n" - + "\n" - + error.toString() - + "\n" - + "\n" - + endOfContentBoundaryLine - + "\n"; - thrown.expect(FileNotFoundException.class); - MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() - .setContentType("multipart/mixed; boundary=" + contentBoundary) - .setContent(content) - .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); - - MockHttpTransport mockTransport = - new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); - - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { - JsonFactory jsonFactory = new JacksonFactory(); - - String contentBoundary = "batch_foobarbaz"; - String contentBoundaryLine = "--" + contentBoundary; - String endOfContentBoundaryLine = "--" + contentBoundary + "--"; - - GenericJson error = new GenericJson() - .set("error", new GenericJson().set("code", 404)); - error.setFactory(jsonFactory); - - String content = contentBoundaryLine + "\n" - + "Content-Type: application/http\n" - + "\n" - + "HTTP/1.1 404 Not Found\n" - + "Content-Length: -1\n" - + "\n" - + error.toString() - + "\n" - + "\n" - + endOfContentBoundaryLine - + "\n"; - thrown.expect(FileNotFoundException.class); - - final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); - when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); - - // 429: Too many 
requests, then 200: OK. - when(mockResponse.getStatusCode()).thenReturn(429, 200); - when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); - - // A mock transport that lets us mock the API responses. - MockHttpTransport mockTransport = - new MockHttpTransport.Builder() - .setLowLevelHttpRequest( - new MockLowLevelHttpRequest() { - @Override - public LowLevelHttpResponse execute() throws IOException { - return mockResponse; - } - }) - .build(); - - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - gcsUtil.setStorageClient( - new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); - gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); - } - - @Test - public void testCreateBucket() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.insert( - any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); - when(mockStorageInsert.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testCreateBucketAccessErrors() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.insert( - any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert); - when(mockStorageInsert.execute()) - .thenThrow(expectedException); - - thrown.expect(AccessDeniedException.class); - - gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testBucketAccessible() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - GoogleJsonResponseException expectedException = - googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, - "Waves hand mysteriously", "These aren't the buckets you're looking for"); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(expectedException); - - assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testBucketDoesNotExist() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see")); - - assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testGetBucket() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = 
Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(new Bucket()); - - assertNotNull(gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); - } - - @Test - public void testGetBucketNotExists() throws IOException { - GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); - GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - - Storage mockStorage = Mockito.mock(Storage.class); - gcsUtil.setStorageClient(mockStorage); - - Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); - Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - - when(mockStorage.buckets()).thenReturn(mockStorageObjects); - when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); - when(mockStorageGet.execute()) - .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND, - "It don't exist", "Nothing here to see")); - - thrown.expect(FileNotFoundException.class); - thrown.expectMessage("It don't exist"); - gcsUtil.getBucket(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper()); - } - - @Test - public void testGCSChannelCloseIdempotent() throws IOException { - SeekableByteChannel channel = - new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null, - new ClientRequestHelper<StorageObject>()); - channel.close(); - channel.close(); - } - - /** - * Builds a fake GoogleJsonResponseException for testing API error handling. 
- */ - private static GoogleJsonResponseException googleJsonResponseException( - final int status, final String reason, final String message) throws IOException { - final JsonFactory jsonFactory = new JacksonFactory(); - HttpTransport transport = new MockHttpTransport() { - @Override - public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { - ErrorInfo errorInfo = new ErrorInfo(); - errorInfo.setReason(reason); - errorInfo.setMessage(message); - errorInfo.setFactory(jsonFactory); - GenericJson error = new GenericJson(); - error.set("code", status); - error.set("errors", Arrays.asList(errorInfo)); - error.setFactory(jsonFactory); - GenericJson errorResponse = new GenericJson(); - errorResponse.set("error", error); - errorResponse.setFactory(jsonFactory); - return new MockLowLevelHttpRequest().setResponse( - new MockLowLevelHttpResponse().setContent(errorResponse.toPrettyString()) - .setContentType(Json.MEDIA_TYPE).setStatusCode(status)); - } - }; - HttpRequest request = - transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); - request.setThrowExceptionOnExecuteError(false); - HttpResponse response = request.execute(); - return GoogleJsonResponseException.from(jsonFactory, response); - } - - private static List<String> makeStrings(String s, int n) { - ImmutableList.Builder<String> ret = ImmutableList.builder(); - for (int i = 0; i < n; ++i) { - ret.add(String.format("gs://bucket/%s%d", s, i)); - } - return ret.build(); - } - - private static List<GcsPath> makeGcsPaths(String s, int n) { - ImmutableList.Builder<GcsPath> ret = ImmutableList.builder(); - for (int i = 0; i < n; ++i) { - ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i))); - } - return ret.build(); - } - - private static int sumBatchSizes(List<BatchRequest> batches) { - int ret = 0; - for (BatchRequest b : batches) { - ret += b.size(); - assertThat(b.size(), greaterThan(0)); - } - return ret; - } - - @Test - public void 
testMakeCopyBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<BatchRequest> batches = gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 3)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - - // 1 batch of files fits in 1 batch - batches = gcsUtil.makeCopyBatches(makeStrings("s", 100), makeStrings("d", 100)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - - // A little more than 5 batches of files fits in 6 batches - batches = gcsUtil.makeCopyBatches(makeStrings("s", 501), makeStrings("d", 501)); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - } - - @Test - public void testInvalidCopyBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Number of source files 3"); - - gcsUtil.makeCopyBatches(makeStrings("s", 3), makeStrings("d", 1)); - } - - @Test - public void testMakeRemoveBatches() throws IOException { - GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - - // 1 batch of files fits in 1 batch - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - - // A little more than 5 batches of files fits in 6 batches - batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - } - - @Test - public void testMakeGetBatches() throws IOException { - GcsUtil gcsUtil = 
gcsOptionsWithTestCredential().getGcsUtil(); - - // Small number of files fits in 1 batch - List<StorageObjectOrIOException[]> results = Lists.newArrayList(); - List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(3)); - assertEquals(3, results.size()); - - // 1 batch of files fits in 1 batch - results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); - assertThat(batches.size(), equalTo(1)); - assertThat(sumBatchSizes(batches), equalTo(100)); - assertEquals(100, results.size()); - - // A little more than 5 batches of files fits in 6 batches - results = Lists.newArrayList(); - batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); - assertThat(batches.size(), equalTo(6)); - assertThat(sumBatchSizes(batches), equalTo(501)); - assertEquals(501, results.size()); - } - - /** - * A helper to wrap a {@link GenericJson} object in a content stream. - */ - private static InputStream toStream(String content) throws IOException { - return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java deleted file mode 100644 index 8e7878c..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOffTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Unit tests for {@link IntervalBoundedExponentialBackOff}. 
*/ -@RunWith(JUnit4.class) -public class IntervalBoundedExponentialBackOffTest { - @Rule public ExpectedException exception = ExpectedException.none(); - - - @Test - public void testUsingInvalidInitialInterval() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Initial interval must be greater than zero."); - new IntervalBoundedExponentialBackOff(1000L, 0L); - } - - @Test - public void testUsingInvalidMaximumInterval() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Maximum interval must be greater than zero."); - new IntervalBoundedExponentialBackOff(-1L, 10L); - } - - @Test - public void testThatcertainNumberOfAttemptsReachesMaxInterval() throws Exception { - IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - } - - @Test - public void testThatResettingAllowsReuse() throws Exception { - IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); - assertThat(backOff.nextBackOffMillis(), 
allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - } - - @Test - public void testAtMaxInterval() throws Exception { - IntervalBoundedExponentialBackOff backOff = new IntervalBoundedExponentialBackOff(1000L, 500); - assertFalse(backOff.atMaxInterval()); - backOff.nextBackOffMillis(); - assertFalse(backOff.atMaxInterval()); - backOff.nextBackOffMillis(); - assertTrue(backOff.atMaxInterval()); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), - lessThanOrEqualTo(1500L))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java deleted file mode 100644 index 71554b5..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.HttpResponseException; -import com.google.api.client.http.HttpResponseInterceptor; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.LowLevelHttpRequest; -import com.google.api.client.http.LowLevelHttpResponse; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.client.testing.http.MockLowLevelHttpRequest; -import com.google.api.client.util.NanoClock; -import com.google.api.client.util.Sleeper; -import com.google.api.services.storage.Storage; -import 
com.google.api.services.storage.Storage.Objects.Get; -import java.io.IOException; -import java.net.SocketTimeoutException; -import java.security.PrivateKey; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicLong; -import org.hamcrest.Matchers; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Tests for RetryHttpRequestInitializer. - */ -@RunWith(JUnit4.class) -public class RetryHttpRequestInitializerTest { - - @Mock private Credential mockCredential; - @Mock private PrivateKey mockPrivateKey; - @Mock private LowLevelHttpRequest mockLowLevelRequest; - @Mock private LowLevelHttpResponse mockLowLevelResponse; - @Mock private HttpResponseInterceptor mockHttpResponseInterceptor; - - private final JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); - private Storage storage; - - // Used to test retrying a request more than the default 10 times. - static class MockNanoClock implements NanoClock { - private int timesMs[] = {500, 750, 1125, 1688, 2531, 3797, 5695, 8543, - 12814, 19222, 28833, 43249, 64873, 97310, 145965, 218945, 328420}; - private int i = 0; - - @Override - public long nanoTime() { - return timesMs[i++ / 2] * 1000000; - } - } - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - HttpTransport lowLevelTransport = new HttpTransport() { - @Override - protected LowLevelHttpRequest buildRequest(String method, String url) - throws IOException { - return mockLowLevelRequest; - } - }; - - // Retry initializer will pass through to credential, since we can have - // only a single HttpRequestInitializer, and we use multiple Credential - // types in the SDK, not all of which allow for retry configuration. 
- RetryHttpRequestInitializer initializer = new RetryHttpRequestInitializer( - mockCredential, new MockNanoClock(), new Sleeper() { - @Override - public void sleep(long millis) throws InterruptedException {} - }, Arrays.asList(418 /* I'm a teapot */), mockHttpResponseInterceptor); - storage = new Storage.Builder(lowLevelTransport, jsonFactory, initializer) - .setApplicationName("test").build(); - } - - @After - public void tearDown() { - verifyNoMoreInteractions(mockPrivateKey); - verifyNoMoreInteractions(mockLowLevelRequest); - verifyNoMoreInteractions(mockCredential); - verifyNoMoreInteractions(mockHttpResponseInterceptor); - } - - @Test - public void testBasicOperation() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockCredential).initialize(any(HttpRequest.class)); - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a non-retriable error is not retried. - */ - @Test - public void testErrorCodeForbidden() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(403) // Non-retryable error. - .thenReturn(200); // Shouldn't happen. 
- - try { - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - } catch (HttpResponseException e) { - Assert.assertThat(e.getMessage(), Matchers.containsString("403")); - } - - verify(mockCredential).initialize(any(HttpRequest.class)); - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a retriable error is retried. - */ - @Test - public void testRetryableError() throws IOException { - when(mockLowLevelRequest.execute()) - .thenReturn(mockLowLevelResponse) - .thenReturn(mockLowLevelResponse) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(503) // Retryable - .thenReturn(429) // We also retry on 429 Too Many Requests. - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockCredential).initialize(any(HttpRequest.class)); - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(3)).execute(); - verify(mockLowLevelResponse, times(3)).getStatusCode(); - } - - /** - * Tests that an IOException is retried. 
- */ - @Test - public void testThrowIOException() throws IOException { - when(mockLowLevelRequest.execute()) - .thenThrow(new IOException("Fake Error")) - .thenReturn(mockLowLevelResponse); - when(mockLowLevelResponse.getStatusCode()) - .thenReturn(200); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockCredential).initialize(any(HttpRequest.class)); - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()) - .addHeader(anyString(), anyString()); - verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(2)).execute(); - verify(mockLowLevelResponse).getStatusCode(); - } - - /** - * Tests that a retryable error is retried enough times. - */ - @Test - public void testRetryableErrorRetryEnoughTimes() throws IOException { - when(mockLowLevelRequest.execute()).thenReturn(mockLowLevelResponse); - final int retries = 10; - when(mockLowLevelResponse.getStatusCode()).thenAnswer(new Answer<Integer>(){ - int n = 0; - @Override - public Integer answer(InvocationOnMock invocation) { - return (n++ < retries - 1) ? 503 : 200; - }}); - - Storage.Buckets.Get result = storage.buckets().get("test"); - HttpResponse response = result.executeUnparsed(); - assertNotNull(response); - - verify(mockCredential).initialize(any(HttpRequest.class)); - verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class)); - verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), - anyString()); - verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt()); - verify(mockLowLevelRequest, times(retries)).execute(); - verify(mockLowLevelResponse, times(retries)).getStatusCode(); - } - - /** - * Tests that when RPCs fail with {@link SocketTimeoutException}, the IO exception handler - * is invoked. 
- */ - @Test - public void testIOExceptionHandlerIsInvokedOnTimeout() throws Exception { - // Counts the number of calls to execute the HTTP request. - final AtomicLong executeCount = new AtomicLong(); - - // 10 is a private internal constant in the Google API Client library. See - // com.google.api.client.http.HttpRequest#setNumberOfRetries - // TODO: update this test once the private internal constant is public. - final int defaultNumberOfRetries = 10; - - // A mock HTTP request that always throws SocketTimeoutException. - MockHttpTransport transport = - new MockHttpTransport.Builder().setLowLevelHttpRequest(new MockLowLevelHttpRequest() { - @Override - public LowLevelHttpResponse execute() throws IOException { - executeCount.incrementAndGet(); - throw new SocketTimeoutException("Fake forced timeout exception"); - } - }).build(); - - // A sample HTTP request to Google Cloud Storage that uses both default Transport and default - // RetryHttpInitializer. - Storage storage = new Storage.Builder( - transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); - - Get getRequest = storage.objects().get("gs://fake", "file"); - - try { - getRequest.execute(); - fail(); - } catch (Throwable e) { - assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class)); - assertEquals(1 + defaultNumberOfRetries, executeCount.get()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/pom.xml b/sdks/java/extensions/gcp-core/pom.xml new file mode 100644 index 0000000..d566f94 --- /dev/null +++ b/sdks/java/extensions/gcp-core/pom.xml @@ -0,0 +1,217 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core</name> + <description>Common components used to support multiple + Google Cloud Platform specific maven modules.</description> + + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludedGroups> + org.apache.beam.sdk.testing.NeedsRunner + </excludedGroups> + <systemPropertyVariables> + <beamUseDummyRunner>true</beamUseDummyRunner> + </systemPropertyVariables> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-jar-plugin</artifactId> + </plugin> + + <!-- Coverage analysis for unit tests. --> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-jackson2</artifactId> + </dependency> + + <dependency> + <groupId>com.google.oauth-client</groupId> + <artifactId>google-oauth-client</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-oauth2-http</artifactId> + </dependency> + + <dependency> + <groupId>com.google.api-client</groupId> + <artifactId>google-api-client</artifactId> + </dependency> + + <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>gcsio</artifactId> + </dependency> + + <dependency> + <groupId>com.google.cloud.bigdataoss</groupId> + <artifactId>util</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-cloudresourcemanager</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-pubsub</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-bigquery</artifactId> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-storage</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auth</groupId> + <artifactId>google-auth-library-credentials</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + </dependency> + + <dependency> + 
<groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <!-- build dependencies --> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java new file mode 100644 index 0000000..7672cd7 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/BigQueryOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +/** + * Properties needed when using Google BigQuery with the Apache Beam SDK. + */ +@Description("Options that are used to configure Google BigQuery. See " + + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.") +public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { + @Description("Temporary dataset for BigQuery table operations. 
" + + "Supported values are \"bigquery.googleapis.com/{dataset}\"") + @Default.String("bigquery.googleapis.com/cloud_dataflow") + String getTempDatasetId(); + void setTempDatasetId(String value); +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java new file mode 100644 index 0000000..13fdaf3 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.beam.sdk.util.GcpProjectUtil; + +/** + * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. + */ +@Description("Options that are used to configure Google CloudResourceManager. 
See " + + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") +public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, + PipelineOptions, StreamingOptions { + /** + * The GcpProjectUtil instance that should be used to communicate with Google Cloud Resource Manager. + */ + @JsonIgnore + @Description("The GcpProjectUtil instance that should be used to communicate" + + " with Google Cloud Resource Manager.") + @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) + @Hidden + GcpProjectUtil getGcpProjectUtil(); + void setGcpProjectUtil(GcpProjectUtil value); +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java new file mode 100644 index 0000000..d01406f --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static com.google.common.base.Strings.isNullOrEmpty; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.auth.Credentials; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.CredentialFactory; +import org.apache.beam.sdk.util.DefaultBucket; +import org.apache.beam.sdk.util.GcpCredentialFactory; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.PathValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Options used to configure Google Cloud Platform specific options such as the project + * and credentials. + * + * <p>These options defer to the + * <a href="https://developers.google.com/accounts/docs/application-default-credentials"> + * application default credentials</a> for authentication. See the + * <a href="https://github.com/google/google-auth-library-java">Google Auth Library</a> for + * alternative mechanisms for creating credentials. + */ +@Description("Options used to configure Google Cloud Platform project and credentials.") +public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { + /** + * Project id to use when launching jobs. + */ + @Description("Project id. Required when using Google Cloud Platform services. 
" + + "See https://cloud.google.com/storage/docs/projects for further details.") + @Default.InstanceFactory(DefaultProjectFactory.class) + String getProject(); + void setProject(String value); + + /** + * GCP <a href="https://developers.google.com/compute/docs/zones" + * >availability zone</a> for operations. + * + * <p>Default is set on a per-service basis. + */ + @Description("GCP availability zone for running GCP operations. " + + "Default is up to the individual service.") + String getZone(); + void setZone(String value); + + /** + * The class of the credential factory that should be created and used to create + * credentials. If gcpCredential has not been set explicitly, an instance of this class will + * be constructed and used as a credential factory. + */ + @Description("The class of the credential factory that should be created and used to create " + + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " + + "be constructed and used as a credential factory.") + @Default.Class(GcpCredentialFactory.class) + Class<? extends CredentialFactory> getCredentialFactoryClass(); + void setCredentialFactoryClass( + Class<? extends CredentialFactory> credentialFactoryClass); + + /** + * The credential instance that should be used to authenticate against GCP services. + * If no credential has been set explicitly, the default is to use the instance factory + * that constructs a credential based upon the currently set credentialFactoryClass. + */ + @JsonIgnore + @Description("The credential instance that should be used to authenticate against GCP services. 
" + + "If no credential has been set explicitly, the default is to use the instance factory " + + "that constructs a credential based upon the currently set credentialFactoryClass.") + @Default.InstanceFactory(GcpUserCredentialsFactory.class) + Credentials getGcpCredential(); + void setGcpCredential(Credentials value); + + /** + * Attempts to infer the default project based upon the environment this application + * is executing within. Currently this only supports getting the default project from gcloud. + */ + class DefaultProjectFactory implements DefaultValueFactory<String> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); + + @Override + public String create(PipelineOptions options) { + try { + File configFile; + if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { + configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); + } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { + configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); + } else { + // New versions of gcloud use this file + configFile = new File( + System.getProperty("user.home"), + ".config/gcloud/configurations/config_default"); + if (!configFile.exists()) { + // Old versions of gcloud use this file + configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); + } + } + String section = null; + Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); + Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); + for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { + line = line.trim(); + if (line.isEmpty() || line.startsWith(";")) { + continue; + } + Matcher matcher = sectionPattern.matcher(line); + if (matcher.matches()) { + section = matcher.group(1); + } else if (section == null || section.equals("core")) { + matcher = projectPattern.matcher(line); + if (matcher.matches()) { + String project = matcher.group(1).trim(); + 
LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " + + "project, please cancel this Pipeline and specify the command-line " + + "argument --project.", project); + return project; + } + } + } + } catch (IOException expected) { + LOG.debug("Failed to find default project.", expected); + } + // return null if can't determine + return null; + } + + /** + * Returns true if running on the Windows OS. + */ + private static boolean isWindows() { + return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); + } + + /** + * Used to mock out getting environment variables. + */ + @VisibleForTesting + Map<String, String> getEnvironment() { + return System.getenv(); + } + } + + /** + * Attempts to load the GCP credentials. See + * {@link CredentialFactory#getCredential()} for more details. + */ + class GcpUserCredentialsFactory implements DefaultValueFactory<Credentials> { + @Override + public Credentials create(PipelineOptions options) { + GcpOptions gcpOptions = options.as(GcpOptions.class); + try { + CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) + .fromClass(gcpOptions.getCredentialFactoryClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + return factory.getCredential(); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Unable to obtain credential", e); + } + } + } + + /** + * A GCS path for storing temporary files in GCP. + * + * <p>It defaults to {@link PipelineOptions#getTempLocation}. + */ + @Description("A GCS path for storing temporary files in GCP.") + @Default.InstanceFactory(GcpTempLocationFactory.class) + @Nullable String getGcpTempLocation(); + void setGcpTempLocation(String value); + + /** + * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. 
+ */ + class GcpTempLocationFactory implements DefaultValueFactory<String> { + + @Override + @Nullable + public String create(PipelineOptions options) { + String tempLocation = options.getTempLocation(); + if (isNullOrEmpty(tempLocation)) { + tempLocation = DefaultBucket.tryCreateDefaultBucket(options); + options.setTempLocation(tempLocation); + } else { + try { + PathValidator validator = options.as(GcsOptions.class).getPathValidator(); + validator.validateOutputFilePrefixSupported(tempLocation); + } catch (Exception e) { + throw new IllegalArgumentException(String.format( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path, %s. ", tempLocation), e); + } + } + return tempLocation; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java new file mode 100644 index 0000000..00be440 --- /dev/null +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/options/GcpPipelineOptionsRegistrar.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.options; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +/** + * A registrar containing the default GCP options. + */ +@AutoService(PipelineOptionsRegistrar.class) +public class GcpPipelineOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>builder() + .add(BigQueryOptions.class) + .add(GcpOptions.class) + .add(GcsOptions.class) + .add(GoogleApiDebugOptions.class) + .add(PubsubOptions.class) + .build(); + } +}