Repository: incubator-beam Updated Branches: refs/heads/master 080dbaa3e -> 58bb1174d
Change Dataflow profiling option to saveProfilesToGcs Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e44cb12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e44cb12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e44cb12 Branch: refs/heads/master Commit: 1e44cb12c2663b1353717bf9237618df74684102 Parents: 080dbaa Author: bchambers <bchamb...@google.com> Authored: Thu Dec 8 10:40:17 2016 -0800 Committer: Davor Bonaci <da...@google.com> Committed: Fri Dec 9 12:31:30 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 5 + .../options/DataflowProfilingOptions.java | 8 +- .../DataflowPipelineTranslatorTest.java | 36 ++- .../runners/dataflow/DataflowRunnerTest.java | 263 ++++++++++++------- .../options/DataflowProfilingOptionsTest.java | 6 +- 5 files changed, 189 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 22f6f5a..85318e6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Strings; import com.google.common.base.Utf8; import com.google.common.collect.ForwardingMap; import com.google.common.collect.HashMultimap; @@ -262,6 +263,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { "DataflowRunner requires stagingLocation, and it is missing in PipelineOptions."); validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); + if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) { + validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs()); + } + if (dataflowOptions.getFilesToStage() == null) { dataflowOptions.setFilesToStage(detectClassPathResourcesToStage( DataflowRunner.class.getClassLoader())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java index 092c17a..a87d688 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java @@ -30,10 +30,10 @@ import org.apache.beam.sdk.options.Hidden; @Hidden public interface DataflowProfilingOptions { - @Description("Whether to periodically dump profiling information to local disk.\n" - + "WARNING: Enabling this option may fill local disk with profiling information.") - boolean getEnableProfilingAgent(); - void setEnableProfilingAgent(boolean enabled); + @Description("When set to a non-empty value, enables recording profiles and saving them to GCS.\n" + + "Profiles will continue until the pipeline is stopped or updated without this option.\n") + String getSaveProfilesToGcs(); + void setSaveProfilesToGcs(String gcsPath); @Description( "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8d0b83a..ab82941 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.util.Structs.getDictionary; import static org.apache.beam.sdk.util.Structs.getString; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; @@ -49,7 +50,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -76,6 +76,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.Structs; @@ -188,27 +189,22 @@ public class DataflowPipelineTranslatorTest implements Serializable { p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()) .getJob(); - // Note that the contents of this materialized map may be changed by the act of reading an - // option, which will cause the default to get materialized whereas it would otherwise be - // left absent. It is permissible to simply alter this test to reflect current behavior. - Map<String, Object> settings = new HashMap<>(); - settings.put("appName", "DataflowPipelineTranslatorTest"); - settings.put("project", "some-project"); - settings.put("pathValidatorClass", - "org.apache.beam.sdk.util.GcsPathValidator"); - settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner"); - settings.put("jobName", "some-job-name"); - settings.put("tempLocation", "gs://somebucket/some/path"); - settings.put("gcpTempLocation", "gs://somebucket/some/path"); - settings.put("stagingLocation", "gs://somebucket/some/path/staging"); - settings.put("stableUniqueNames", "WARNING"); - settings.put("streaming", false); - settings.put("numberOfWorkerHarnessThreads", 0); - settings.put("experiments", null); - Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions(); assertThat(sdkPipelineOptions, hasKey("options")); - assertEquals(settings, sdkPipelineOptions.get("options")); + Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options"); + + assertThat(optionsMap, hasEntry("appName", (Object) "DataflowPipelineTranslatorTest")); + assertThat(optionsMap, hasEntry("project", (Object) "some-project")); + assertThat(optionsMap, + hasEntry("pathValidatorClass", (Object) GcsPathValidator.class.getName())); + assertThat(optionsMap, hasEntry("runner", (Object) DataflowRunner.class.getName())); + assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name")); + assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path")); + assertThat(optionsMap, + hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging")); + assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING")); + assertThat(optionsMap, hasEntry("streaming", (Object) false)); + assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 1959be5..133ae8a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -34,6 +34,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -110,6 +111,7 @@ import org.hamcrest.Description; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Instant; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -128,6 +130,11 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class DataflowRunnerTest { + private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging"; + private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp"; + private static final String VALID_PROFILE_BUCKET = "gs://valid-bucket/profiles"; + private static final String NON_EXISTENT_BUCKET = "gs://non-existent-bucket/location"; + private static final String PROJECT_ID = "some-project"; @Rule @@ -137,6 +144,9 @@ public class DataflowRunnerTest { @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class); + private Dataflow.Projects.Jobs mockJobs; + private GcsUtil mockGcsUtil; + // Asserts that the given Job has all expected fields set. private static void assertValidJob(Job job) { assertNull(job.getId()); @@ -144,6 +154,38 @@ public class DataflowRunnerTest { assertTrue(Pattern.matches("[a-z]([-a-z0-9]*[a-z0-9])?", job.getName())); } + @Before + public void setUp() throws IOException { + this.mockGcsUtil = mock(GcsUtil.class); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) + .then(new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); + when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { + @Override + public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_STAGING_BUCKET))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_TEMP_BUCKET + "/staging"))). + thenReturn(true); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(VALID_PROFILE_BUCKET))).thenReturn(true); + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri(NON_EXISTENT_BUCKET))).thenReturn(false); + + // The dataflow pipeline attempts to output to this location. + when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true); + + mockJobs = mock(Dataflow.Projects.Jobs.class); + } + private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { options.setStableUniqueNames(CheckEnabled.ERROR); options.setRunner(DataflowRunner.class); @@ -155,19 +197,16 @@ public class DataflowRunnerTest { return p; } - private static Dataflow buildMockDataflow( - final ArgumentCaptor<Job> jobCaptor) throws IOException { + private Dataflow buildMockDataflow() throws IOException { Dataflow mockDataflowClient = mock(Dataflow.class); Dataflow.Projects mockProjects = mock(Dataflow.Projects.class); - Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class); Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class); Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class); when(mockDataflowClient.projects()).thenReturn(mockProjects); when(mockProjects.jobs()).thenReturn(mockJobs); - when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture())) - .thenReturn(mockRequest); + when(mockJobs.create(eq(PROJECT_ID), isA(Job.class))).thenReturn(mockRequest); when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList); when(mockList.setPageToken(anyString())).thenReturn(mockList); when(mockList.execute()) @@ -186,25 +225,17 @@ public class DataflowRunnerTest { return mockDataflowClient; } - /** - * Build a mock {@link GcsUtil} with return values. - * - * @param bucketExist first return value - * @param bucketAccessible next return values - */ - private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible) - throws IOException { + private GcsUtil buildMockGcsUtil() throws IOException { GcsUtil mockGcsUtil = mock(GcsUtil.class); when(mockGcsUtil.create(any(GcsPath.class), anyString())) .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { @Override @@ -212,26 +243,18 @@ public class DataflowRunnerTest { return ImmutableList.of((GcsPath) invocation.getArguments()[0]); } }); - when(mockGcsUtil.bucketAccessible(any(GcsPath.class))) - .thenReturn(bucketExist, bucketAccessible); return mockGcsUtil; } private DataflowPipelineOptions buildPipelineOptions() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - return buildPipelineOptions(jobCaptor); - } - - private DataflowPipelineOptions buildPipelineOptions( - ArgumentCaptor<Job> jobCaptor) throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setProject(PROJECT_ID); - options.setTempLocation("gs://somebucket/some/path"); + options.setTempLocation(VALID_TEMP_BUCKET); // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. options.setFilesToStage(new LinkedList<String>()); - options.setDataflowClient(buildMockDataflow(jobCaptor)); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setDataflowClient(buildMockDataflow()); + options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); return options; } @@ -271,22 +294,22 @@ public class DataflowRunnerTest { @Test public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception { String mixedCase = "ThisJobNameHasMixedCase"; - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setJobName(mixedCase); - DataflowRunner runner = DataflowRunner.fromOptions(options); + DataflowRunner.fromOptions(options); assertThat(options.getJobName(), equalTo(mixedCase.toLowerCase())); } @Test public void testRun() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); Pipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -317,14 +340,15 @@ public class DataflowRunnerTest { @Test public void testUpdate() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setUpdate(true); options.setJobName("oldJobName"); Pipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -378,9 +402,6 @@ public class DataflowRunnerTest { public void testRunWithFiles() throws IOException { // Test that the function DataflowRunner.stageFiles works as // expected. - GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */); - final String gcsStaging = "gs://somebucket/some/path"; - final String gcsTemp = "gs://somebucket/some/temp/path"; final String cloudDataflowDataset = "somedataset"; // Create some temporary files. @@ -391,17 +412,16 @@ public class DataflowRunnerTest { String overridePackageName = "alias.txt"; - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setFilesToStage(ImmutableList.of( temp1.getAbsolutePath(), overridePackageName + "=" + temp2.getAbsolutePath())); - options.setStagingLocation(gcsStaging); - options.setTempLocation(gcsTemp); + options.setStagingLocation(VALID_STAGING_BUCKET); + options.setTempLocation(VALID_TEMP_BUCKET); options.setTempDatasetId(cloudDataflowDataset); options.setProject(PROJECT_ID); options.setJobName("job"); - options.setDataflowClient(buildMockDataflow(jobCaptor)); + options.setDataflowClient(buildMockDataflow()); options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); @@ -410,6 +430,8 @@ public class DataflowRunnerTest { DataflowPipelineJob job = (DataflowPipelineJob) p.run(); assertEquals("newid", job.getJobId()); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); Job workflowJob = jobCaptor.getValue(); assertValidJob(workflowJob); @@ -424,7 +446,7 @@ public class DataflowRunnerTest { assertEquals(overridePackageName, workflowPackage2.getName()); assertEquals( - "storage.googleapis.com/somebucket/some/temp/path", + GcsPath.fromUri(VALID_TEMP_BUCKET).toResourceName(), workflowJob.getEnvironment().getTempStoragePrefix()); assertEquals( cloudDataflowDataset, @@ -481,15 +503,12 @@ public class DataflowRunnerTest { @Test public void testGcsStagingLocationInitialization() throws Exception { - // Test that the staging location is initialized correctly. - String gcsTemp = "gs://somebucket/some/temp/path"; - // Set temp location (required), and check that staging location is set. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setTempLocation(gcsTemp); + options.setTempLocation(VALID_TEMP_BUCKET); options.setProject(PROJECT_ID); options.setGcpCredential(new TestCredential()); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowRunner.class); DataflowRunner.fromOptions(options); @@ -499,9 +518,7 @@ public class DataflowRunnerTest { @Test public void testNonGcsFilePathInReadFailure() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath())); thrown.expectCause(Matchers.allOf( @@ -509,12 +526,16 @@ public class DataflowRunnerTest { ThrowableMessageMatcher.hasMessage( containsString("expected a valid 'gs://' path but was given")))); p.run(); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @Test public void testNonGcsFilePathInWriteFailure() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); + PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); @@ -524,15 +545,16 @@ public class DataflowRunnerTest { @Test public void testMultiSlashGcsFileReadPath() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file")); thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes")))); p.run(); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -548,22 +570,21 @@ public class DataflowRunnerTest { @Test public void testInvalidGcpTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setGcpTempLocation("file://temp/location"); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); DataflowRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @Test public void testNonGcsTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setTempLocation("file://temp/location"); thrown.expect(IllegalArgumentException.class); @@ -592,39 +613,68 @@ public class DataflowRunnerTest { } @Test - public void testNonExistentTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + public void testInvalidProfileLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSaveProfilesToGcs("file://my/staging/location"); + try { + DataflowRunner.fromOptions(options); + fail("fromOptions should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); + } + options.setSaveProfilesToGcs("my/staging/location"); + try { + DataflowRunner.fromOptions(options); + fail("fromOptions should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); + } + } - GcsUtil mockGcsUtil = - buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcsUtil(mockGcsUtil); - options.setGcpTempLocation("gs://non-existent-bucket/location"); + @Test + public void testNonExistentTempLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setGcpTempLocation(NON_EXISTENT_BUCKET); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString( - "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); + "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET)); DataflowRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @Test public void testNonExistentStagingLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setStagingLocation(NON_EXISTENT_BUCKET); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString( + "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET)); + DataflowRunner.fromOptions(options); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); + assertValidJob(jobCaptor.getValue()); + } - GcsUtil mockGcsUtil = - buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcpTempLocation(options.getTempLocation()); // bypass validation for GcpTempLocation - options.setGcsUtil(mockGcsUtil); - options.setStagingLocation("gs://non-existent-bucket/location"); + @Test + public void testNonExistentProfileLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSaveProfilesToGcs(NON_EXISTENT_BUCKET); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString( - "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); + "Output path does not exist or is not writeable: " + NON_EXISTENT_BUCKET)); DataflowRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); - } + } @Test public void testNoProjectFails() { @@ -648,8 +698,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); - options.setGcpTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); DataflowRunner.fromOptions(options); @@ -661,8 +711,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("google.com:some-project-12345"); - options.setGcpTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); DataflowRunner.fromOptions(options); @@ -674,8 +724,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("12345"); - options.setGcpTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Project ID"); @@ -690,8 +740,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("some project"); - options.setGcpTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Project ID"); @@ -706,8 +756,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); options.setProject("foo-12345"); - options.setTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1); @@ -731,25 +781,34 @@ public class DataflowRunnerTest { } @Test - public void testStagingLocationAndNoTempLocationSucceeds() throws Exception { + public void testGcpTempAndNoTempLocationSucceeds() throws Exception { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); options.setProject("foo-project"); - options.setGcpTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setGcpTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); DataflowRunner.fromOptions(options); } @Test - public void testTempLocationAndNoStagingLocationSucceeds() throws Exception { + public void testTempLocationAndNoGcpTempLocationSucceeds() throws Exception { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); options.setGcpCredential(new TestCredential()); options.setProject("foo-project"); - options.setTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setTempLocation(VALID_TEMP_BUCKET); + options.setGcsUtil(mockGcsUtil); + + DataflowRunner.fromOptions(options); + } + + + @Test + public void testValidProfileLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSaveProfilesToGcs(VALID_PROFILE_BUCKET); DataflowRunner.fromOptions(options); } @@ -855,10 +914,7 @@ public class DataflowRunnerTest { @Test public void testTransformTranslatorMissing() throws IOException { - // Test that we throw if we don't provide a translation. - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); Pipeline p = Pipeline.create(options); p.apply(Create.of(Arrays.asList(1, 2, 3))) @@ -869,6 +925,9 @@ public class DataflowRunnerTest { DataflowPipelineTranslator.fromOptions(options) .translate( p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList()); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e44cb12/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java index 87c74a4..299f3c8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.dataflow.options; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -38,9 +38,9 @@ public class DataflowProfilingOptionsTest { @Test public void testOptionsObject() throws Exception { DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] { - "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"}) + "--saveProfilesToGcs=path", "--profilingAgentConfiguration={\"interval\": 21}"}) .as(DataflowPipelineOptions.class); - assertTrue(options.getEnableProfilingAgent()); + assertThat(options.getSaveProfilesToGcs(), equalTo("path")); String json = MAPPER.writeValueAsString(options); assertThat(json, Matchers.containsString(