Update the Default Pipeline Runner Select the InProcessRunner if it is on the classpath, and throw an exception otherwise.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3ffd510 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3ffd510 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3ffd510 Branch: refs/heads/master Commit: a3ffd510896626019723294931a4c3763faf43af Parents: 816a3bf Author: Thomas Groh <tg...@google.com> Authored: Wed May 18 16:56:06 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Jun 14 09:57:17 2016 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 7 ++ runners/google-cloud-dataflow-java/pom.xml | 11 +++ sdks/java/core/pom.xml | 3 + .../beam/sdk/options/PipelineOptions.java | 31 +++++++- .../sdk/options/PipelineOptionsFactoryTest.java | 79 +++++++++++++++----- .../beam/sdk/options/PipelineOptionsTest.java | 8 -- .../options/PipelineOptionsValidatorTest.java | 15 ++++ .../sdk/runners/DirectPipelineRunnerTest.java | 1 + .../beam/sdk/testing/TestPipelineTest.java | 5 +- sdks/java/extensions/join-library/pom.xml | 7 ++ sdks/java/io/google-cloud-platform/pom.xml | 7 ++ sdks/java/io/hdfs/pom.xml | 7 ++ sdks/java/io/kafka/pom.xml | 7 ++ sdks/java/java8tests/pom.xml | 7 ++ .../beam/sdk/transforms/WithKeysJava8Test.java | 3 +- .../main/resources/archetype-resources/pom.xml | 6 ++ 16 files changed, 170 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 3d81338..5211b80 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -49,6 +49,7 @@ <artifactId>maven-surefire-plugin</artifactId> <configuration> <systemPropertyVariables> + <beamUseDummyRunner></beamUseDummyRunner> <beamTestPipelineOptions> </beamTestPipelineOptions> </systemPropertyVariables> @@ -213,6 +214,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index a6dfae3..6d8e94b 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -84,6 +84,17 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <beamTestPipelineOptions></beamTestPipelineOptions> + <beamUseDummyRunner>true</beamUseDummyRunner> + </systemPropertyVariables> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 372a913..c559cff 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -129,6 +129,9 @@ <excludedGroups> org.apache.beam.sdk.testing.NeedsRunner </excludedGroups> + <systemPropertyVariables> + <beamUseDummyRunner>true</beamUseDummyRunner> + </systemPropertyVariables> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index a2f38ed..b1b5280 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -21,11 +21,11 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.display.HasDisplayData; + import com.google.auto.service.AutoService; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -225,7 +225,7 @@ public interface PipelineOptions extends HasDisplayData { @Description("The pipeline runner that will be used to execute the pipeline. " + "For registered runners, the class name can be specified, otherwise the fully " + "qualified name needs to be specified.") - @Default.Class(DirectPipelineRunner.class) + @Default.InstanceFactory(DirectRunner.class) Class<? extends PipelineRunner<?>> getRunner(); void setRunner(Class<? extends PipelineRunner<?>> kls); @@ -262,4 +262,31 @@ public interface PipelineOptions extends HasDisplayData { @Description("A pipeline level default location for storing temporary files.") String getTempLocation(); void setTempLocation(String value); + + /** + * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists + * on the classpath, and throws an exception otherwise. + * + * <p>As the {@code DirectRunner} is in an independent module, it cannot be directly referenced + * as the {@link Default}. However, it should still be used if available, and a user is required + * to explicitly set the {@code --runner} property if they wish to use an alternative runner. + */ + class DirectRunner implements DefaultValueFactory<Class<? extends PipelineRunner>> { + @Override + public Class<? extends PipelineRunner> create(PipelineOptions options) { + try { + @SuppressWarnings({"unchecked", "rawtypes"}) + Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName( + "org.apache.beam.runners.direct.InProcessPipelineRunner"); + return direct; + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format( + "No Runner was specified and the DirectRunner was not found on the classpath.%n" + + "Specify a runner by either:%n" + + " Explicitly specifying a runner by providing the 'runner' property%n" + + " Adding the DirectRunner to the classpath%n" + + " Calling 'PipelineOptions.setRunner(PipelineRunner)' directly")); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 62c6909..8b8337e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -29,11 +29,13 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; +import org.apache.beam.sdk.testing.CrashingRunner; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.RestoreSystemProperties; +import com.google.auto.service.AutoService; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -59,8 +61,9 @@ import java.util.Set; /** Tests for {@link PipelineOptionsFactory}. */ @RunWith(JUnit4.class) public class PipelineOptionsFactoryTest { - private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS = - DirectPipelineRunner.class; + private static final String DEFAULT_RUNNER_NAME = "DirectRunner"; + private static final Class<? extends PipelineRunner> REGISTERED_RUNNER = + RegisteredTestRunner.class; @Rule public ExpectedException expectedException = ExpectedException.none(); @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); @@ -68,14 +71,13 @@ public class PipelineOptionsFactoryTest { @Test public void testAutomaticRegistrationOfPipelineOptions() { - assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(DirectPipelineOptions.class)); + assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class)); } @Test public void testAutomaticRegistrationOfRunners() { - assertEquals( - DEFAULT_RUNNER_CLASS, - PipelineOptionsFactory.getRegisteredRunners().get(DEFAULT_RUNNER_CLASS.getSimpleName())); + assertEquals(REGISTERED_RUNNER, + PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName())); } @Test @@ -85,7 +87,7 @@ public class PipelineOptionsFactoryTest { } /** A simple test interface. */ - public static interface TestPipelineOptions extends PipelineOptions { + public interface TestPipelineOptions extends PipelineOptions { String getTestPipelineOption(); void setTestPipelineOption(String value); } @@ -810,18 +812,18 @@ public class PipelineOptionsFactoryTest { @Test public void testSettingRunner() { - String[] args = new String[] {"--runner=DirectPipelineRunner"}; + String[] args = new String[] {"--runner=" + RegisteredTestRunner.class.getSimpleName()}; PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(DirectPipelineRunner.class, options.getRunner()); + assertEquals(RegisteredTestRunner.class, options.getRunner()); } @Test public void testSettingRunnerFullName() { String[] args = - new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())}; + new String[] {String.format("--runner=%s", CrashingRunner.class.getName())}; PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(opts.getRunner(), DirectPipelineRunner.class); + assertEquals(opts.getRunner(), CrashingRunner.class); } @@ -832,7 +834,7 @@ public class PipelineOptionsFactoryTest { expectedException.expectMessage( "Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners"); Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet(); - assertThat(registeredRunners, hasItem(DEFAULT_RUNNER_CLASS.getSimpleName())); + assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName())); for (String registeredRunner : registeredRunners) { expectedException.expectMessage(registeredRunner); } @@ -923,7 +925,7 @@ public class PipelineOptionsFactoryTest { public void testEmptyArgumentIsIgnored() { String[] args = new String[] { - "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName() + "", "--string=100", "", "", "--runner=" + REGISTERED_RUNNER.getSimpleName() }; PipelineOptionsFactory.fromArgs(args).as(Objects.class); } @@ -932,7 +934,7 @@ public class PipelineOptionsFactoryTest { public void testNullArgumentIsIgnored() { String[] args = new String[] { - "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName() + "--string=100", null, null, "--runner=" + REGISTERED_RUNNER.getSimpleName() }; PipelineOptionsFactory.fromArgs(args).as(Objects.class); } @@ -985,7 +987,7 @@ public class PipelineOptionsFactoryTest { String output = new String(baos.toByteArray()); assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions")); assertThat(output, containsString("--runner")); - assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName())); + assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME)); assertThat(output, containsString("The pipeline runner that will be used to execute the pipeline.")); } @@ -1000,7 +1002,7 @@ public class PipelineOptionsFactoryTest { String output = new String(baos.toByteArray()); assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions")); assertThat(output, containsString("--runner")); - assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName())); + assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME)); assertThat(output, containsString("The pipeline runner that will be used to execute the pipeline.")); } @@ -1015,7 +1017,7 @@ public class PipelineOptionsFactoryTest { String output = new String(baos.toByteArray()); assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions")); assertThat(output, containsString("--runner")); - assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName())); + assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME)); assertThat(output, containsString("The pipeline runner that will be used to execute the pipeline.")); } @@ -1109,7 +1111,7 @@ public class PipelineOptionsFactoryTest { String output = new String(baos.toByteArray()); assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions")); assertThat(output, containsString("--runner")); - assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName())); + assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME)); assertThat(output, containsString("The pipeline runner that will be used to execute the pipeline.")); } @@ -1147,4 +1149,43 @@ public class PipelineOptionsFactoryTest { thread.join(); assertEquals(cl, classLoader[0]); } + + private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> { + public static PipelineRunner fromOptions(PipelineOptions options) { + return new RegisteredTestRunner(); + } + + public PipelineResult run(Pipeline p) { + throw new IllegalArgumentException(); + } + } + + + /** + * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class RegisteredTestRunnerRegistrar implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>>of(RegisteredTestRunner.class); + } + } + + private interface RegisteredTestOptions extends PipelineOptions { + Object getRegisteredExampleFooBar(); + void setRegisteredExampleFooBar(Object registeredExampleFooBar); + } + + + /** + * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class RegisteredTestOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index dfda528..687271c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -17,15 +17,12 @@ */ package org.apache.beam.sdk.options; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.runners.DirectPipelineRunner; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -86,11 +83,6 @@ public class PipelineOptionsTest { } @Test - public void testDefaultRunnerIsSet() { - assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner()); - } - - @Test public void testCloneAs() throws IOException { DerivedTestOptions options = PipelineOptionsFactory.create().as(DerivedTestOptions.class); options.setBaseValue(Lists.<Boolean>newArrayList()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java index 0250bd1..2b684a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.options; +import org.apache.beam.sdk.testing.CrashingRunner; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -40,6 +42,7 @@ public class PipelineOptionsValidatorTest { @Test public void testWhenRequiredOptionIsSet() { Required required = PipelineOptionsFactory.as(Required.class); + required.setRunner(CrashingRunner.class); required.setObject("blah"); PipelineOptionsValidator.validate(Required.class, required); } @@ -114,6 +117,7 @@ public class PipelineOptionsValidatorTest { GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class); groupRequired.setFoo("foo"); groupRequired.setBar(null); + groupRequired.setRunner(CrashingRunner.class); PipelineOptionsValidator.validate(GroupRequired.class, groupRequired); @@ -126,6 +130,7 @@ public class PipelineOptionsValidatorTest { @Test public void testWhenNoneOfRequiredGroupIsSetThrowsException() { GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class); + groupRequired.setRunner(CrashingRunner.class); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Missing required value for group [ham]"); @@ -155,6 +160,7 @@ public class PipelineOptionsValidatorTest { public void testWhenOneOfMultipleRequiredGroupsIsSetIsValid() { MultiGroupRequired multiGroupRequired = PipelineOptionsFactory.as(MultiGroupRequired.class); + multiGroupRequired.setRunner(CrashingRunner.class); multiGroupRequired.setFoo("eggs"); PipelineOptionsValidator.validate(MultiGroupRequired.class, multiGroupRequired); @@ -194,6 +200,7 @@ public class PipelineOptionsValidatorTest { public void testWhenOptionIsDefinedInMultipleSuperInterfacesAndIsNotPresentFailsRequirement() { RightOptions rightOptions = PipelineOptionsFactory.as(RightOptions.class); rightOptions.setBoth("foo"); + rightOptions.setRunner(CrashingRunner.class); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Missing required value for group"); @@ -212,6 +219,8 @@ public class PipelineOptionsValidatorTest { leftOpts.setFoo("Untrue"); leftOpts.setBoth("Raise the"); + rightOpts.setRunner(CrashingRunner.class); + leftOpts.setRunner(CrashingRunner.class); PipelineOptionsValidator.validate(JoinedOptions.class, rightOpts); PipelineOptionsValidator.validate(JoinedOptions.class, leftOpts); } @@ -226,6 +235,8 @@ public class PipelineOptionsValidatorTest { leftOpts.setFoo("Untrue"); leftOpts.setBoth("Raise the"); + rightOpts.setRunner(CrashingRunner.class); + leftOpts.setRunner(CrashingRunner.class); PipelineOptionsValidator.validate(RightOptions.class, leftOpts); PipelineOptionsValidator.validate(LeftOptions.class, rightOpts); } @@ -234,6 +245,7 @@ public class PipelineOptionsValidatorTest { public void testWhenOptionIsDefinedOnMultipleInterfacesOnlyListedOnceWhenNotPresent() { JoinedOptions options = PipelineOptionsFactory.as(JoinedOptions.class); options.setFoo("Hello"); + options.setRunner(CrashingRunner.class); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("required value for group [both]"); @@ -273,6 +285,7 @@ public class PipelineOptionsValidatorTest { public void testSuperInterfaceRequiredOptionsAlsoRequiredInSubInterface() { SubOptions subOpts = PipelineOptionsFactory.as(SubOptions.class); subOpts.setFoo("Bar"); + subOpts.setRunner(CrashingRunner.class); expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("otherSuper"); @@ -288,6 +301,7 @@ public class PipelineOptionsValidatorTest { SubOptions opts = PipelineOptionsFactory.as(SubOptions.class); opts.setFoo("Foo"); opts.setSuperclassObj("Hello world"); + opts.setRunner(CrashingRunner.class); // Valid SubOptions, but invalid SuperOptions PipelineOptionsValidator.validate(SubOptions.class, opts); @@ -304,6 +318,7 @@ public class PipelineOptionsValidatorTest { subOpts.setFoo("bar"); subOpts.setBar("bar"); subOpts.setSuperclassObj("SuperDuper"); + subOpts.setRunner(CrashingRunner.class); PipelineOptionsValidator.validate(SubOptions.class, subOpts); PipelineOptionsValidator.validate(SuperOptions.class, subOpts); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java index ae3b4e0..edf6996 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java @@ -68,6 +68,7 @@ public class DirectPipelineRunnerTest implements Serializable { @Test public void testToString() { PipelineOptions options = PipelineOptionsFactory.create(); + options.setRunner(DirectPipelineRunner.class); DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options); assertEquals("DirectPipelineRunner#" + runner.hashCode(), runner.toString()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index b741e2e..043c06c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.transforms.Create; import com.fasterxml.jackson.databind.ObjectMapper; @@ -62,13 +61,13 @@ public class TestPipelineTest { public void testCreationOfPipelineOptions() throws Exception { ObjectMapper mapper = new ObjectMapper(); String stringOptions = mapper.writeValueAsString(new String[]{ - "--runner=DirectPipelineRunner", + "--runner=org.apache.beam.sdk.testing.CrashingRunner", "--project=testProject" }); System.getProperties().put("beamTestPipelineOptions", stringOptions); GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); - assertEquals(DirectPipelineRunner.class, options.getRunner()); + assertEquals(CrashingRunner.class, options.getRunner()); assertEquals(options.getProject(), "testProject"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/extensions/join-library/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml index 828fcce..0fec148 100644 --- a/sdks/java/extensions/join-library/pom.xml +++ b/sdks/java/extensions/join-library/pom.xml @@ -63,6 +63,13 @@ <!-- Dependency for tests --> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 962c7b0..f567261 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -88,6 +88,13 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index e99dbd7..9c30792 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -66,6 +66,13 @@ <!-- test dependencies --> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml index be19a83..76c0eb6 100644 --- a/sdks/java/io/kafka/pom.xml +++ b/sdks/java/io/kafka/pom.xml @@ -71,6 +71,13 @@ <!-- test dependencies--> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml index 48bf682..8e20228 100644 --- a/sdks/java/java8tests/pom.xml +++ b/sdks/java/java8tests/pom.xml @@ -96,6 +96,13 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java index a0d1a63..1ffb147 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -65,7 +64,7 @@ public class WithKeysJava8Test { values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); - thrown.expect(PipelineExecutionException.class); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); thrown.expectMessage("Cannot provide a coder for type variable K"); thrown.expectMessage("the actual type is unknown due to erasure."); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index d86b9cc..2b2e24b 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -101,6 +101,12 @@ <!-- Adds a dependency on a specific version of the Dataflow runnner. --> <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>[0-incubating, 2-incubating)</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>[0-incubating, 2-incubating)</version> </dependency>