Repository: incubator-beam Updated Branches: refs/heads/master e0cae9fb6 -> 748b0c8da
Remove Dataflow runner references in WordCount examples. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a554f062 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a554f062 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a554f062 Branch: refs/heads/master Commit: a554f062f32d0807a6840928eb0685943a808d42 Parents: e0cae9f Author: Pei He <pe...@google.com> Authored: Fri Jun 17 15:39:59 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Wed Jun 22 18:49:54 2016 -0700 ---------------------------------------------------------------------- examples/java/pom.xml | 26 ++++++++--------- .../apache/beam/examples/MinimalWordCount.java | 26 +++++++++++------ examples/java8/pom.xml | 20 ++++++++----- .../beam/examples/MinimalWordCountJava8.java | 30 +++++++++++--------- 4 files changed, 60 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index cac9857..223334f 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -221,19 +221,6 @@ </dependency> <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-direct-java</artifactId> - <version>${project.version}</version> - <scope>runtime</scope> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> </dependency> @@ -294,6 +281,19 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + 
<version>${project.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> <scope>runtime</scope> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 6d4bfd4..355a1ff 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -17,10 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -63,13 +62,22 @@ public class MinimalWordCount { // Create a PipelineOptions object. This object lets us set various execution // options for our pipeline, such as the associated Cloud Platform project and the location // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. 
- options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + PipelineOptions options = PipelineOptionsFactory.create(); + + // In order to run your pipeline, you need to make the following runner-specific changes: + // + // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner + // or FlinkPipelineRunner. + // CHANGE 2/3: Specify runner-required options. + // For BlockingDataflowRunner, set the project and temp location as follows: + // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + // dataflowOptions.setRunner(BlockingDataflowRunner.class); + // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); + // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + // For FlinkPipelineRunner, set the runner as follows. See FlinkPipelineOptions + // for more details. + // options.as(FlinkPipelineOptions.class) + // .setRunner(FlinkPipelineRunner.class); // Create the Pipeline object with the options we defined above. 
Pipeline p = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java8/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index 08b811b..82b1c46 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -117,12 +117,6 @@ </dependency> <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -185,6 +179,18 @@ <artifactId>google-api-client</artifactId> </dependency> - </dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-flink_2.10</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index cf3bbf9..6362b96 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -17,10 +17,9 @@ */ package org.apache.beam.examples; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; @@ -39,16 +38,21 @@ import java.util.Arrays; public class MinimalWordCountJava8 { public static void main(String[] args) { - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - - options.setRunner(BlockingDataflowRunner.class); - - // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - - // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); + PipelineOptions options = PipelineOptionsFactory.create(); + // In order to run your pipeline, you need to make the following runner-specific changes: + // + // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner + // or FlinkPipelineRunner. + // CHANGE 2/3: Specify runner-required options. + // For BlockingDataflowRunner, set the project and temp location as follows: + // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + // dataflowOptions.setRunner(BlockingDataflowRunner.class); + // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); + // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + // For FlinkPipelineRunner, set the runner as follows. See FlinkPipelineOptions + // for more details. 
+ // options.as(FlinkPipelineOptions.class) + // .setRunner(FlinkPipelineRunner.class); Pipeline p = Pipeline.create(options); @@ -61,7 +65,7 @@ public class MinimalWordCountJava8 { .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) .withOutputType(TypeDescriptors.strings())) - // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to. + // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); p.run();