http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..93304eb
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * list of play names in which that word appears, and saves this information
+ * to a BigQuery table.
+ *
+ * <p>Concepts: the Combine.perKey transform, which lets you combine the values in a
+ * key-grouped Collection, and how to use an Aggregator to track information in the
+ * Monitoring UI.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://<STAGING DIRECTORY>
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code publicdata:samples.shakespeare} and can
+ * be overridden with {@code --input}.
+ */
+public class CombinePerKeyExamples {
+  // Use the shakespeare public BigQuery sample
+  private static final String SHAKESPEARE_TABLE =
+      "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 9;
+
+  /**
+   * Examines each row in the input table. If the word's length is greater than or equal to
+   * MIN_WORD_LENGTH, outputs word, play_name.
+   */
+  static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
+    private final Aggregator<Long, Long> smallerWords =
+        createAggregator("smallerWords", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c){
+      TableRow row = c.element();
+      String playName = (String) row.get("corpus");
+      String word = (String) row.get("word");
+      if (word.length() >= MIN_WORD_LENGTH) {
+        c.output(KV.of(word, playName));
+      } else {
+        // Track how many smaller words we're not including. This information will be
+        // visible in the Monitoring UI.
+        smallerWords.addValue(1L);
+      }
+    }
+  }
+
+
+  /**
+   * Prepares the data for writing to BigQuery by building a TableRow object
+   * containing a word with a string listing the plays in which it appeared.
+   */
+  static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = new TableRow()
+          .set("word", c.element().getKey())
+          .set("all_plays", c.element().getValue());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Reads the public 'Shakespeare' data, and for each word in the dataset
+   * over a given length, generates a string containing the list of play names
+   * in which that word appears. It does this via the Combine.perKey
+   * transform, with the ConcatWords combine function.
+   *
+   * <p>Combine.perKey is similar to a GroupByKey followed by a ParDo, but
+   * has more restricted semantics that allow it to be executed more
+   * efficiently. These records are then formatted as BQ table rows.
+   */
+  static class PlaysForWord
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // row... => <word, play_name> ...
+      PCollection<KV<String, String>> words = rows.apply(
+          ParDo.of(new ExtractLargeWordsFn()));
+
+      // word, play_name => word, all_plays ...
+      PCollection<KV<String, String>> wordAllPlays =
+          words.apply(Combine.<String, String>perKey(
+              new ConcatWords()));
+
+      // <word, all_plays>... => row...
+      PCollection<TableRow> results = wordAllPlays.apply(
+          ParDo.of(new FormatShakespeareOutputFn()));
+
+      return results;
+    }
+  }
+
+  /**
+   * A 'combine function' used with the Combine.perKey transform. Builds a
+   * comma-separated string of all input items.  So, it will build a string
+   * containing all the different Shakespeare plays in which the given input
+   * word has appeared.
+   */
+  public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
+    @Override
+    public String apply(Iterable<String> input) {
+      StringBuilder all = new StringBuilder();
+      for (String item : input) {
+        if (!item.isEmpty()) {
+          if (all.length() == 0) {
+            all.append(item);
+          } else {
+            all.append(",");
+            all.append(item);
+          }
+        }
+      }
+      return all.toString();
+    }
+  }
+
+  /**
+   * Options supported by {@link CombinePerKeyExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(SHAKESPEARE_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>. "
+        + "The dataset_id must already exist")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Build the table schema for the output table.
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("word").setType("STRING"));
+    fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
+    TableSchema schema = new TableSchema().setFields(fields);
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(new PlaysForWord())
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}

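The PlaysForWord javadoc above notes that Combine.perKey is similar to a GroupByKey followed by a ParDo, only with more restricted semantics. A minimal, illustrative sketch of that equivalence, reusing the ConcatWords function from this file; it assumes the SDK's GroupByKey transform and a PCollection<KV<String, String>> named words, as inside PlaysForWord:

    // Combine.perKey, as used in PlaysForWord: the runner may pre-combine values
    // per key before the shuffle, which is what makes it more efficient.
    PCollection<KV<String, String>> viaCombine =
        words.apply(Combine.<String, String>perKey(new ConcatWords()));

    // Roughly equivalent, but without pre-combining: group first, then concatenate.
    PCollection<KV<String, String>> viaGroupByKey = words
        .apply(GroupByKey.<String, String>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(KV.of(c.element().getKey(),
                new ConcatWords().apply(c.element().getValue())));
          }
        }));
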
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
new file mode 100644
index 0000000..9dddb5d
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
+import static com.google.api.services.datastore.client.DatastoreHelper.getString;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
+import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
+
+import com.google.api.services.datastore.DatastoreV1.Entity;
+import com.google.api.services.datastore.DatastoreV1.Key;
+import com.google.api.services.datastore.DatastoreV1.Property;
+import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
+import com.google.api.services.datastore.DatastoreV1.Query;
+import com.google.api.services.datastore.DatastoreV1.Value;
+import com.google.cloud.dataflow.examples.WordCount;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.DatastoreIO;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+/**
+ * A WordCount example using DatastoreIO.
+ *
+ * <p>This example shows how to use DatastoreIO to read from Datastore and
+ * write the results to Cloud Storage.  Note that this example will write
+ * data to Datastore, which may incur charges for Datastore operations.
+ *
+ * <p>To run this example, users need to use gcloud to get credentials for Datastore:
+ * <pre>{@code
+ * $ gcloud auth login
+ * }</pre>
+ *
+ * <p>To run this pipeline locally, the following options must be provided:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --dataset=YOUR_DATASET_ID
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
+ * }</pre>
+ *
+ * <p>To run this example using the Dataflow service, you must additionally
+ * provide either {@literal --stagingLocation} or {@literal --tempLocation}, and
+ * select one of the Dataflow pipeline runners, e.g.
+ * {@literal --runner=BlockingDataflowPipelineRunner}.
+ *
+ * <p><b>Note:</b> this example creates entities with <i>Ancestor keys</i> to ensure that all
+ * entities created are in the same entity group. Similarly, the query used to read from the Cloud
+ * Datastore uses an <i>Ancestor filter</i>. Ancestors are used to ensure strongly consistent
+ * results in Cloud Datastore. For more information, see the Cloud Datastore documentation on
+ * <a href="https://cloud.google.com/datastore/docs/concepts/structuring_for_strong_consistency">
+ * Structuring Data for Strong Consistency</a>.
+ */
+public class DatastoreWordCount {
+
+  /**
+   * A DoFn that gets the content of an entity (one line in a
+   * Shakespeare play) and converts it to a string.
+   */
+  static class GetContentFn extends DoFn<Entity, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      Map<String, Value> props = getPropertyMap(c.element());
+      Value value = props.get("content");
+      if (value != null) {
+        c.output(getString(value));
+      }
+    }
+  }
+
+  /**
+   * A helper function to create the ancestor key for all created and queried entities.
+   *
+   * <p>We use ancestor keys and ancestor queries for strong consistency. See
+   * {@link DatastoreWordCount} javadoc for more information.
+   */
+  static Key makeAncestorKey(@Nullable String namespace, String kind) {
+    Key.Builder keyBuilder = makeKey(kind, "root");
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+    }
+    return keyBuilder.build();
+  }
+
+  /**
+   * A DoFn that creates entity for every line in Shakespeare.
+   */
+  static class CreateEntityFn extends DoFn<String, Entity> {
+    private final String namespace;
+    private final String kind;
+    private final Key ancestorKey;
+
+    CreateEntityFn(String namespace, String kind) {
+      this.namespace = namespace;
+      this.kind = kind;
+
+      // Build the ancestor key for all created entities once, including the namespace.
+      ancestorKey = makeAncestorKey(namespace, kind);
+    }
+
+    public Entity makeEntity(String content) {
+      Entity.Builder entityBuilder = Entity.newBuilder();
+
+      // All created entities have the same ancestor Key.
+      Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+      // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+      // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
+      // we can simplify this code.
+      if (namespace != null) {
+        keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+      }
+
+      entityBuilder.setKey(keyBuilder.build());
+      entityBuilder.addProperty(Property.newBuilder().setName("content")
+          .setValue(Value.newBuilder().setStringValue(content)));
+      return entityBuilder.build();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(makeEntity(c.element()));
+    }
+  }
+
+  /**
+   * Options supported by {@link DatastoreWordCount}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  public static interface Options extends PipelineOptions {
+    @Description("Path of the file to read from and store to Datastore")
+    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+
+    @Description("Dataset ID to read from datastore")
+    @Validation.Required
+    String getDataset();
+    void setDataset(String value);
+
+    @Description("Dataset entity kind")
+    @Default.String("shakespeare-demo")
+    String getKind();
+    void setKind(String value);
+
+    @Description("Dataset namespace")
+    String getNamespace();
+    void setNamespace(@Nullable String value);
+
+    @Description("Read an existing dataset, do not write first")
+    boolean isReadOnly();
+    void setReadOnly(boolean value);
+
+    @Description("Number of output shards")
+    @Default.Integer(0) // If the system should choose automatically.
+    int getNumShards();
+    void setNumShards(int value);
+  }
+
+  /**
+   * An example that creates a pipeline to populate DatastoreIO from a
+   * text input.  Forces use of DirectPipelineRunner for local execution mode.
+   */
+  public static void writeDataToDatastore(Options options) {
+      Pipeline p = Pipeline.create(options);
+      p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+       .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
+       .apply(DatastoreIO.writeTo(options.getDataset()));
+
+      p.run();
+  }
+
+  /**
+   * Build a Cloud Datastore ancestor query for the specified {@link Options#getNamespace} and
+   * {@link Options#getKind}.
+   *
+   * <p>We use ancestor keys and ancestor queries for strong consistency. See
+   * {@link DatastoreWordCount} javadoc for more information.
+   *
+   * @see <a href="https://cloud.google.com/datastore/docs/concepts/queries#Datastore_Ancestor_filters">Ancestor filters</a>
+   */
+  static Query makeAncestorKindQuery(Options options) {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(options.getKind());
+    q.setFilter(makeFilter(
+        "__key__",
+        PropertyFilter.Operator.HAS_ANCESTOR,
+        makeValue(makeAncestorKey(options.getNamespace(), options.getKind()))));
+    return q.build();
+  }
+
+  /**
+   * An example that creates a pipeline to do DatastoreIO.Read from Datastore.
+   */
+  public static void readDataFromDatastore(Options options) {
+    Query query = makeAncestorKindQuery(options);
+
+    // For Datastore sources, the read namespace can be set on the entire query.
+    DatastoreIO.Source source = DatastoreIO.source()
+        .withDataset(options.getDataset())
+        .withQuery(query)
+        .withNamespace(options.getNamespace());
+
+    Pipeline p = Pipeline.create(options);
+    p.apply("ReadShakespeareFromDatastore", Read.from(source))
+        .apply("StringifyEntity", ParDo.of(new GetContentFn()))
+        .apply("CountWords", new WordCount.CountWords())
+        .apply("PrintWordCount", MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply("WriteLines", TextIO.Write.to(options.getOutput())
+            .withNumShards(options.getNumShards()));
+    p.run();
+  }
+
+  /**
+   * An example to demo how to use {@link DatastoreIO}.  The runner here is
+   * customizable, which means users could pass either {@code DirectPipelineRunner}
+   * or {@code DataflowPipelineRunner} in the pipeline options.
+   */
+  public static void main(String args[]) {
+    // The options are used in two places, for Dataflow service, and
+    // building DatastoreIO.Read object
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    if (!options.isReadOnly()) {
+      // First example: write data to Datastore for reading later.
+      //
+      // NOTE: this write does not delete any existing Entities in the Datastore, so if run
+      // multiple times with the same output dataset, there may be duplicate entries. The
+      // Datastore Query tool in the Google Developers Console can be used to inspect or erase all
+      // entries with a particular namespace and/or kind.
+      DatastoreWordCount.writeDataToDatastore(options);
+    }
+
+    // Second example: do parallel read from Datastore.
+    DatastoreWordCount.readDataFromDatastore(options);
+  }
+}

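A minimal sketch of driving DatastoreWordCount through its Options interface. Every flag below is declared in Options above; the project, dataset, and output values are hypothetical placeholders:

    // Hypothetical argument values; --dataset and --output are required by Options.
    String[] args = new String[] {
        "--project=my-project-id",            // placeholder Cloud project
        "--dataset=my-project-id",            // placeholder Cloud Datastore dataset id
        "--kind=shakespeare-demo",            // same as the declared default
        "--output=/tmp/datastore_wordcount"   // local output prefix for TextIO.Write
    };
    DatastoreWordCount.Options options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(DatastoreWordCount.Options.class);
    DatastoreWordCount.writeDataToDatastore(options);  // first pipeline: populate Datastore
    DatastoreWordCount.readDataFromDatastore(options); // second pipeline: read, count, write text
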
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
new file mode 100644
index 0000000..40d1f76
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+
+/**
+ * This example uses Shakespeare's plays as plaintext input files, and removes any
+ * duplicate lines across all the files. (The output does not preserve any input order).
+ *
+ * <p>Concepts: the RemoveDuplicates transform, and how to wire transforms together.
+ * Demonstrates {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}/
+ * {@link RemoveDuplicates}/{@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ *   --project=YOUR_PROJECT_ID
+ * and a local output file or output prefix on GCS:
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * and an output prefix on GCS:
+ *   --output=gs://YOUR_OUTPUT_PREFIX
+ *
+ * <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be
+ * overridden with {@code --input}.
+ */
+public class DeDupExample {
+
+  /**
+   * Options supported by {@link DeDupExample}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Path to the directory or GCS prefix containing files to read from")
+    @Default.String("gs://dataflow-samples/shakespeare/*")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Path of the file to write to")
+    @Default.InstanceFactory(OutputFactory.class)
+    String getOutput();
+    void setOutput(String value);
+
+    /** Returns gs://${STAGING_LOCATION}/"deduped.txt". */
+    public static class OutputFactory implements DefaultValueFactory<String> {
+      @Override
+      public String create(PipelineOptions options) {
+        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+        if (dataflowOptions.getStagingLocation() != null) {
+          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
+              .resolve("deduped.txt").toString();
+        } else {
+          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
+        }
+      }
+    }
+  }
+
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+     .apply(RemoveDuplicates.<String>create())
+     .apply(TextIO.Write.named("DedupedShakespeare")
+         .to(options.getOutput()));
+
+    p.run();
+  }
+}

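A minimal sketch of the RemoveDuplicates transform in isolation, assuming the SDK's TestPipeline, Create, and DataflowAssert utilities (they are not used in DeDupExample itself):

    Pipeline p = TestPipeline.create();

    PCollection<String> lines =
        p.apply(Create.of("to be", "or not to be", "to be"));

    // The duplicated "to be" line is emitted only once; input order is not preserved.
    PCollection<String> deduped = lines.apply(RemoveDuplicates.<String>create());

    DataflowAssert.that(deduped).containsInAnyOrder("to be", "or not to be");
    p.run();
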
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
new file mode 100644
index 0000000..269ee6a
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Mean;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * This is an example that demonstrates several approaches to filtering, and use of the Mean
+ * transform. It shows how to dynamically set parameters by defining and using new pipeline options,
+ * and how to use a value derived by the pipeline.
+ *
+ * <p>Concepts: The Mean transform; Options configuration; using pipeline-derived data as a side
+ * input; approaches to filtering, selection, and projection.
+ *
+ * <p>The example reads public samples of weather data from BigQuery. It performs a
+ * projection on the data, finds the global mean of the temperature readings, filters on readings
+ * for a single given month, and then outputs only data (for that month) that has a mean temp
+ * smaller than the derived global mean.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ *   [--monthFilter=<month_number>]
+ * }
+ * </pre>
+ * where optional parameter {@code --monthFilter} is set to a number 1-12.
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ *   [--monthFilter=<month_number>]
+ * }
+ * </pre>
+ * where optional parameter {@code --monthFilter} is set to a number 1-12.
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class FilterExamples {
+  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+  static final Logger LOG = Logger.getLogger(FilterExamples.class.getName());
+  static final int MONTH_TO_FILTER = 7;
+
+  /**
+   * Examines each row in the input table. Outputs only the subset of the cells this example
+   * is interested in (the mean_temp and year, month, and day) as a BigQuery table row.
+   */
+  static class ProjectionFn extends DoFn<TableRow, TableRow> {
+    @Override
+    public void processElement(ProcessContext c){
+      TableRow row = c.element();
+      // Grab year, month, day, mean_temp from the row
+      Integer year = Integer.parseInt((String) row.get("year"));
+      Integer month = Integer.parseInt((String) row.get("month"));
+      Integer day = Integer.parseInt((String) row.get("day"));
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      // Prepares the data for writing to BigQuery by building a TableRow object
+      TableRow outRow = new TableRow()
+          .set("year", year).set("month", month)
+          .set("day", day).set("mean_temp", meanTemp);
+      c.output(outRow);
+    }
+  }
+
+  /**
+   * Implements 'filter' functionality.
+   *
+   * <p>Examines each row in the input table. Outputs only rows from the month
+   * monthFilter, which is passed in as a parameter during construction of this DoFn.
+   */
+  static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
+    Integer monthFilter;
+
+    public FilterSingleMonthDataFn(Integer monthFilter) {
+      this.monthFilter = monthFilter;
+    }
+
+    @Override
+    public void processElement(ProcessContext c){
+      TableRow row = c.element();
+      Integer month;
+      month = (Integer) row.get("month");
+      if (month.equals(this.monthFilter)) {
+        c.output(row);
+      }
+    }
+  }
+
+  /**
+   * Examines each row (weather reading) in the input table. Output the temperature
+   * reading for that row ('mean_temp').
+   */
+  static class ExtractTempFn extends DoFn<TableRow, Double> {
+    @Override
+    public void processElement(ProcessContext c){
+      TableRow row = c.element();
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      c.output(meanTemp);
+    }
+  }
+
+
+
+  /**
+   * Finds the global mean of the mean_temp for each day/record, and outputs
+   * only data that has a mean temp smaller than this global mean.
+   */
+  static class BelowGlobalMean
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    Integer monthFilter;
+
+    public BelowGlobalMean(Integer monthFilter) {
+      this.monthFilter = monthFilter;
+    }
+
+
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // Extract the mean_temp from each row.
+      PCollection<Double> meanTemps = rows.apply(
+          ParDo.of(new ExtractTempFn()));
+
+      // Find the global mean of all the mean_temp readings in the weather data,
+      // and prepare this singleton PCollectionView for use as a side input.
+      final PCollectionView<Double> globalMeanTemp =
+          meanTemps.apply(Mean.<Double>globally())
+               .apply(View.<Double>asSingleton());
+
+      // Rows filtered to remove all but a single month
+      PCollection<TableRow> monthFilteredRows = rows
+          .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));
+
+      // Then, use the global mean as a side input, to further filter the weather data.
+      // By using a side input to pass in the filtering criteria, we can use a value
+      // that is computed earlier in pipeline execution.
+      // We'll only output readings with temperatures below this mean.
+      PCollection<TableRow> filteredRows = monthFilteredRows
+          .apply(ParDo
+              .named("ParseAndFilter")
+              .withSideInputs(globalMeanTemp)
+              .of(new DoFn<TableRow, TableRow>() {
+                @Override
+                public void processElement(ProcessContext c) {
+                  Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
+                  Double gTemp = c.sideInput(globalMeanTemp);
+                  if (meanTemp < gTemp) {
+                    c.output(c.element());
+                  }
+                }
+              }));
+
+      return filteredRows;
+    }
+  }
+
+
+  /**
+   * Options supported by {@link FilterExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(WEATHER_SAMPLES_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>. "
+        + "The dataset_id must already exist")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+
+    @Description("Numeric value of month to filter on")
+    @Default.Integer(MONTH_TO_FILTER)
+    Integer getMonthFilter();
+    void setMonthFilter(Integer value);
+  }
+
+  /**
+   * Helper method to build the table schema for the output table.
+   */
+  private static TableSchema buildWeatherSchemaProjection() {
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("mean_temp").setType("FLOAT"));
+    TableSchema schema = new TableSchema().setFields(fields);
+    return schema;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    TableSchema schema = buildWeatherSchemaProjection();
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(ParDo.of(new ProjectionFn()))
+     .apply(new BelowGlobalMean(options.getMonthFilter()))
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}

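The side-input pattern inside BelowGlobalMean, distilled to its core. The sketch assumes a PCollection<Double> named temps; only the pipeline-derived mean is kept, and the names are illustrative:

    // Derive a single value from the data itself...
    final PCollectionView<Double> globalMean =
        temps.apply(Mean.<Double>globally())
             .apply(View.<Double>asSingleton());

    // ...then use it later in the same pipeline as a side input to filter that data.
    PCollection<Double> belowMean = temps.apply(ParDo
        .named("FilterBelowMean")
        .withSideInputs(globalMean)
        .of(new DoFn<Double, Double>() {
          @Override
          public void processElement(ProcessContext c) {
            if (c.element() < c.sideInput(globalMean)) {
              c.output(c.element());
            }
          }
        }));
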
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
new file mode 100644
index 0000000..765420e
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+/**
+ * This example shows how to do a join on two collections.
+ * It uses a sample of the GDELT 'world event' data (http://goo.gl/OB6oin), joining the event
+ * 'action' country code against a table that maps country codes to country names.
+ *
+ * <p>Concepts: Join operation; multiple input sources.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and a local output file or output prefix on GCS:
+ * <pre>{@code
+ *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and an output prefix on GCS:
+ * <pre>{@code
+ *   --output=gs://YOUR_OUTPUT_PREFIX
+ * }</pre>
+ */
+public class JoinExamples {
+
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   */
+  static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<String>();
+    final TupleTag<String> countryInfoTag = new TupleTag<String>();
+
+    // transform both input collections to tuple collections, where the keys are country
+    // codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CGBKR (<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+      kvpCollection.apply(ParDo.named("Process").of(
+        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            String countryCode = e.getKey();
+            String countryName = "none";
+            countryName = e.getValue().getOnly(countryInfoTag);
+            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                      + ", Event info: " + eventInfo));
+            }
+          }
+      }));
+
+    // write to GCS
+    PCollection<String> formattedResults = finalResultCollection
+        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputstring = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputstring);
+          }
+        }));
+    return formattedResults;
+  }
+
+  /**
+   * Examines each row (event) in the input table. Output a KV with the key the country
+   * code of the event, and the value a string encoding event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table. Output a KV with the key the country
+   * code, and the value the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // the following two 'applys' create multiple inputs to our pipeline, one for each
+    // of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
+
+}

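The tagged join at the heart of joinEvents, reduced to two tiny keyed collections. The sketch assumes a Pipeline p and the SDK's Create transform (not used in this file); keys and values are illustrative:

    final TupleTag<String> nameTag = new TupleTag<String>();
    final TupleTag<String> eventTag = new TupleTag<String>();

    PCollection<KV<String, String>> names =
        p.apply("Names", Create.of(KV.of("US", "United States"), KV.of("FR", "France")));
    PCollection<KV<String, String>> events =
        p.apply("Events", Create.of(KV.of("US", "event-1"), KV.of("US", "event-2")));

    // One CoGbkResult per key, holding the values from both inputs under their tags.
    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(nameTag, names)
        .and(eventTag, events)
        .apply(CoGroupByKey.<String>create());
    // For key "US": getOnly(nameTag) is "United States", getAll(eventTag) is [event-1, event-2].
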
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..b35a57f
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An example that reads the public samples of weather data from BigQuery, and finds
+ * the maximum temperature ('mean_temp') for each month.
+ *
+ * <p>Concepts: The 'Max' statistical combination function, and how to find the max per
+ * key group.
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify general pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ * }
+ * </pre>
+ * and the BigQuery table for the output, with the form
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
+ * <pre>{@code
+ *   --project=YOUR_PROJECT_ID
+ *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
+ *   --runner=BlockingDataflowPipelineRunner
+ * }
+ * </pre>
+ * and the BigQuery table for the output:
+ * <pre>{@code
+ *   --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>The BigQuery input table defaults to {@code clouddataflow-readonly:samples.weather_stations}
+ * and can be overridden with {@code --input}.
+ */
+public class MaxPerKeyExamples {
+  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "clouddataflow-readonly:samples.weather_stations";
+
+  /**
+   * Examines each row (weather reading) in the input table. Output the month of the reading,
+   * and the mean_temp.
+   */
+  static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      Integer month = Integer.parseInt((String) row.get("month"));
+      Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
+      c.output(KV.of(month, meanTemp));
+    }
+  }
+
+  /**
+   * Format the results to a TableRow, to save to BigQuery.
+   *
+   */
+  static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = new TableRow()
+          .set("month", c.element().getKey())
+          .set("max_mean_temp", c.element().getValue());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Reads rows from a weather data table, and finds the max mean_temp for each
+   * month via the 'Max' statistical combination function.
+   */
+  static class MaxMeanTemp
+      extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
+    @Override
+    public PCollection<TableRow> apply(PCollection<TableRow> rows) {
+
+      // row... => <month, mean_temp> ...
+      PCollection<KV<Integer, Double>> temps = rows.apply(
+          ParDo.of(new ExtractTempFn()));
+
+      // month, mean_temp... => <month, max mean temp>...
+      PCollection<KV<Integer, Double>> tempMaxes =
+          temps.apply(Max.<Integer>doublesPerKey());
+
+      // <month, max>... => row...
+      PCollection<TableRow> results = tempMaxes.apply(
+          ParDo.of(new FormatMaxesFn()));
+
+      return results;
+    }
+  }
+
+  /**
+   * Options supported by {@link MaxPerKeyExamples}.
+   *
+   * <p>Inherits standard configuration options.
+   */
+  private static interface Options extends PipelineOptions {
+    @Description("Table to read from, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Default.String(WEATHER_SAMPLES_TABLE)
+    String getInput();
+    void setInput(String value);
+
+    @Description("Table to write to, specified as "
+        + "<project_id>:<dataset_id>.<table_id>")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Build the table schema for the output table.
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
+    TableSchema schema = new TableSchema().setFields(fields);
+
+    p.apply(BigQueryIO.Read.from(options.getInput()))
+     .apply(new MaxMeanTemp())
+     .apply(BigQueryIO.Write
+        .to(options.getOutput())
+        .withSchema(schema)
+        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+
+    p.run();
+  }
+}

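The per-key maximum used in MaxMeanTemp, shown on a tiny in-memory input. The sketch assumes a Pipeline p and the SDK's Create transform; the numbers are illustrative:

    // (month, mean_temp) pairs.
    PCollection<KV<Integer, Double>> temps = p.apply(Create.of(
        KV.of(7, 85.2), KV.of(7, 75.4), KV.of(8, 82.2)));

    // One output pair per key: KV.of(7, 85.2) and KV.of(8, 82.2).
    PCollection<KV<Integer, Double>> maxPerMonth =
        temps.apply(Max.<Integer>doublesPerKey());
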
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
new file mode 100644
index 0000000..99f3080
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/README.md
@@ -0,0 +1,55 @@
+
+# "Cookbook" Examples
+
+This directory holds simple "cookbook" examples, which show how to define
+commonly-used data analysis patterns that you would likely incorporate into a
+larger Dataflow pipeline. They include:
+
+ <ul>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoes.java">BigQueryTornadoes</a>
+  &mdash; An example that reads the public samples of weather data from Google
+  BigQuery, counts the number of tornadoes that occur in each month, and
+  writes the results to BigQuery. Demonstrates reading/writing BigQuery,
+  counting a <code>PCollection</code>, and user-defined <code>PTransforms</code>.</li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamples.java">CombinePerKeyExamples</a>
+  &mdash; An example that reads the public &quot;Shakespeare&quot; data, and for
+  each word in the dataset that exceeds a given length, generates a string
+  containing the list of play names in which that word appears.
+  Demonstrates the <code>Combine.perKey</code>
+  transform, which lets you combine the values in a key-grouped
+  <code>PCollection</code>.
+  </li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DatastoreWordCount.java">DatastoreWordCount</a>
+  &mdash; An example that shows you how to read from Google Cloud Datastore.</li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/DeDupExample.java">DeDupExample</a>
+  &mdash; An example that uses Shakespeare's plays as plain text files, and
+  removes duplicate lines across all the files. Demonstrates the
+  <code>RemoveDuplicates</code>, <code>TextIO.Read</code>,
+  and <code>TextIO.Write</code> transforms, and how to wire transforms together.
+  </li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/FilterExamples.java">FilterExamples</a>
+  &mdash; An example that shows different approaches to filtering, including
+  selection and projection. It also shows how to dynamically set parameters
+  by defining and using new pipeline options, and how to use a value derived
+  by a pipeline. Demonstrates the <code>Mean</code> transform,
+  <code>Options</code> configuration, and using pipeline-derived data as a side
+  input.
+  </li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/JoinExamples.java">JoinExamples</a>
+  &mdash; An example that shows how to join two collections. It uses a
+  sample of the <a href="http://goo.gl/OB6oin">GDELT &quot;world event&quot;
+  data</a>, joining the event <code>action</code> country code against a table
+  that maps country codes to country names. Demonstrates the <code>Join</code>
+  operation, and using multiple input sources.
+  </li>
+  <li><a href="https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamples.java">MaxPerKeyExamples</a>
+  &mdash; An example that reads the public samples of weather data from BigQuery,
+  and finds the maximum temperature (<code>mean_temp</code>) for each month.
+  Demonstrates the <code>Max</code> statistical combination transform, and how to
+  find the max-per-key group.
+  </li>
+  </ul>
+
+See the [documentation](https://cloud.google.com/dataflow/getting-started) and the [Examples
+README](../../../../../../../../../README.md) for
+information about how to run these examples.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
new file mode 100644
index 0000000..e32596d
--- /dev/null
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
+import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
+import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
+import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
+import com.google.cloud.dataflow.examples.common.PubsubFileInjector;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterEach;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how 
to use different
+ * trigger definitions to produce partial (speculative) results before all the 
data is processed and
+ * to control when updated results are produced for late data. The example 
performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to 
BigQuery. It divides the
+ * data into {@link Window windows} to be processed, and demonstrates using 
various kinds of {@link
+ * Trigger triggers} to control when the results for each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego 
freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor 
reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ *   1. The default triggering behavior
+ *   2. Late data with the default trigger
+ *   3. How to get speculative estimates
+ *   4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself 
with Dataflow triggers
+ * and understand the concept of 'late data'.
+ * See:  <a href="https://cloud.google.com/dataflow/model/triggers";>
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced";>
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, 
running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code 
--input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the 
injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival 
time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of 
some of the data
+ * elements to be in the past. You may override the default {@code --input} 
with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub 
injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the 
default BigQuery table
+ * from the example common package (there are no defaults for a general 
Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code 
--bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table 
do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the queries below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the queries below with the window interval.
+ *
+ * <p> To see the results of the default trigger:
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait
+ * until after the window duration, when the first pane of non-late data has been emitted, to
+ * see more interesting results.
+ *
+ * <p> To see the late data, i.e. the data that the default trigger would have dropped:
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = 
"withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window 
DESC, processing_time}
+ *
+ * <p> To see the difference between accumulating mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = 
"ON_TIME") AND
+ * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and 
freeway = "5" ORDER BY
+ * window DESC, processing_time}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" 
and freeway = "5"
+ * ORDER BY window DESC, processing_time}
+ *
+ * <p> To see the on-time result and the late updates emitted every five minutes after the
+ * end of the window,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" 
and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all 
the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = 
true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query, we can add additional WHERE clauses.
+ * For example, to see the results of the default trigger:
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND 
freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process
+ * (CTRL-C), and then exit.
+ */
+
+public class TriggerExample {
+  // Fixed window duration, in minutes.
+  public static final int WINDOW_DURATION = 30;
+  // Constants used in triggers.
+  // Shortening ONE_MINUTE or FIVE_MINUTES gives you earlier approximations of results.
+  // ONE_MINUTE is used only with processing time before the end of the window.
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+  // FIVE_MINUTES is used only with processing time after the end of the window.
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  // ONE_DAY is used to specify the amount of lateness allowed for the data 
elements.
+  public static final Duration ONE_DAY = Duration.standardDays(1);
+
+  /**
+   * This transform demonstrates using triggers to control when data is 
produced for each window
+   * Consider an example to understand the results generated by each type of 
trigger.
+   * The example uses "freeway" as the key. Event time is the timestamp 
associated with the data
+   * element and processing time is the time when the data element gets 
processed in the pipeline.
+   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) 
window.
+   * Key (freeway) | Value (total_flow) | event time | processing time
+   * 5             | 50                 | 10:00:03   | 10:00:47
+   * 5             | 30                 | 10:01:00   | 10:01:03
+   * 5             | 30                 | 10:02:00   | 11:07:00
+   * 5             | 20                 | 10:04:10   | 10:05:15
+   * 5             | 60                 | 10:05:00   | 11:03:00
+   * 5             | 20                 | 10:05:01   | 11:07:30
+   * 5             | 60                 | 10:15:00   | 10:27:15
+   * 5             | 40                 | 10:26:40   | 10:26:43
+   * 5             | 60                 | 10:27:20   | 10:27:25
+   * 5             | 60                 | 10:29:00   | 11:11:00
+   *
+   * <p> Dataflow tracks a watermark which records up to what point in event 
time the data is
+   * complete. For the purposes of the example, we'll assume the watermark is 
approximately 15m
+   * behind the current processing time. In practice, the actual value would 
vary over time based
+   * on the system's knowledge of the current PubSub delay and the contents of the 
backlog (data
+   * that has not yet been processed).
+   *
+   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) 
(in event time) would
+   * close at 10:44:59, when the watermark passes 10:30:00.
+   */
+  static class CalculateTotalFlow
+  extends PTransform <PCollection<KV<String, Integer>>, 
PCollectionList<TableRow>> {
+    private int windowDuration;
+
+    CalculateTotalFlow(int windowDuration) {
+      this.windowDuration = windowDuration;
+    }
+
+    @Override
+    public PCollectionList<TableRow> apply(PCollection<KV<String, Integer>> 
flowInfo) {
+
+      // Concept #1: The default triggering behavior
+      // By default Dataflow uses a trigger which fires when the watermark has 
passed the end of the
+      // window. This would be written {@code 
Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+      // The system also defaults to dropping late data -- data which arrives 
after the watermark
+      // has passed the event timestamp of the arriving element. This means 
that the default trigger
+      // will only fire once.
+
+      // Each pane produced by the default trigger with no allowed lateness 
will be the first and
+      // last pane in the window, and will be ON_TIME.
+
+      // The results for the example above with the default trigger and zero 
allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | 
isLast | timing
+      // 5             | 260                | 6                 | true    | 
true   | ON_TIME
+
+      // At 11:03:00 (processing time) the system watermark may have advanced 
to 10:54:00. As a
+      // result, when the data record with event time 10:05:00 arrives at 
11:03:00, it is considered
+      // late, and dropped.
+
+      PCollection<TableRow> defaultTriggerResults = flowInfo
+          .apply("Default", Window
+              // The default window duration values work well if you're 
running the default input
+              // file. You may want to adjust the window duration otherwise.
+              .<KV<String, 
Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              // The default trigger first emits output when the system's 
watermark passes the end
+              // of the window.
+              .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+              // Late data is dropped
+              .withAllowedLateness(Duration.ZERO)
+              // Discard elements after emitting each pane.
+              // With no allowed lateness and the specified trigger there will 
only be a single
+              // pane, so this doesn't have a noticeable effect. See concept 2 
for more details.
+              .discardingFiredPanes())
+          .apply(new TotalFlow("default"));
+
+      // Concept #2: Late data with the default trigger
+      // This uses the same trigger as concept #1, but allows data that is up 
to ONE_DAY late. This
+      // leads to each window staying open for ONE_DAY after the watermark has 
passed the end of the
+      // window. Any late data will result in an additional pane being fired 
for that same window.
+
+      // The first pane produced will be ON_TIME and the remaining panes will 
be LATE.
+      // To definitely get the last pane when the window closes, use
+      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+      // The results for the example above with the default trigger and 
ONE_DAY allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | 
isLast | timing
+      // 5             | 260                | 6                 | true    | 
false  | ON_TIME
+      // 5             | 60                 | 1                 | false   | 
false  | LATE
+      // 5             | 30                 | 1                 | false   | 
false  | LATE
+      // 5             | 20                 | 1                 | false   | 
false  | LATE
+      // 5             | 60                 | 1                 | false   | 
false  | LATE
+      PCollection<TableRow> withAllowedLatenessResults = flowInfo
+          .apply("WithLateData", Window
+              .<KV<String, 
Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              // Late data is emitted as it arrives
+              .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+              // Once the output is produced, the pane is dropped and we start 
preparing the next
+              // pane for the window
+              .discardingFiredPanes()
+              // Late data is handled up to one day
+              .withAllowedLateness(ONE_DAY))
+          .apply(new TotalFlow("withAllowedLateness"));
+
+      // Concept #3: How to get speculative estimates
+      // We can specify a trigger that fires independent of the watermark, for 
instance after
+      // ONE_MINUTE of processing time. This allows us to produce speculative 
estimates before
+      // all the data is available. Since we don't have any triggers that 
depend on the watermark
+      // we don't get an ON_TIME firing. Instead, all panes are either EARLY 
or LATE.
+
+      // We also use accumulatingFiredPanes to build up the results across 
each pane firing.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | 
isLast | timing
+      // 5             | 80                 | 2                 | true    | 
false  | EARLY
+      // 5             | 100                | 3                 | false   | 
false  | EARLY
+      // 5             | 260                | 6                 | false   | 
false  | EARLY
+      // 5             | 320                | 7                 | false   | 
false  | LATE
+      // 5             | 370                | 9                 | false   | 
false  | LATE
+      // 5             | 430                | 10                | false   | 
false  | LATE
+      PCollection<TableRow> speculativeResults = flowInfo
+          .apply("Speculative" , Window
+              .<KV<String, 
Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              // Trigger fires every minute.
+              
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                  // Speculative every ONE_MINUTE
+                  .plusDelayOf(ONE_MINUTE)))
+              // After emitting each pane, it will continue accumulating the 
elements so that each
+              // approximation includes all of the previous data in addition 
to the newly arrived
+              // data.
+              .accumulatingFiredPanes()
+              .withAllowedLateness(ONE_DAY))
+          .apply(new TotalFlow("speculative"));
+
+      // Concept #4: Combining late data and speculative estimates
+      // We can put the previous concepts together to get EARLY estimates, an 
ON_TIME result,
+      // and LATE updates based on late data.
+
+      // Each time a triggering condition is satisfied, it advances to the next trigger.
+      // If there are new elements, this trigger emits a pane for the window under the following
+      // conditions:
+      // > Early approximations every minute until the end of the window.
+      // > An on-time firing when the watermark has passed the end of the window.
+      // > Late updates every five minutes after the end of the window.
+
+      // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | 
isLast | timing
+      // 5             | 80                 | 2                 | true    | 
false  | EARLY
+      // 5             | 100                | 3                 | false   | 
false  | EARLY
+      // 5             | 260                | 6                 | false   | 
false  | EARLY
+      // [First pane fired after the end of the window]
+      // 5             | 320                | 7                 | false   | 
false  | ON_TIME
+      // 5             | 430                | 10                | false   | 
false  | LATE
+
+      // For more possibilities of how to build advanced triggers, see {@link 
Trigger}.
+      PCollection<TableRow> sequentialResults = flowInfo
+          .apply("Sequential", Window
+              .<KV<String, 
Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
+              .triggering(AfterEach.inOrder(
+                  
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                      // Speculative every ONE_MINUTE
+                      
.plusDelayOf(ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()),
+                  
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
+                      // Late data every FIVE_MINUTES
+                      .plusDelayOf(FIVE_MINUTES))))
+              .accumulatingFiredPanes()
+              // For up to ONE_DAY
+              .withAllowedLateness(ONE_DAY))
+          .apply(new TotalFlow("sequential"));
+
+      // Adds the results generated by each trigger type to a PCollectionList.
+      PCollectionList<TableRow> resultsList = 
PCollectionList.of(defaultTriggerResults)
+          .and(withAllowedLatenessResults)
+          .and(speculativeResults)
+          .and(sequentialResults);
+
+      return resultsList;
+    }
+  }
+
+  
//////////////////////////////////////////////////////////////////////////////////////////////////
+  // The remaining parts of the pipeline are needed to produce the output for 
each
+  // concept above. Not directly relevant to understanding the trigger 
examples.
+
+  /**
+   * Calculate total flow and number of records for each freeway and format 
the results to TableRow
+   * objects, to save to BigQuery.
+   */
+  static class TotalFlow extends
+  PTransform <PCollection<KV<String, Integer>>, PCollection<TableRow>> {
+    private String triggerType;
+
+    public TotalFlow(String triggerType) {
+      this.triggerType = triggerType;
+    }
+
+    @Override
+    public PCollection<TableRow> apply(PCollection<KV<String, Integer>> 
flowInfo) {
+      PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
+          .apply(GroupByKey.<String, Integer>create());
+
+      PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
+          new DoFn <KV<String, Iterable<Integer>>, KV<String, String>>() {
+
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              Iterable<Integer> flows = c.element().getValue();
+              Integer sum = 0;
+              Long numberOfRecords = 0L;
+              for (Integer value : flows) {
+                sum += value;
+                numberOfRecords++;
+              }
+              c.output(KV.of(c.element().getKey(), sum + "," + 
numberOfRecords));
+            }
+          }));
+      PCollection<TableRow> output = results.apply(ParDo.of(new 
FormatTotalFlow(triggerType)));
+      return output;
+    }
+  }
+
+  /**
+   * Formats the results of the TotalFlow calculation as a TableRow, to save to BigQuery.
+   * Adds the trigger type, pane information, processing time, and the window timestamp.
+   */
+  static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow>
+  implements  RequiresWindowAccess {
+    private String triggerType;
+
+    public FormatTotalFlow(String triggerType) {
+      this.triggerType = triggerType;
+    }
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      String[] values = c.element().getValue().split(",");
+      TableRow row = new TableRow()
+          .set("trigger_type", triggerType)
+          .set("freeway", c.element().getKey())
+          .set("total_flow", Integer.parseInt(values[0]))
+          .set("number_of_records", Long.parseLong(values[1]))
+          .set("window", c.window().toString())
+          .set("isFirst", c.pane().isFirst())
+          .set("isLast", c.pane().isLast())
+          .set("timing", c.pane().getTiming().toString())
+          .set("event_time", c.timestamp().toString())
+          .set("processing_time", Instant.now().toString());
+      c.output(row);
+    }
+  }
+
+  /**
+   * Extract the freeway and total flow in a reading.
+   * The freeway is used as the key, since we are calculating the total flow for each freeway.
+   */
+  static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      String[] laneInfo = c.element().split(",");
+      if (laneInfo[0].equals("timestamp")) {
+        // Header row
+        return;
+      }
+      if (laneInfo.length < 48) {
+        // Skip invalid input.
+        return;
+      }
+      String freeway = laneInfo[2];
+      Integer totalFlow = tryIntegerParse(laneInfo[7]);
+      // Ignore records with total flow 0, to make the trigger behavior easier to follow.
+      // Skip the records with total flow -1 since they are invalid input.
+      if (totalFlow == null || totalFlow <= 0) {
+        return;
+      }
+      c.output(KV.of(freeway,  totalFlow));
+    }
+  }
+
+  /**
+   * Inherits standard configuration options.
+   */
+  public interface TrafficFlowOptions
+      extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions, 
DataflowExampleOptions {
+
+    @Description("Input file to inject to Pub/Sub topic")
+    @Default.String("gs://dataflow-samples/traffic_sensor/"
+        + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
+    String getInput();
+    void setInput(String value);
+
+    @Description("Numeric value of window duration for fixed windows, in 
minutes")
+    @Default.Integer(WINDOW_DURATION)
+    Integer getWindowDuration();
+    void setWindowDuration(Integer value);
+  }
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+  public static void main(String[] args) throws Exception {
+    TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
+        .withValidation()
+        .as(TrafficFlowOptions.class);
+    options.setStreaming(true);
+
+    // In order to cancel the pipelines automatically, the runner is forced to
+    // {@code DataflowPipelineRunner}.
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setBigQuerySchema(getSchema());
+
+    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
+    dataflowUtils.setup();
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    TableReference tableRef = getTableReference(options.getProject(),
+        options.getBigQueryDataset(), options.getBigQueryTable());
+
+    PCollectionList<TableRow> resultList = 
pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
+        .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
+        .topic(options.getPubsubTopic()))
+        .apply(ParDo.of(new ExtractFlowInfo()))
+        .apply(new CalculateTotalFlow(options.getWindowDuration()));
+
+    for (int i = 0; i < resultList.size(); i++){
+      
resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema()));
+    }
+
+    PipelineResult result = pipeline.run();
+    if (!options.getInput().isEmpty()){
+      // Inject the data into the Pub/Sub topic.
+      dataflowUtils.runInjectorPipeline(runInjector(options));
+    }
+    // dataflowUtils will try to cancel the pipeline and the injector before 
the program exits.
+    dataflowUtils.waitToFinish(result);
+  }
+
+  private static Pipeline runInjector(TrafficFlowOptions options){
+    DataflowPipelineOptions copiedOptions = 
options.cloneAs(DataflowPipelineOptions.class);
+    copiedOptions.setStreaming(false);
+    
copiedOptions.setNumWorkers(options.as(DataflowExampleOptions.class).getInjectorNumWorkers());
+    copiedOptions.setJobName(options.getJobName() + "-injector");
+    Pipeline injectorPipeline = Pipeline.create(copiedOptions);
+    injectorPipeline
+    .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
+    .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays()))
+    .apply(IntraBundleParallelization.of(PubsubFileInjector
+        .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)
+        .publish(options.getPubsubTopic()))
+        .withMaxParallelism(20));
+
+    return injectorPipeline;
+  }
+
+  /**
+   * Add the current time to each record as its timestamp.
+   * Also, at random, shift some timestamps into the past to simulate late data for the triggers.
+   */
+  public static class InsertDelays extends DoFn<String, String> {
+    private static final double THRESHOLD = 0.001;
+    // MIN_DELAY and MAX_DELAY in minutes.
+    private static final int MIN_DELAY = 1;
+    private static final int MAX_DELAY = 100;
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      Instant timestamp = Instant.now();
+      if (Math.random() < THRESHOLD){
+        int range = MAX_DELAY - MIN_DELAY;
+        int delayInMinutes = (int) (Math.random() * range) + MIN_DELAY;
+        long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
+        timestamp = new Instant(timestamp.getMillis() - delayInMillis);
+      }
+      c.outputWithTimestamp(c.element(), timestamp);
+    }
+  }
+
+
+  /** Builds the reference to the output BigQuery table. */
+  private static TableReference getTableReference(String project, String 
dataset, String table){
+    TableReference tableRef = new TableReference();
+    tableRef.setProjectId(project);
+    tableRef.setDatasetId(dataset);
+    tableRef.setTableId(table);
+    return tableRef;
+  }
+
+  /** Defines the BigQuery schema used for the output. */
+  private static TableSchema getSchema() {
+    List<TableFieldSchema> fields = new ArrayList<>();
+    fields.add(new 
TableFieldSchema().setName("trigger_type").setType("STRING"));
+    fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
+    fields.add(new 
TableFieldSchema().setName("total_flow").setType("INTEGER"));
+    fields.add(new 
TableFieldSchema().setName("number_of_records").setType("INTEGER"));
+    fields.add(new TableFieldSchema().setName("window").setType("STRING"));
+    fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
+    fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
+    fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
+    fields.add(new 
TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
+    fields.add(new 
TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
+    TableSchema schema = new TableSchema().setFields(fields);
+    return schema;
+  }
+
+  private static Integer tryIntegerParse(String number) {
+    try {
+      return Integer.parseInt(number);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+}
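
A side note on the TotalFlow transform above, not part of the commit: TotalFlow uses
GroupByKey plus a hand-written loop because it needs both the per-freeway sum and the
record count in a single pass. If only the total flow were needed, the SDK's built-in
combiners would do; the sketch below shows that alternative. The class name
TotalFlowSketch and its method names are illustrative only, and the input is assumed to
be the same windowed PCollection<KV<String, Integer>> of (freeway, flow) pairs that
TriggerExample produces with ExtractFlowInfo.

import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

class TotalFlowSketch {
  // Per-freeway total flow via the built-in Sum combiner. Because this is a Combine.perKey
  // under the hood, the runner can typically lift partial sums ahead of the shuffle, which a
  // raw GroupByKey followed by a ParDo does not get.
  static PCollection<KV<String, Integer>> totalFlowPerFreeway(
      PCollection<KV<String, Integer>> flowInfo) {
    return flowInfo.apply(Sum.<String>integersPerKey());
  }

  // The number_of_records column would become a second aggregation over the same input.
  static PCollection<KV<String, Long>> recordsPerFreeway(
      PCollection<KV<String, Integer>> flowInfo) {
    return flowInfo.apply(Count.<String, Integer>perKey());
  }
}

Applied after the same Window.into(...).triggering(...) configuration, either form emits
one output pane per trigger firing, exactly as TotalFlow does.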

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
 
b/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
deleted file mode 100644
index 52c61ef..0000000
--- 
a/examples/java/src/test/java/com/google/cloud/dataflow/examples/DebuggingWordCountTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples;
-
-import com.google.common.io.Files;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Tests for {@link DebuggingWordCount}.
- */
-@RunWith(JUnit4.class)
-public class DebuggingWordCountTest {
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testDebuggingWordCount() throws Exception {
-    File file = tmpFolder.newFile();
-    Files.write("stomach secret Flourish message Flourish here Flourish", file,
-        StandardCharsets.UTF_8);
-    DebuggingWordCount.main(new String[]{"--inputFile=" + 
file.getAbsolutePath()});
-  }
-}
-

