http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java new file mode 100644 index 0000000..93304eb --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import java.util.ArrayList; +import java.util.List; + +/** + * An example that reads the public 'Shakespeare' data, and for each word in + * the dataset that is over a given length, generates a string containing the + * list of play names in which that word appears, and saves this information + * to a bigquery table. + * + * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a + * key-grouped Collection, and how to use an Aggregator to track information in the + * Monitoring UI. + * + * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output + * table. 
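+ * For example, assuming the BigQuery {@code bq} command-line tool is installed and configured for YOUR_PROJECT_ID, the dataset could be created with: + * <pre>{@code + * $ bq mk YOUR_DATASET_ID + * }</pre>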
+ * + * <p>To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * } + * </pre> + * and the BigQuery table for the output: + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * }</pre> + * + * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://<STAGING DIRECTORY> + * --runner=BlockingDataflowPipelineRunner + * } + * </pre> + * and the BigQuery table for the output: + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * }</pre> + * + * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can + * be overridden with {@code --input}. + */ +public class CombinePerKeyExamples { + // Use the shakespeare public BigQuery sample + private static final String SHAKESPEARE_TABLE = + "publicdata:samples.shakespeare"; + // We'll track words >= this word length across all plays in the table. + private static final int MIN_WORD_LENGTH = 9; + + /** + * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH, + * outputs word, play_name. + */ + static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> { + private final Aggregator<Long, Long> smallerWords = + createAggregator("smallerWords", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c){ + TableRow row = c.element(); + String playName = (String) row.get("corpus"); + String word = (String) row.get("word"); + if (word.length() >= MIN_WORD_LENGTH) { + c.output(KV.of(word, playName)); + } else { + // Track how many smaller words we're not including. This information will be + // visible in the Monitoring UI. + smallerWords.addValue(1L); + } + } + } + + + /** + * Prepares the data for writing to BigQuery by building a TableRow object + * containing a word with a string listing the plays in which it appeared. + */ + static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> { + @Override + public void processElement(ProcessContext c) { + TableRow row = new TableRow() + .set("word", c.element().getKey()) + .set("all_plays", c.element().getValue()); + c.output(row); + } + } + + /** + * Reads the public 'Shakespeare' data, and for each word in the dataset + * over a given length, generates a string containing the list of play names + * in which that word appears. It does this via the Combine.perKey + * transform, with the ConcatWords combine function. + * + * <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but + * has more restricted semantics that allow it to be executed more + * efficiently. These records are then formatted as BQ table rows. + */ + static class PlaysForWord + extends PTransform<PCollection<TableRow>, PCollection<TableRow>> { + @Override + public PCollection<TableRow> apply(PCollection<TableRow> rows) { + + // row... => <word, play_name> ... + PCollection<KV<String, String>> words = rows.apply( + ParDo.of(new ExtractLargeWordsFn())); + + // word, play_name => word, all_plays ... + PCollection<KV<String, String>> wordAllPlays = + words.apply(Combine.<String, String>perKey( + new ConcatWords())); + + // <word, all_plays>... => row... + PCollection<TableRow> results = wordAllPlays.apply( + ParDo.of(new FormatShakespeareOutputFn())); + + return results; + } + } + + /** + * A 'combine function' used with the Combine.perKey transform. Builds a + * comma-separated string of all input items. 
So, it will build a string + * containing all the different Shakespeare plays in which the given input + * word has appeared. + */ + public static class ConcatWords implements SerializableFunction<Iterable<String>, String> { + @Override + public String apply(Iterable<String> input) { + StringBuilder all = new StringBuilder(); + for (String item : input) { + if (!item.isEmpty()) { + if (all.length() == 0) { + all.append(item); + } else { + all.append(","); + all.append(item); + } + } + } + return all.toString(); + } + } + + /** + * Options supported by {@link CombinePerKeyExamples}. + * + * <p>Inherits standard configuration options. + */ + private static interface Options extends PipelineOptions { + @Description("Table to read from, specified as " + + "<project_id>:<dataset_id>.<table_id>") + @Default.String(SHAKESPEARE_TABLE) + String getInput(); + void setInput(String value); + + @Description("Table to write to, specified as " + + "<project_id>:<dataset_id>.<table_id>. " + + "The dataset_id must already exist") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) + throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + // Build the table schema for the output table. + List<TableFieldSchema> fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("word").setType("STRING")); + fields.add(new TableFieldSchema().setName("all_plays").setType("STRING")); + TableSchema schema = new TableSchema().setFields(fields); + + p.apply(BigQueryIO.Read.from(options.getInput())) + .apply(new PlaysForWord()) + .apply(BigQueryIO.Write + .to(options.getOutput()) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + + p.run(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java new file mode 100644 index 0000000..9dddb5d --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap; +import static com.google.api.services.datastore.client.DatastoreHelper.getString; +import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter; +import static com.google.api.services.datastore.client.DatastoreHelper.makeKey; +import static com.google.api.services.datastore.client.DatastoreHelper.makeValue; + +import com.google.api.services.datastore.DatastoreV1.Entity; +import com.google.api.services.datastore.DatastoreV1.Key; +import com.google.api.services.datastore.DatastoreV1.Property; +import com.google.api.services.datastore.DatastoreV1.PropertyFilter; +import com.google.api.services.datastore.DatastoreV1.Query; +import com.google.api.services.datastore.DatastoreV1.Value; +import com.google.cloud.dataflow.examples.WordCount; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.DatastoreIO; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.ParDo; + +import java.util.Map; +import java.util.UUID; + +import javax.annotation.Nullable; + +/** + * A WordCount example using DatastoreIO. + * + * <p>This example shows how to use DatastoreIO to read from Datastore and + * write the results to Cloud Storage. Note that this example will write + * data to Datastore, which may incur charge for Datastore operations. 
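+ * + * <p>Concretely, each line of the input text ends up as one entity of the configured kind under a shared ancestor key; sketched informally (see {@code CreateEntityFn} and {@code makeAncestorKey} below for the exact construction): + * <pre>{@code + * key: [kind=<kind>, name="root"] / [kind=<kind>, name=<random UUID>] + * content: <one line of the input text> + * }</pre>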
+ * + * <p>To run this example, users need to use gcloud to get credential for Datastore: + * <pre>{@code + * $ gcloud auth login + * }</pre> + * + * <p>To run this pipeline locally, the following options must be provided: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --dataset=YOUR_DATASET_ID + * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH] + * }</pre> + * + * <p>To run this example using Dataflow service, you must additionally + * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and + * select one of the Dataflow pipeline runners, eg + * {@literal --runner=BlockingDataflowPipelineRunner}. + * + * <p><b>Note:</b> this example creates entities with <i>Ancestor keys</i> to ensure that all + * entities created are in the same entity group. Similarly, the query used to read from the Cloud + * Datastore uses an <i>Ancestor filter</i>. Ancestors are used to ensure strongly consistent + * results in Cloud Datastore. For more information, see the Cloud Datastore documentation on + * <a href="https://cloud.google.com/datastore/docs/concepts/structuring_for_strong_consistency"> + * Structing Data for Strong Consistency</a>. + */ +public class DatastoreWordCount { + + /** + * A DoFn that gets the content of an entity (one line in a + * Shakespeare play) and converts it to a string. + */ + static class GetContentFn extends DoFn<Entity, String> { + @Override + public void processElement(ProcessContext c) { + Map<String, Value> props = getPropertyMap(c.element()); + Value value = props.get("content"); + if (value != null) { + c.output(getString(value)); + } + } + } + + /** + * A helper function to create the ancestor key for all created and queried entities. + * + * <p>We use ancestor keys and ancestor queries for strong consistency. See + * {@link DatastoreWordCount} javadoc for more information. + */ + static Key makeAncestorKey(@Nullable String namespace, String kind) { + Key.Builder keyBuilder = makeKey(kind, "root"); + if (namespace != null) { + keyBuilder.getPartitionIdBuilder().setNamespace(namespace); + } + return keyBuilder.build(); + } + + /** + * A DoFn that creates entity for every line in Shakespeare. + */ + static class CreateEntityFn extends DoFn<String, Entity> { + private final String namespace; + private final String kind; + private final Key ancestorKey; + + CreateEntityFn(String namespace, String kind) { + this.namespace = namespace; + this.kind = kind; + + // Build the ancestor key for all created entities once, including the namespace. + ancestorKey = makeAncestorKey(namespace, kind); + } + + public Entity makeEntity(String content) { + Entity.Builder entityBuilder = Entity.newBuilder(); + + // All created entities have the same ancestor Key. + Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString()); + // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so + // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added, + // we can simplify this code. + if (namespace != null) { + keyBuilder.getPartitionIdBuilder().setNamespace(namespace); + } + + entityBuilder.setKey(keyBuilder.build()); + entityBuilder.addProperty(Property.newBuilder().setName("content") + .setValue(Value.newBuilder().setStringValue(content))); + return entityBuilder.build(); + } + + @Override + public void processElement(ProcessContext c) { + c.output(makeEntity(c.element())); + } + } + + /** + * Options supported by {@link DatastoreWordCount}. 
+ * + * <p>Inherits standard configuration options. + */ + public static interface Options extends PipelineOptions { + @Description("Path of the file to read from and store to Datastore") + @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + String getInput(); + void setInput(String value); + + @Description("Path of the file to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + + @Description("Dataset ID to read from datastore") + @Validation.Required + String getDataset(); + void setDataset(String value); + + @Description("Dataset entity kind") + @Default.String("shakespeare-demo") + String getKind(); + void setKind(String value); + + @Description("Dataset namespace") + String getNamespace(); + void setNamespace(@Nullable String value); + + @Description("Read an existing dataset, do not write first") + boolean isReadOnly(); + void setReadOnly(boolean value); + + @Description("Number of output shards") + @Default.Integer(0) // If the system should choose automatically. + int getNumShards(); + void setNumShards(int value); + } + + /** + * An example that creates a pipeline to populate DatastoreIO from a + * text input. Forces use of DirectPipelineRunner for local execution mode. + */ + public static void writeDataToDatastore(Options options) { + Pipeline p = Pipeline.create(options); + p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) + .apply(DatastoreIO.writeTo(options.getDataset())); + + p.run(); + } + + /** + * Build a Cloud Datastore ancestor query for the specified {@link Options#getNamespace} and + * {@link Options#getKind}. + * + * <p>We use ancestor keys and ancestor queries for strong consistency. See + * {@link DatastoreWordCount} javadoc for more information. + * + * @see <a href="https://cloud.google.com/datastore/docs/concepts/queries#Datastore_Ancestor_filters">Ancestor filters</a> + */ + static Query makeAncestorKindQuery(Options options) { + Query.Builder q = Query.newBuilder(); + q.addKindBuilder().setName(options.getKind()); + q.setFilter(makeFilter( + "__key__", + PropertyFilter.Operator.HAS_ANCESTOR, + makeValue(makeAncestorKey(options.getNamespace(), options.getKind())))); + return q.build(); + } + + /** + * An example that creates a pipeline to do DatastoreIO.Read from Datastore. + */ + public static void readDataFromDatastore(Options options) { + Query query = makeAncestorKindQuery(options); + + // For Datastore sources, the read namespace can be set on the entire query. + DatastoreIO.Source source = DatastoreIO.source() + .withDataset(options.getDataset()) + .withQuery(query) + .withNamespace(options.getNamespace()); + + Pipeline p = Pipeline.create(options); + p.apply("ReadShakespeareFromDatastore", Read.from(source)) + .apply("StringifyEntity", ParDo.of(new GetContentFn())) + .apply("CountWords", new WordCount.CountWords()) + .apply("PrintWordCount", MapElements.via(new WordCount.FormatAsTextFn())) + .apply("WriteLines", TextIO.Write.to(options.getOutput()) + .withNumShards(options.getNumShards())); + p.run(); + } + + /** + * An example to demo how to use {@link DatastoreIO}. The runner here is + * customizable, which means users could pass either {@code DirectPipelineRunner} + * or {@code DataflowPipelineRunner} in the pipeline options. 
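+ * + * <p>For example, a later run that should only read back previously written entities (skipping the write phase in {@code main}) might add, in addition to the required options above: + * <pre>{@code + * --readOnly=true + * }</pre>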
+ */ + public static void main(String args[]) { + // The options are used in two places, for Dataflow service, and + // building DatastoreIO.Read object + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + if (!options.isReadOnly()) { + // First example: write data to Datastore for reading later. + // + // NOTE: this write does not delete any existing Entities in the Datastore, so if run + // multiple times with the same output dataset, there may be duplicate entries. The + // Datastore Query tool in the Google Developers Console can be used to inspect or erase all + // entries with a particular namespace and/or kind. + DatastoreWordCount.writeDataToDatastore(options); + } + + // Second example: do parallel read from Datastore. + DatastoreWordCount.readDataFromDatastore(options); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java new file mode 100644 index 0000000..40d1f76 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; + +/** + * This example uses as input Shakespeare's plays as plaintext files, and will remove any + * duplicate lines across all the files. (The output does not preserve any input order). + * + * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together. + * Demonstrates {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}/ + * {@link RemoveDuplicates}/{@link com.google.cloud.dataflow.sdk.io.TextIO.Write}. 
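+ * + * <p>The core wiring is essentially three chained transforms (see {@code main} below for the exact code): + * <pre>{@code + * p.apply(TextIO.Read.from(options.getInput())) + *     .apply(RemoveDuplicates.<String>create()) + *     .apply(TextIO.Write.to(options.getOutput())); + * }</pre>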
+ * + * <p>To execute this pipeline locally, specify general pipeline configuration: + * --project=YOUR_PROJECT_ID + * and a local output file or output prefix on GCS: + * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] + * + * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * and an output prefix on GCS: + * --output=gs://YOUR_OUTPUT_PREFIX + * + * <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be + * overridden with {@code --input}. + */ +public class DeDupExample { + + /** + * Options supported by {@link DeDupExample}. + * + * <p>Inherits standard configuration options. + */ + private static interface Options extends PipelineOptions { + @Description("Path to the directory or GCS prefix containing files to read from") + @Default.String("gs://dataflow-samples/shakespeare/*") + String getInput(); + void setInput(String value); + + @Description("Path of the file to write to") + @Default.InstanceFactory(OutputFactory.class) + String getOutput(); + void setOutput(String value); + + /** Returns gs://${STAGING_LOCATION}/"deduped.txt". */ + public static class OutputFactory implements DefaultValueFactory<String> { + @Override + public String create(PipelineOptions options) { + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + if (dataflowOptions.getStagingLocation() != null) { + return GcsPath.fromUri(dataflowOptions.getStagingLocation()) + .resolve("deduped.txt").toString(); + } else { + throw new IllegalArgumentException("Must specify --output or --stagingLocation"); + } + } + } + } + + + public static void main(String[] args) + throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + .apply(RemoveDuplicates.<String>create()) + .apply(TextIO.Write.named("DedupedShakespeare") + .to(options.getOutput())); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java new file mode 100644 index 0000000..269ee6a --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Mean; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +/** + * This is an example that demonstrates several approaches to filtering, and use of the Mean + * transform. It shows how to dynamically set parameters by defining and using new pipeline options, + * and how to use a value derived by the pipeline. + * + * <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side + * input; approaches to filtering, selection, and projection. + * + * <p>The example reads public samples of weather data from BigQuery. It performs a + * projection on the data, finds the global mean of the temperature readings, filters on readings + * for a single given month, and then outputs only data (for that month) that has a mean temp + * smaller than the derived global mean. +* + * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output + * table. + * + * <p>To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * } + * </pre> + * and the BigQuery table for the output: + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * [--monthFilter=<month_number>] + * } + * </pre> + * where optional parameter {@code --monthFilter} is set to a number 1-12. + * + * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * } + * </pre> + * and the BigQuery table for the output: + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * [--monthFilter=<month_number>] + * } + * </pre> + * where optional parameter {@code --monthFilter} is set to a number 1-12. + * + * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations} + * and can be overridden with {@code --input}. + */ +public class FilterExamples { + // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod. + private static final String WEATHER_SAMPLES_TABLE = + "clouddataflow-readonly:samples.weather_stations"; + static final Logger LOG = Logger.getLogger(FilterExamples.class.getName()); + static final int MONTH_TO_FILTER = 7; + + /** + * Examines each row in the input table. 
Outputs only the subset of the cells this example + * is interested in (the mean_temp and year, month, and day) as a BigQuery table row. + */ + static class ProjectionFn extends DoFn<TableRow, TableRow> { + @Override + public void processElement(ProcessContext c){ + TableRow row = c.element(); + // Grab year, month, day, mean_temp from the row + Integer year = Integer.parseInt((String) row.get("year")); + Integer month = Integer.parseInt((String) row.get("month")); + Integer day = Integer.parseInt((String) row.get("day")); + Double meanTemp = Double.parseDouble(row.get("mean_temp").toString()); + // Prepares the data for writing to BigQuery by building a TableRow object + TableRow outRow = new TableRow() + .set("year", year).set("month", month) + .set("day", day).set("mean_temp", meanTemp); + c.output(outRow); + } + } + + /** + * Implements 'filter' functionality. + * + * <p>Examines each row in the input table. Outputs only rows from the month + * monthFilter, which is passed in as a parameter during construction of this DoFn. + */ + static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> { + Integer monthFilter; + + public FilterSingleMonthDataFn(Integer monthFilter) { + this.monthFilter = monthFilter; + } + + @Override + public void processElement(ProcessContext c){ + TableRow row = c.element(); + Integer month; + month = (Integer) row.get("month"); + if (month.equals(this.monthFilter)) { + c.output(row); + } + } + } + + /** + * Examines each row (weather reading) in the input table. Outputs the temperature + * reading for that row ('mean_temp'). + */ + static class ExtractTempFn extends DoFn<TableRow, Double> { + @Override + public void processElement(ProcessContext c){ + TableRow row = c.element(); + Double meanTemp = Double.parseDouble(row.get("mean_temp").toString()); + c.output(meanTemp); + } + } + + + + /** + * Finds the global mean of the mean_temp for each day/record, and outputs + * only data that has a mean temp smaller than this global mean. + */ + static class BelowGlobalMean + extends PTransform<PCollection<TableRow>, PCollection<TableRow>> { + Integer monthFilter; + + public BelowGlobalMean(Integer monthFilter) { + this.monthFilter = monthFilter; + } + + + @Override + public PCollection<TableRow> apply(PCollection<TableRow> rows) { + + // Extract the mean_temp from each row. + PCollection<Double> meanTemps = rows.apply( + ParDo.of(new ExtractTempFn())); + + // Find the global mean of all the mean_temp readings in the weather data, + // and prepare this singleton PCollectionView for use as a side input. + final PCollectionView<Double> globalMeanTemp = + meanTemps.apply(Mean.<Double>globally()) + .apply(View.<Double>asSingleton()); + + // Rows filtered to remove all but a single month + PCollection<TableRow> monthFilteredRows = rows + .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter))); + + // Then, use the global mean as a side input, to further filter the weather data. + // By using a side input to pass in the filtering criteria, we can use a value + // that is computed earlier in pipeline execution. + // We'll only output readings with temperatures below this mean. 
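+ // (globalMeanTemp is declared final so the anonymous DoFn below can capture it; the DoFn reads + // the singleton value via c.sideInput(globalMeanTemp).)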
+ PCollection<TableRow> filteredRows = monthFilteredRows + .apply(ParDo + .named("ParseAndFilter") + .withSideInputs(globalMeanTemp) + .of(new DoFn<TableRow, TableRow>() { + @Override + public void processElement(ProcessContext c) { + Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString()); + Double gTemp = c.sideInput(globalMeanTemp); + if (meanTemp < gTemp) { + c.output(c.element()); + } + } + })); + + return filteredRows; + } + } + + + /** + * Options supported by {@link FilterExamples}. + * + * <p>Inherits standard configuration options. + */ + private static interface Options extends PipelineOptions { + @Description("Table to read from, specified as " + + "<project_id>:<dataset_id>.<table_id>") + @Default.String(WEATHER_SAMPLES_TABLE) + String getInput(); + void setInput(String value); + + @Description("Table to write to, specified as " + + "<project_id>:<dataset_id>.<table_id>. " + + "The dataset_id must already exist") + @Validation.Required + String getOutput(); + void setOutput(String value); + + @Description("Numeric value of month to filter on") + @Default.Integer(MONTH_TO_FILTER) + Integer getMonthFilter(); + void setMonthFilter(Integer value); + } + + /** + * Helper method to build the table schema for the output table. + */ + private static TableSchema buildWeatherSchemaProjection() { + List<TableFieldSchema> fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("year").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("month").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("day").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT")); + TableSchema schema = new TableSchema().setFields(fields); + return schema; + } + + public static void main(String[] args) + throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + TableSchema schema = buildWeatherSchemaProjection(); + + p.apply(BigQueryIO.Read.from(options.getInput())) + .apply(ParDo.of(new ProjectionFn())) + .apply(new BelowGlobalMean(options.getMonthFilter())) + .apply(BigQueryIO.Write + .to(options.getOutput()) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java new file mode 100644 index 0000000..765420e --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; + +/** + * This example shows how to do a join on two collections. + * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event + * 'action' country code against a table that maps country codes to country names. + * + * <p>Concepts: Join operation; multiple input sources. + * + * <p>To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * } + * </pre> + * and a local output file or output prefix on GCS: + * <pre>{@code + * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] + * }</pre> + * + * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * } + * </pre> + * and an output prefix on GCS: + * <pre>{@code + * --output=gs://YOUR_OUTPUT_PREFIX + * }</pre> + */ +public class JoinExamples { + + // A 1000-row sample of the GDELT data here: gdelt-bq:full.events. + private static final String GDELT_EVENTS_TABLE = + "clouddataflow-readonly:samples.gdelt_sample"; + // A table that maps country codes to country names. + private static final String COUNTRY_CODES = + "gdelt-bq:full.crosswalk_geocountrycodetohuman"; + + /** + * Join two collections, using country code as the key. + */ + static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, + PCollection<TableRow> countryCodes) throws Exception { + + final TupleTag<String> eventInfoTag = new TupleTag<String>(); + final TupleTag<String> countryInfoTag = new TupleTag<String>(); + + // transform both input collections to tuple collections, where the keys are country + // codes in both cases. 
+ PCollection<KV<String, String>> eventInfo = eventsTable.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> countryInfo = countryCodes.apply( + ParDo.of(new ExtractCountryInfoFn())); + + // country code 'key' -> CGBKR (<event info>, <country name>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(eventInfoTag, eventInfo) + .and(countryInfoTag, countryInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // country code 'key' -> string of <event info>, <country name> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.named("Process").of( + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String countryCode = e.getKey(); + String countryName = "none"; + countryName = e.getValue().getOnly(countryInfoTag); + for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(countryCode, "Country name: " + countryName + + ", Event info: " + eventInfo)); + } + } + })); + + // write to GCS + PCollection<String> formattedResults = finalResultCollection + .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + @Override + public void processElement(ProcessContext c) { + String outputstring = "Country code: " + c.element().getKey() + + ", " + c.element().getValue(); + c.output(outputstring); + } + })); + return formattedResults; + } + + /** + * Examines each row (event) in the input table. Output a KV with the key the country + * code of the event, and the value a string encoding event information. + */ + static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { + @Override + public void processElement(ProcessContext c) { + TableRow row = c.element(); + String countryCode = (String) row.get("ActionGeo_CountryCode"); + String sqlDate = (String) row.get("SQLDATE"); + String actor1Name = (String) row.get("Actor1Name"); + String sourceUrl = (String) row.get("SOURCEURL"); + String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; + c.output(KV.of(countryCode, eventInfo)); + } + } + + + /** + * Examines each row (country info) in the input table. Output a KV with the key the country + * code, and the value the country name. + */ + static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { + @Override + public void processElement(ProcessContext c) { + TableRow row = c.element(); + String countryCode = (String) row.get("FIPSCC"); + String countryName = (String) row.get("HumanName"); + c.output(KV.of(countryCode, countryName)); + } + } + + + /** + * Options supported by {@link JoinExamples}. + * + * <p>Inherits standard configuration options. + */ + private static interface Options extends PipelineOptions { + @Description("Path of the file to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. 
+ PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE)); + PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES)); + PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); + formattedResults.apply(TextIO.Write.to(options.getOutput())); + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java new file mode 100644 index 0000000..b35a57f --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Max; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import java.util.ArrayList; +import java.util.List; + +/** + * An example that reads the public samples of weather data from BigQuery, and finds + * the maximum temperature ('mean_temp') for each month. + * + * <p>Concepts: The 'Max' statistical combination function, and how to find the max per + * key group. + * + * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output + * table. 
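+ * + * <p>For instance, given per-month readings such as + * <pre>{@code + * (6, 85.2), (6, 75.1), (7, 88.4), (7, 91.0) + * }</pre> + * the {@code Max.doublesPerKey()} transform used below would produce one maximum per month: + * <pre>{@code + * (6, 85.2), (7, 91.0) + * }</pre> + * (The readings shown are made up for illustration.)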
+ * + * <p>To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * } + * </pre> + * and the BigQuery table for the output, with the form + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * }</pre> + * + * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * } + * </pre> + * and the BigQuery table for the output: + * <pre>{@code + * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID + * }</pre> + * + * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations } + * and can be overridden with {@code --input}. + */ +public class MaxPerKeyExamples { + // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod. + private static final String WEATHER_SAMPLES_TABLE = + "clouddataflow-readonly:samples.weather_stations"; + + /** + * Examines each row (weather reading) in the input table. Output the month of the reading, + * and the mean_temp. + */ + static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> { + @Override + public void processElement(ProcessContext c) { + TableRow row = c.element(); + Integer month = Integer.parseInt((String) row.get("month")); + Double meanTemp = Double.parseDouble(row.get("mean_temp").toString()); + c.output(KV.of(month, meanTemp)); + } + } + + /** + * Format the results to a TableRow, to save to BigQuery. + * + */ + static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> { + @Override + public void processElement(ProcessContext c) { + TableRow row = new TableRow() + .set("month", c.element().getKey()) + .set("max_mean_temp", c.element().getValue()); + c.output(row); + } + } + + /** + * Reads rows from a weather data table, and finds the max mean_temp for each + * month via the 'Max' statistical combination function. + */ + static class MaxMeanTemp + extends PTransform<PCollection<TableRow>, PCollection<TableRow>> { + @Override + public PCollection<TableRow> apply(PCollection<TableRow> rows) { + + // row... => <month, mean_temp> ... + PCollection<KV<Integer, Double>> temps = rows.apply( + ParDo.of(new ExtractTempFn())); + + // month, mean_temp... => <month, max mean temp>... + PCollection<KV<Integer, Double>> tempMaxes = + temps.apply(Max.<Integer>doublesPerKey()); + + // <month, max>... => row... + PCollection<TableRow> results = tempMaxes.apply( + ParDo.of(new FormatMaxesFn())); + + return results; + } + } + + /** + * Options supported by {@link MaxPerKeyExamples}. + * + * <p>Inherits standard configuration options. + */ + private static interface Options extends PipelineOptions { + @Description("Table to read from, specified as " + + "<project_id>:<dataset_id>.<table_id>") + @Default.String(WEATHER_SAMPLES_TABLE) + String getInput(); + void setInput(String value); + + @Description("Table to write to, specified as " + + "<project_id>:<dataset_id>.<table_id>") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) + throws Exception { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + // Build the table schema for the output table. 
+ List<TableFieldSchema> fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("month").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT")); + TableSchema schema = new TableSchema().setFields(fields); + + p.apply(BigQueryIO.Read.from(options.getInput())) + .apply(new MaxMeanTemp()) + .apply(BigQueryIO.Write + .to(options.getOutput()) + .withSchema(schema) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md new file mode 100644 index 0000000..99f3080 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md @@ -0,0 +1,55 @@ + +# "Cookbook" Examples + +This directory holds simple "cookbook" examples, which show how to define +commonly-used data analysis patterns that you would likely incorporate into a +larger Dataflow pipeline. They include: + + <ul> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a> + — An example that reads the public samples of weather data from Google + BigQuery, counts the number of tornadoes that occur in each month, and + writes the results to BigQuery. Demonstrates reading/writing BigQuery, + counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a> + — An example that reads the public "Shakespeare" data, and for + each word in the dataset that exceeds a given length, generates a string + containing the list of play names in which that word appears. + Demonstrates the <code>Combine.perKey</code> + transform, which lets you combine the values in a key-grouped + <code>PCollection</code>. + </li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java">DatastoreWordCount</a> + — An example that shows you how to read from Google Cloud Datastore.</li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a> + — An example that uses Shakespeare's plays as plain text files, and + removes duplicate lines across all the files. Demonstrates the + <code>RemoveDuplicates</code>, <code>TextIO.Read</code>, + and <code>TextIO.Write</code> transforms, and how to wire transforms together. + </li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a> + — An example that shows different approaches to filtering, including + selection and projection. 
It also shows how to dynamically set parameters + by defining and using new pipeline options, and how to use a value derived + by the pipeline. Demonstrates the <code>Mean</code> transform, + <code>Options</code> configuration, and using pipeline-derived data as a side + input. + </li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java">JoinExamples</a> + — An example that shows how to join two collections. It uses a + sample of the <a href="http://goo.gl/OB6oin">GDELT "world event" + data</a>, joining the event <code>action</code> country code against a table + that maps country codes to country names. Demonstrates the <code>Join</code> + operation, and using multiple input sources. + </li> + <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a> + — An example that reads the public samples of weather data from BigQuery, + and finds the maximum temperature (<code>mean_temp</code>) for each month. + Demonstrates the <code>Max</code> statistical combination transform, and how to + find the max-per-key group. + </li> + </ul> + +See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples +README](../../../../../../../../../README.md) for +information about how to run these examples. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java new file mode 100644 index 0000000..e32596d --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; +import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; +import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions; +import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions; +import com.google.cloud.dataflow.examples.common.PubsubFileInjector; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.io.PubsubIO; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterEach; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This example illustrates the basic concepts behind triggering. It shows how to use different + * trigger definitions to produce partial (speculative) results before all the data is processed and + * to control when updated results are produced for late data. The example performs a streaming + * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the + * data into {@link Window windows} to be processed, and demonstrates using various kinds of {@link + * Trigger triggers} to control when the results for each window are emitted. + * + * <p> This example uses a portion of real traffic data from San Diego freeways. It contains + * readings from sensor stations set up along each freeway. Each sensor reading includes a + * calculation of the 'total flow' across all lanes in that freeway direction. + * + * <p> Concepts: + * <pre> + * 1. The default triggering behavior + * 2. Late data with the default trigger + * 3. How to get speculative estimates + * 4. 
Combining late data and speculative estimates + * </pre> + * + * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers + * and understand the concept of 'late data', + * See: <a href="https://cloud.google.com/dataflow/model/triggers"> + * https://cloud.google.com/dataflow/model/triggers </a> and + * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced"> + * https://cloud.google.com/dataflow/model/windowing#Advanced </a> + * + * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will + * also run an auxiliary pipeline to inject data from the default {@code --input} file to the + * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the + * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary + * pipeline also randomly simulates late data, by setting the timestamps of some of the data + * elements to be in the past. You may override the default {@code --input} with the file of your + * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow + * you to use a separate tool to publish to the given topic. + * + * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table + * from the example common package (there are no defaults for a general Dataflow pipeline). + * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and + * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist, + * the example will try to create them. + * + * <p> The pipeline outputs its results to a BigQuery table. + * Here are some queries you can use to see interesting results: + * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table. + * Replace {@code <enter_window_interval>} in the query below with the window interval. + * + * <p> To see the results of the default trigger, + * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after + * the window duration, until the first pane of non-late data has been emitted, to see more + * interesting results. + * {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC} + * + * <p> To see the late data i.e. 
data that would have been dropped by the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and + * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time} + * + * <p>To see the difference between accumulation mode and discarding mode, + * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND + * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY + * window DESC, processing_time} + * + * <p> To see speculative results every minute, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5" + * ORDER BY window DESC, processing_time} + * + * <p> To see speculative results every five minutes after the end of the window, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" + * and freeway = "5" ORDER BY window DESC, processing_time} + * + * <p> To see the first and the last pane for a freeway in a window for all the trigger types, + * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window} + * + * <p> To reduce the number of results for each query we can add additional where clauses. + * For example, to see the results of the default trigger, + * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND + * window = "<enter_window_interval>"} + * + * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) + * and then exit. + */ + +public class TriggerExample { + // Numeric value of fixed window duration, in minutes + public static final int WINDOW_DURATION = 30; + // Constants used in triggers. + // Shortening ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results. + // ONE_MINUTE is used only with processing time before the end of the window + public static final Duration ONE_MINUTE = Duration.standardMinutes(1); + // FIVE_MINUTES is used only with processing time after the end of the window + public static final Duration FIVE_MINUTES = Duration.standardMinutes(5); + // ONE_DAY is used to specify the amount of lateness allowed for the data elements. + public static final Duration ONE_DAY = Duration.standardDays(1); + + /** + * This transform demonstrates using triggers to control when data is produced for each window. + * Consider an example to understand the results generated by each type of trigger. + * The example uses "freeway" as the key. Event time is the timestamp associated with the data + * element and processing time is the time when the data element gets processed in the pipeline. + * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window. + * Key (freeway) | Value (total_flow) | event time | processing time + * 5 | 50 | 10:00:03 | 10:00:47 + * 5 | 30 | 10:01:00 | 10:01:03 + * 5 | 30 | 10:02:00 | 11:07:00 + * 5 | 20 | 10:04:10 | 10:05:15 + * 5 | 60 | 10:05:00 | 11:03:00 + * 5 | 20 | 10:05:01 | 11:07:30 + * 5 | 60 | 10:15:00 | 10:27:15 + * 5 | 40 | 10:26:40 | 10:26:43 + * 5 | 60 | 10:27:20 | 10:27:25 + * 5 | 60 | 10:29:00 | 11:11:00 + * + * <p> Dataflow tracks a watermark which records up to what point in event time the data is + * complete. For the purposes of the example, we'll assume the watermark is approximately 15m + * behind the current processing time.
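+   * <p>As a minimal illustration (not part of the example pipeline), a few of the readings
+   * above could be materialized as a timestamped test input. This sketch assumes the SDK's
+   * {@code Create.timestamped} and {@code TimestampedValue} helpers and a hypothetical
+   * {@code Pipeline pipeline}; the dates are arbitrary:
+   * <pre>{@code
+   * PCollection<KV<String, Integer>> testFlowInfo = pipeline.apply(Create.timestamped(
+   *     TimestampedValue.of(KV.of("5", 50), Instant.parse("2010-01-01T10:00:03Z")),
+   *     TimestampedValue.of(KV.of("5", 30), Instant.parse("2010-01-01T10:01:00Z"))));
+   * }</pre>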
In practice, the actual value would vary over time based + * on the systems knowledge of the current PubSub delay and contents of the backlog (data + * that has not yet been processed). + * + * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would + * close at 10:44:59, when the watermark passes 10:30:00. + */ + static class CalculateTotalFlow + extends PTransform <PCollection<KV<String, Integer>>, PCollectionList<TableRow>> { + private int windowDuration; + + CalculateTotalFlow(int windowDuration) { + this.windowDuration = windowDuration; + } + + @Override + public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) { + + // Concept #1: The default triggering behavior + // By default Dataflow uses a trigger which fires when the watermark has passed the end of the + // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + + // The system also defaults to dropping late data -- data which arrives after the watermark + // has passed the event timestamp of the arriving element. This means that the default trigger + // will only fire once. + + // Each pane produced by the default trigger with no allowed lateness will be the first and + // last pane in the window, and will be ON_TIME. + + // The results for the example above with the default trigger and zero allowed lateness + // would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 260 | 6 | true | true | ON_TIME + + // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a + // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered + // late, and dropped. + + PCollection<TableRow> defaultTriggerResults = flowInfo + .apply("Default", Window + // The default window duration values work well if you're running the default input + // file. You may want to adjust the window duration otherwise. + .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration))) + // The default trigger first emits output when the system's watermark passes the end + // of the window. + .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + // Late data is dropped + .withAllowedLateness(Duration.ZERO) + // Discard elements after emitting each pane. + // With no allowed lateness and the specified trigger there will only be a single + // pane, so this doesn't have a noticeable effect. See concept 2 for more details. + .discardingFiredPanes()) + .apply(new TotalFlow("default")); + + // Concept #2: Late data with the default trigger + // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This + // leads to each window staying open for ONE_DAY after the watermark has passed the end of the + // window. Any late data will result in an additional pane being fired for that same window. + + // The first pane produced will be ON_TIME and the remaining panes will be LATE. + // To definitely get the last pane when the window closes, use + // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS). 
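+
+      // A minimal sketch of the FIRE_ALWAYS variant mentioned above (not part of the original
+      // pipeline, and not added to the PCollectionList below). Window.ClosingBehavior.FIRE_ALWAYS
+      // is assumed to be available in this SDK; it requests a final (possibly empty) pane when
+      // the window expires. The example's own withAllowedLateness branch follows below.
+      PCollection<TableRow> fireAlwaysResults = flowInfo
+          .apply("WithLateDataFireAlways", Window
+              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+              .discardingFiredPanes()
+              // Guarantee a closing pane even if no late data arrives.
+              .withAllowedLateness(ONE_DAY, Window.ClosingBehavior.FIRE_ALWAYS))
+          .apply(new TotalFlow("withAllowedLatenessFireAlways"));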
+ + // The results for the example above with the default trigger and ONE_DAY allowed lateness + // would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 260 | 6 | true | false | ON_TIME + // 5 | 60 | 1 | false | false | LATE + // 5 | 30 | 1 | false | false | LATE + // 5 | 20 | 1 | false | false | LATE + // 5 | 60 | 1 | false | false | LATE + PCollection<TableRow> withAllowedLatenessResults = flowInfo + .apply("WithLateData", Window + .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration))) + // Late data is emitted as it arrives + .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + // Once the output is produced, the pane is dropped and we start preparing the next + // pane for the window + .discardingFiredPanes() + // Late data is handled up to one day + .withAllowedLateness(ONE_DAY)) + .apply(new TotalFlow("withAllowedLateness")); + + // Concept #3: How to get speculative estimates + // We can specify a trigger that fires independent of the watermark, for instance after + // ONE_MINUTE of processing time. This allows us to produce speculative estimates before + // all the data is available. Since we don't have any triggers that depend on the watermark + // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE. + + // We also use accumulatingFiredPanes to build up the results across each pane firing. + + // The results for the example above for this trigger would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // 5 | 320 | 7 | false | false | LATE + // 5 | 370 | 9 | false | false | LATE + // 5 | 430 | 10 | false | false | LATE + PCollection<TableRow> speculativeResults = flowInfo + .apply("Speculative" , Window + .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration))) + // Trigger fires every minute. + .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() + // Speculative every ONE_MINUTE + .plusDelayOf(ONE_MINUTE))) + // After emitting each pane, it will continue accumulating the elements so that each + // approximation includes all of the previous data in addition to the newly arrived + // data. + .accumulatingFiredPanes() + .withAllowedLateness(ONE_DAY)) + .apply(new TotalFlow("speculative")); + + // Concept #4: Combining late data and speculative estimates + // We can put the previous concepts together to get EARLY estimates, an ON_TIME result, + // and LATE updates based on late data. + + // Each time a triggering condition is satisfied it advances to the next trigger. + // If there are new elements this trigger emits a window under following condition: + // > Early approximations every minute till the end of the window. + // > An on-time firing when the watermark has passed the end of the window + // > Every five minutes of late data. + + // Every pane produced will either be EARLY, ON_TIME or LATE. 
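+
+      // A minimal sketch (not part of the original pipeline, and not added to the
+      // PCollectionList below): the same early / on-time / late shape can also be expressed with
+      // AfterWatermark's early and late firings, assuming withEarlyFirings/withLateFirings are
+      // available in this SDK version. The example's AfterEach-based trigger follows below.
+      PCollection<TableRow> earlyOnTimeAndLateResults = flowInfo
+          .apply("EarlyOnTimeAndLate", Window
+              .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              .triggering(AfterWatermark.pastEndOfWindow()
+                  // Speculative pane every ONE_MINUTE before the watermark passes the window end.
+                  .withEarlyFirings(
+                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
+                  // Updated pane every FIVE_MINUTES for late data.
+                  .withLateFirings(
+                      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)))
+              .accumulatingFiredPanes()
+              .withAllowedLateness(ONE_DAY))
+          .apply(new TotalFlow("earlyOnTimeAndLate"));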
+ + // The results for the example above for this trigger would be: + // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing + // 5 | 80 | 2 | true | false | EARLY + // 5 | 100 | 3 | false | false | EARLY + // 5 | 260 | 6 | false | false | EARLY + // [First pane fired after the end of the window] + // 5 | 320 | 7 | false | false | ON_TIME + // 5 | 430 | 10 | false | false | LATE + + // For more possibilities of how to build advanced triggers, see {@link Trigger}. + PCollection<TableRow> sequentialResults = flowInfo + .apply("Sequential", Window + .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration))) + .triggering(AfterEach.inOrder( + Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() + // Speculative every ONE_MINUTE + .plusDelayOf(ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() + // Late data every FIVE_MINUTES + .plusDelayOf(FIVE_MINUTES)))) + .accumulatingFiredPanes() + // For up to ONE_DAY + .withAllowedLateness(ONE_DAY)) + .apply(new TotalFlow("sequential")); + + // Adds the results generated by each trigger type to a PCollectionList. + PCollectionList<TableRow> resultsList = PCollectionList.of(defaultTriggerResults) + .and(withAllowedLatenessResults) + .and(speculativeResults) + .and(sequentialResults); + + return resultsList; + } + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // The remaining parts of the pipeline are needed to produce the output for each + // concept above. Not directly relevant to understanding the trigger examples. + + /** + * Calculate total flow and number of records for each freeway and format the results to TableRow + * objects, to save to BigQuery. + */ + static class TotalFlow extends + PTransform <PCollection<KV<String, Integer>>, PCollection<TableRow>> { + private String triggerType; + + public TotalFlow(String triggerType) { + this.triggerType = triggerType; + } + + @Override + public PCollection<TableRow> apply(PCollection<KV<String, Integer>> flowInfo) { + PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo + .apply(GroupByKey.<String, Integer>create()); + + PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of( + new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() { + + @Override + public void processElement(ProcessContext c) throws Exception { + Iterable<Integer> flows = c.element().getValue(); + Integer sum = 0; + Long numberOfRecords = 0L; + for (Integer value : flows) { + sum += value; + numberOfRecords++; + } + c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords)); + } + })); + PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType))); + return output; + } + } + + /** + * Format the results of the Total flow calculation to a TableRow, to save to BigQuery. + * Adds the triggerType, pane information, processing time and the window timestamp. 
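+   * <p>For instance, the default-trigger pane from the tables above (total_flow 260 from 6
+   * records, first and last pane, ON_TIME) would produce a row shaped roughly like the
+   * following; the window, event_time and processing_time strings are illustrative only:
+   * <pre>{@code
+   * {trigger_type: "default", freeway: "5", total_flow: 260, number_of_records: 6,
+   *  window: "[10:00:00..10:30:00)", isFirst: true, isLast: true, timing: "ON_TIME",
+   *  event_time: "...", processing_time: "..."}
+   * }</pre>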
+ * */ + static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> + implements RequiresWindowAccess { + private String triggerType; + + public FormatTotalFlow(String triggerType) { + this.triggerType = triggerType; + } + @Override + public void processElement(ProcessContext c) throws Exception { + String[] values = c.element().getValue().split(","); + TableRow row = new TableRow() + .set("trigger_type", triggerType) + .set("freeway", c.element().getKey()) + .set("total_flow", Integer.parseInt(values[0])) + .set("number_of_records", Long.parseLong(values[1])) + .set("window", c.window().toString()) + .set("isFirst", c.pane().isFirst()) + .set("isLast", c.pane().isLast()) + .set("timing", c.pane().getTiming().toString()) + .set("event_time", c.timestamp().toString()) + .set("processing_time", Instant.now().toString()); + c.output(row); + } + } + + /** + * Extract the freeway and total flow in a reading. + * Freeway is used as key since we are calculating the total flow for each freeway. + */ + static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> { + @Override + public void processElement(ProcessContext c) throws Exception { + String[] laneInfo = c.element().split(","); + if (laneInfo[0].equals("timestamp")) { + // Header row + return; + } + if (laneInfo.length < 48) { + //Skip the invalid input. + return; + } + String freeway = laneInfo[2]; + Integer totalFlow = tryIntegerParse(laneInfo[7]); + // Ignore the records with total flow 0 to easily understand the working of triggers. + // Skip the records with total flow -1 since they are invalid input. + if (totalFlow == null || totalFlow <= 0) { + return; + } + c.output(KV.of(freeway, totalFlow)); + } + } + + /** + * Inherits standard configuration options. + */ + public interface TrafficFlowOptions + extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions, DataflowExampleOptions { + + @Description("Input file to inject to Pub/Sub topic") + @Default.String("gs://dataflow-samples/traffic_sensor/" + + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv") + String getInput(); + void setInput(String value); + + @Description("Numeric value of window duration for fixed windows, in minutes") + @Default.Integer(WINDOW_DURATION) + Integer getWindowDuration(); + void setWindowDuration(Integer value); + } + + private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms"; + + public static void main(String[] args) throws Exception { + TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(TrafficFlowOptions.class); + options.setStreaming(true); + + // In order to cancel the pipelines automatically, + // {@code DataflowPipelineRunner} is forced to be used. 
+ options.setRunner(DataflowPipelineRunner.class); + options.setBigQuerySchema(getSchema()); + + DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options); + dataflowUtils.setup(); + + Pipeline pipeline = Pipeline.create(options); + + TableReference tableRef = getTableReference(options.getProject(), + options.getBigQueryDataset(), options.getBigQueryTable()); + + PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput") + .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY) + .topic(options.getPubsubTopic())) + .apply(ParDo.of(new ExtractFlowInfo())) + .apply(new CalculateTotalFlow(options.getWindowDuration())); + + for (int i = 0; i < resultList.size(); i++){ + resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema())); + } + + PipelineResult result = pipeline.run(); + if (!options.getInput().isEmpty()){ + //Inject the data into the pubsub topic + dataflowUtils.runInjectorPipeline(runInjector(options)); + } + // dataflowUtils will try to cancel the pipeline and the injector before the program exits. + dataflowUtils.waitToFinish(result); + } + + private static Pipeline runInjector(TrafficFlowOptions options){ + DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class); + copiedOptions.setStreaming(false); + copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers()); + copiedOptions.setJobName(options.getJobName() + "-injector"); + Pipeline injectorPipeline = Pipeline.create(copiedOptions); + injectorPipeline + .apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) + .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays())) + .apply(IntraBundleParallelization.of(PubsubFileInjector + .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY) + .publish(options.getPubsubTopic())) + .withMaxParallelism(20)); + + return injectorPipeline; + } + + /** + * Add current time to each record. + * Also insert a delay at random to demo the triggers. + */ + public static class InsertDelays extends DoFn<String, String> { + private static final double THRESHOLD = 0.001; + // MIN_DELAY and MAX_DELAY in minutes. + private static final int MIN_DELAY = 1; + private static final int MAX_DELAY = 100; + + @Override + public void processElement(ProcessContext c) throws Exception { + Instant timestamp = Instant.now(); + if (Math.random() < THRESHOLD){ + int range = MAX_DELAY - MIN_DELAY; + int delayInMinutes = (int) (Math.random() * range) + MIN_DELAY; + long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes); + timestamp = new Instant(timestamp.getMillis() - delayInMillis); + } + c.outputWithTimestamp(c.element(), timestamp); + } + } + + + /**Sets the table reference. **/ + private static TableReference getTableReference(String project, String dataset, String table){ + TableReference tableRef = new TableReference(); + tableRef.setProjectId(project); + tableRef.setDatasetId(dataset); + tableRef.setTableId(table); + return tableRef; + } + + /** Defines the BigQuery schema used for the output. 
*/ + private static TableSchema getSchema() { + List<TableFieldSchema> fields = new ArrayList<>(); + fields.add(new TableFieldSchema().setName("trigger_type").setType("STRING")); + fields.add(new TableFieldSchema().setName("freeway").setType("STRING")); + fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER")); + fields.add(new TableFieldSchema().setName("window").setType("STRING")); + fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN")); + fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN")); + fields.add(new TableFieldSchema().setName("timing").setType("STRING")); + fields.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP")); + fields.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP")); + TableSchema schema = new TableSchema().setFields(fields); + return schema; + } + + private static Integer tryIntegerParse(String number) { + try { + return Integer.parseInt(number); + } catch (NumberFormatException e) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java deleted file mode 100644 index 52c61ef..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples; - -import com.google.common.io.Files; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; -import java.nio.charset.StandardCharsets; - -/** - * Tests for {@link DebuggingWordCount}. - */ -@RunWith(JUnit4.class) -public class DebuggingWordCountTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testDebuggingWordCount() throws Exception { - File file = tmpFolder.newFile(); - Files.write("stomach secret Flourish message Flourish here Flourish", file, - StandardCharsets.UTF_8); - DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()}); - } -} -