Move Java 8 examples to their own module

This allows easy setting of the compiler source and target version to
1.8 without any fragile plugin configuration. It also allows the
import into Eclipse to easily set separate compliance levels, as this
is done per-Eclipse-project.

This parallels the approach taken with tests to support Java 8.

Also fixed some Checkstyle and unused-variable warnings in the examples.
These were previously hidden since the code wasn't in a directory like
`src/{main,test}/java`. The warnings got fixed while I had the files
open.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e5c7cf88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e5c7cf88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e5c7cf88

Branch: refs/heads/master
Commit: e5c7cf88078cbb9c2236b38d854128abb0c18c41
Parents: e8b7100
Author: bchambers <bchamb...@google.com>
Authored: Wed Mar 16 14:05:41 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Thu Mar 17 13:11:37 2016 -0700

----------------------------------------------------------------------
 examples/pom.xml                                | 127 ------
 .../examples/MinimalWordCountJava8.java         |  68 ---
 .../examples/complete/game/GameStats.java       | 347 ---------------
 .../examples/complete/game/HourlyTeamScore.java | 193 ---------
 .../examples/complete/game/LeaderBoard.java     | 237 -----------
 .../dataflow/examples/complete/game/README.md   | 119 ------
 .../examples/complete/game/UserScore.java       | 239 -----------
 .../complete/game/injector/Injector.java        | 417 -------------------
 .../complete/game/injector/InjectorUtils.java   | 101 -----
 .../injector/RetryHttpInitializerWrapper.java   | 127 ------
 .../complete/game/utils/WriteToBigQuery.java    | 134 ------
 .../game/utils/WriteWindowedToBigQuery.java     |  76 ----
 .../examples/MinimalWordCountJava8Test.java     | 103 -----
 .../examples/complete/game/GameStatsTest.java   |  99 -----
 .../complete/game/HourlyTeamScoreTest.java      | 121 ------
 .../examples/complete/game/UserScoreTest.java   | 156 -------
 java8examples/pom.xml                           | 278 +++++++++++++
 .../examples/MinimalWordCountJava8.java         |  68 +++
 .../examples/complete/game/GameStats.java       | 339 +++++++++++++++
 .../examples/complete/game/HourlyTeamScore.java | 193 +++++++++
 .../examples/complete/game/LeaderBoard.java     | 237 +++++++++++
 .../dataflow/examples/complete/game/README.md   | 113 +++++
 .../examples/complete/game/UserScore.java       | 239 +++++++++++
 .../complete/game/injector/Injector.java        | 415 ++++++++++++++++++
 .../complete/game/injector/InjectorUtils.java   | 101 +++++
 .../injector/RetryHttpInitializerWrapper.java   | 126 ++++++
 .../complete/game/utils/WriteToBigQuery.java    | 134 ++++++
 .../game/utils/WriteWindowedToBigQuery.java     |  76 ++++
 .../examples/MinimalWordCountJava8Test.java     | 103 +++++
 .../examples/complete/game/GameStatsTest.java   |  76 ++++
 .../complete/game/HourlyTeamScoreTest.java      | 111 +++++
 .../examples/complete/game/UserScoreTest.java   | 154 +++++++
 pom.xml                                         |   9 +
 33 files changed, 2772 insertions(+), 2664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index c15f73f..ea44b01 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -45,133 +45,6 @@
         <testParallelValue>both</testParallelValue>
       </properties>
     </profile>
-
-    <profile>
-      <id>java8</id>
-      <activation>
-        <jdk>[1.8,)</jdk>
-      </activation>
-
-      <build>
-        <plugins>
-          <!-- Tells Maven about the Java 8 main and test source root. -->
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>add-java8-main-source</id>
-                <phase>initialize</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>${project.basedir}/src/main/java8</source>
-                  </sources>
-                </configuration>
-              </execution>
-
-              <execution>
-                <id>add-java8-test-source</id>
-                <phase>initialize</phase>
-                <goals>
-                  <goal>add-test-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>${project.basedir}/src/test/java8</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <executions>
-
-              <!-- Set `-source 1.7 -target 1.7` for Java 7 tests -->
-              <execution>
-                <id>default-testCompile</id>
-                <phase>test-compile</phase>
-                <goals>
-                  <goal>testCompile</goal>
-                </goals>
-                <configuration>
-                  <testSource>1.7</testSource>
-                  <testTarget>1.7</testTarget>
-                  <testExcludes>
-                    <!-- This pattern is brittle; we would prefer to filter on 
the directory
-                         but that seems to be unavailable to us. -->
-                    <exclude>**/*Java8Test.java</exclude>
-                    <exclude>**/game/**/*.java</exclude>
-                  </testExcludes>
-                </configuration>
-              </execution>
-
-              <!-- Set `-source 1.8 -target 1.8` for Java 8 tests -->
-              <execution>
-                <id>java8-test-compile</id>
-                <phase>test-compile</phase>
-                <goals>
-                  <goal>testCompile</goal>
-                </goals>
-                <configuration>
-                  <testSource>1.8</testSource>
-                  <testTarget>1.8</testTarget>
-                  <includes>
-                    <!-- This pattern is brittle; we would prefer to filter on 
the directory
-                         but that seems to be unavailable to us. -->
-                    <include>**/*Java8Test.java</include>
-                    <include>**/game/**/*.java</include>
-                  </includes>
-                </configuration>
-              </execution>
-
-              <!-- Set `-source 1.7 -target 1.7` for Java 7 examples -->
-              <execution>
-                <id>default-compile</id>
-                <phase>compile</phase>
-                <goals>
-                  <goal>compile</goal>
-                </goals>
-                <configuration>
-                  <source>1.7</source>
-                  <target>1.7</target>
-                  <excludes>
-                    <!-- This pattern is brittle; we would prefer to filter on 
the directory
-                         but that seems to be unavailable to us. -->
-                    <exclude>**/*Java8*.java</exclude>
-                    <exclude>**/game/**/*.java</exclude>
-                  </excludes>
-                </configuration>
-              </execution>
-
-              <!-- Set `-source 1.8 -target 1.8` for Java 8 examples -->
-              <execution>
-                <id>java8-compile</id>
-                <phase>compile</phase>
-                <goals>
-                  <goal>compile</goal>
-                </goals>
-                <configuration>
-                  <source>1.8</source>
-                  <target>1.8</target>
-                  <includes>
-                    <!-- This pattern is brittle; we would prefer to filter on 
the directory
-                         but that seems to be unavailable to us. -->
-                    <include>**/*Java8*.java</include>
-                    <include>**/game/**/*.java</include>
-                  </includes>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
   </profiles>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
deleted file mode 100644
index c115ea0..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import java.util.Arrays;
-
-/**
- * An example that counts words in Shakespeare, using Java 8 language features.
- *
- * <p>See {@link MinimalWordCount} for a comprehensive explanation.
- */
-public class MinimalWordCountJava8 {
-
-  public static void main(String[] args) {
-    DataflowPipelineOptions options = PipelineOptionsFactory.create()
-        .as(DataflowPipelineOptions.class);
-
-    options.setRunner(BlockingDataflowPipelineRunner.class);
-
-    // CHANGE 1 of 3: Your project ID is required in order to run your 
pipeline on the Google Cloud.
-    options.setProject("SET_YOUR_PROJECT_ID_HERE");
-
-    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging 
local files.
-    
options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
-
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> 
Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(new TypeDescriptor<String>() {}))
-     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
-     .apply(Count.<String>perElement())
-     .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + 
wordCount.getValue())
-         .withOutputType(new TypeDescriptor<String>() {}))
-
-     // CHANGE 3 of 3: The Google Cloud Storage path is required for 
outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/GameStats.java
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/GameStats.java
deleted file mode 100644
index 39d7a76..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/GameStats.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game;
-
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import 
com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.Mean;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
-/**
- * This class is the fourth in a series of four pipelines that tell a story in 
a 'gaming'
- * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link 
LeaderBoard}.
- * New concepts: session windows and finding session duration; use of both
- * singleton and non-singleton side inputs.
- *
- * <p> This pipeline builds on the {@link LeaderBoard} functionality, and adds 
some "business
- * intelligence" analysis: abuse detection and usage patterns. The pipeline 
derives the Mean user
- * score sum for a window, and uses that information to identify likely 
spammers/robots. (The robots
- * have a higher click rate than the human users). The 'robot' users are then 
filtered out when
- * calculating the team scores.
- *
- * <p> Additionally, user sessions are tracked: that is, we find bursts of 
user activity using
- * session windows. Then, the mean session duration information is recorded in 
the context of
- * subsequent fixed windowing. (This could be used to tell us what games are 
giving us greater
- * user retention).
- *
- * <p> Run {@link injector.Injector} to generate pubsub data for this 
pipeline.  The Injector
- * documentation provides more detail.
- *
- * <p> To execute this pipeline using the Dataflow service, specify the 
pipeline configuration
- * like this:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --dataset=YOUR-DATASET
- *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
- * }
- * </pre>
- * where the BigQuery dataset you specify must already exist. The PubSub topic 
you specify should
- * be the same topic to which the Injector is publishing.
- */
-public class GameStats extends LeaderBoard {
-
-  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-  private static final Logger LOG = LoggerFactory.getLogger(GameStats.class);
-
-  private static DateTimeFormatter fmt =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
-          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
-  static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
-  static final Duration TEN_MINUTES = Duration.standardMinutes(10);
-
-
-  /**
-   * Filter out all but those users with a high clickrate, which we will 
consider as 'spammy' uesrs.
-   * We do this by finding the mean total score per user, then using that 
information as a side
-   * input to filter out all but those user scores that are > (mean * 
SCORE_WEIGHT)
-   */
-  // [START DocInclude_AbuseDetect]
-  public static class CalculateSpammyUsers
-      extends PTransform<PCollection<KV<String, Integer>>, 
PCollection<KV<String, Integer>>> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CalculateSpammyUsers.class);
-    private static final double SCORE_WEIGHT = 2.5;
-
-    @Override
-    public PCollection<KV<String, Integer>> apply(PCollection<KV<String, 
Integer>> userScores) {
-
-      // Get the sum of scores for each user.
-      PCollection<KV<String, Integer>> sumScores = userScores
-          .apply("UserSum", Sum.<String>integersPerKey());
-
-      // Extract the score from each element, and use it to find the global 
mean.
-      final PCollectionView<Double> globalMeanScore = 
sumScores.apply(Values.<Integer>create())
-          .apply(Mean.<Integer>globally().asSingletonView());
-
-      // Filter the user sums using the global mean.
-      PCollection<KV<String, Integer>> filtered = sumScores
-          .apply(ParDo
-              .named("ProcessAndFilter")
-              // use the derived mean total score as a side input
-              .withSideInputs(globalMeanScore)
-              .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-                private final Aggregator<Long, Long> numSpammerUsers =
-                  createAggregator("SpammerUsers", new Sum.SumLongFn());
-                @Override
-                public void processElement(ProcessContext c) {
-                  Integer score = c.element().getValue();
-                  Double gmc = c.sideInput(globalMeanScore);
-                  if (score > (gmc * SCORE_WEIGHT)) {
-                    LOG.info("user " + c.element().getKey() + " spammer score 
" + score
-                        + " with mean " + gmc);
-                    numSpammerUsers.addValue(1L);
-                    c.output(c.element());
-                  }
-                }
-              }));
-      return filtered;
-    }
-  }
-  // [END DocInclude_AbuseDetect]
-
-  /**
-   * Calculate and output an element's session duration.
-   */
-  private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, 
Integer>
-      implements RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
-      IntervalWindow w = (IntervalWindow) c.window();
-      int duration = new Duration(
-          w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
-      c.output(duration);
-    }
-  }
-
-
-  /**
-   * Options supported by {@link GameStats}.
-   */
-  static interface Options extends LeaderBoard.Options {
-    @Description("Pub/Sub topic to read from")
-    @Validation.Required
-    String getTopic();
-    void setTopic(String value);
-
-    @Description("Numeric value of fixed window duration for user analysis, in 
minutes")
-    @Default.Integer(60)
-    Integer getFixedWindowDuration();
-    void setFixedWindowDuration(Integer value);
-
-    @Description("Numeric value of gap between user sessions, in minutes")
-    @Default.Integer(5)
-    Integer getSessionGap();
-    void setSessionGap(Integer value);
-
-    @Description("Numeric value of fixed window for finding mean of user 
session duration, "
-        + "in minutes")
-    @Default.Integer(30)
-    Integer getUserActivityWindowDuration();
-    void setUserActivityWindowDuration(Integer value);
-
-    @Description("Prefix used for the BigQuery table names")
-    @Default.String("game_stats")
-    String getTablePrefix();
-    void setTablePrefix(String value);
-  }
-
-
-  /**
-   * Create a map of information that describes how to write pipeline output 
to BigQuery. This map
-   * is used to write information about team score sums.
-   */
-  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, 
Integer>>>
-      configureWindowedWrite() {
-    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> 
tableConfigure =
-        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<KV<String, 
Integer>>>();
-    tableConfigure.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-          c -> { IntervalWindow w = (IntervalWindow) c.window();
-                 return fmt.print(w.start()); }));
-    tableConfigure.put("processing_time",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> fmt.print(Instant.now())));
-    return tableConfigure;
-  }
-
-  /**
-   * Create a map of information that describes how to write pipeline output 
to BigQuery. This map
-   * is used to write information about mean user session time.
-   */
-  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<Double>>
-      configureSessionWindowWrite() {
-
-    Map<String, WriteWindowedToBigQuery.FieldInfo<Double>> tableConfigure =
-        new HashMap<String, WriteWindowedToBigQuery.FieldInfo<Double>>();
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<Double>("STRING",
-          c -> { IntervalWindow w = (IntervalWindow) c.window();
-                 return fmt.print(w.start()); }));
-    tableConfigure.put("mean_duration",
-        new WriteWindowedToBigQuery.FieldInfo<Double>("FLOAT", c -> 
c.element()));
-    return tableConfigure;
-  }
-
-
-
-  public static void main(String[] args) throws Exception {
-
-    Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    // Enforce that this pipeline is always run in streaming mode.
-    options.setStreaming(true);
-    // Allow the pipeline to be cancelled automatically.
-    options.setRunner(DataflowPipelineRunner.class);
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-    Pipeline pipeline = Pipeline.create(options);
-
-    // Read Events from Pub/Sub using custom timestamps
-    PCollection<GameActionInfo> rawEvents = pipeline
-        
.apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
-        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
-
-    // Extract username/score pairs from the event stream
-    PCollection<KV<String, Integer>> userEvents =
-        rawEvents.apply("ExtractUserScore",
-          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), 
gInfo.getScore()))
-            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
-
-    // Calculate the total score per user over fixed windows, and
-    // cumulative updates for late data.
-    final PCollectionView<Map<String, Integer>> spammersView = userEvents
-      .apply(Window.named("FixedWindowsUser")
-          .<KV<String, Integer>>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
-
-      // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
-      // These might be robots/spammers.
-      .apply("CalculateSpammyUsers", new CalculateSpammyUsers())
-      // Derive a view from the collection of spammer users. It will be used 
as a side input
-      // in calculating the team score sums, below.
-      .apply("CreateSpammersView", View.<String, Integer>asMap());
-
-    // [START DocInclude_FilterAndCalc]
-    // Calculate the total score per team over fixed windows,
-    // and emit cumulative updates for late data. Uses the side input derived 
above-- the set of
-    // suspected robots-- to filter out scores from those users from the sum.
-    // Write the results to BigQuery.
-    rawEvents
-      .apply(Window.named("WindowIntoFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
-      // Filter out the detected spammer users, using the side input derived 
above.
-      .apply(ParDo.named("FilterOutSpammers")
-              .withSideInputs(spammersView)
-              .of(new DoFn<GameActionInfo, GameActionInfo>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  // If the user is not in the spammers Map, output the data 
element.
-                  if 
(c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
-                    c.output(c.element());
-                  }}}))
-      // Extract and sum teamname/score pairs from the event data.
-      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
-      // [END DocInclude_FilterAndCalc]
-      // Write the result to BigQuery
-      .apply("WriteTeamSums",
-             new WriteWindowedToBigQuery<KV<String, Integer>>(
-                options.getTablePrefix() + "_team", configureWindowedWrite()));
-
-
-    // [START DocInclude_SessionCalc]
-    // Detect user sessions-- that is, a burst of activity separated by a gap 
from further
-    // activity. Find and record the mean session lengths.
-    // This information could help the game designers track the changing user 
engagement
-    // as their set of games changes.
-    userEvents
-      .apply(Window.named("WindowIntoSessions")
-            .<KV<String, Integer>>into(
-                  
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
-      // For this use, we care only about the existence of the session, not 
any particular
-      // information aggregated over it, so the following is an efficient way 
to do that.
-      .apply(Combine.perKey(x -> 0))
-      // Get the duration per session.
-      .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
-      // [END DocInclude_SessionCalc]
-      // [START DocInclude_Rewindow]
-      // Re-window to process groups of session sums according to when the 
sessions complete.
-      .apply(Window.named("WindowToExtractSessionMean")
-            .<Integer>into(
-                
FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
-      // Find the mean session duration in each window.
-      .apply(Mean.<Integer>globally().withoutDefaults())
-      // Write this info to a BigQuery table.
-      .apply("WriteAvgSessionLength",
-             new WriteWindowedToBigQuery<Double>(
-                options.getTablePrefix() + "_sessions", 
configureSessionWindowWrite()));
-    // [END DocInclude_Rewindow]
-
-
-    // Run the pipeline and wait for the pipeline to finish; capture 
cancellation requests from the
-    // command line.
-    PipelineResult result = pipeline.run();
-    dataflowUtils.waitToFinish(result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
deleted file mode 100644
index 481b9df..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScore.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game;
-
-import 
com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.WithTimestamps;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
-/**
- * This class is the second in a series of four pipelines that tell a story in a 'gaming'
- * domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
- * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
- *
- * <p> This pipeline processes data collected from gaming events in batch, building on {@link
- * UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
- * optionally allowing specification of two timestamps before and after which data is filtered out.
- * This allows a model where late data collected after the intended analysis window can be included,
- * and any late-arriving data prior to the beginning of the analysis window can be removed as well.
- * By using windowing and adding element timestamps, we can do finer-grained analysis than with the
- * {@link UserScore} pipeline. However, our batch processing is high-latency, in that we don't get
- * results from plays at the beginning of the batch's time period until the batch is processed.
- *
- * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
- * like this:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --dataset=YOUR-DATASET
- * }
- * </pre>
- * where the BigQuery dataset you specify must already exist.
- *
- * <p> Optionally include {@code --input} to specify the batch input file path.
- * To indicate a time after which the data should be filtered out, include the
- * {@code --stopMin} arg. E.g., {@code --stopMin=2015-10-18-23-59} indicates that any data
- * timestamped at or after 23:59 PST on 2015-10-18 should not be included in the analysis.
- * To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
- * Both bounds are exclusive: an event is kept only if its timestamp is strictly greater than
- * {@code --startMin} and strictly less than {@code --stopMin}.
- * If you're using the default input specified in {@link UserScore},
- * "gs://dataflow-samples/game/gaming_data*.csv", then
- * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
- */
-public class HourlyTeamScore extends UserScore {
-
-  // Formatter for the "window_start" column written to BigQuery. Declared final because it is
-  // shared, read-only state. The zone is named by its full ID rather than the deprecated
-  // three-letter abbreviation "PST".
-  private static final DateTimeFormatter WINDOW_START_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
-          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
-
-  // Parser for the --startMin / --stopMin option values (minute resolution).
-  private static final DateTimeFormatter MINUTE_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd-HH-mm")
-          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
-
-
-  /**
-   * Options supported by {@link HourlyTeamScore}.
-   */
-  interface Options extends UserScore.Options {
-
-    @Description("Numeric value of fixed window duration, in minutes")
-    @Default.Integer(60)
-    Integer getWindowDuration();
-    void setWindowDuration(Integer value);
-
-    // The trailing spaces inside the literals keep the concatenated help text readable.
-    @Description("String representation of the first minute after which to generate results, "
-        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
-        + "Any input data timestamped prior to that minute won't be included in the sums.")
-    @Default.String("1970-01-01-00-00")
-    String getStartMin();
-    void setStartMin(String value);
-
-    @Description("String representation of the first minute for which to not generate results, "
-        + "in the format: yyyy-MM-dd-HH-mm . This time should be in PST. "
-        + "Any input data timestamped after that minute won't be included in the sums.")
-    @Default.String("2100-01-01-00-00")
-    String getStopMin();
-    void setStopMin(String value);
-
-    @Description("The BigQuery table name. Should not already exist.")
-    @Default.String("hourly_team_score")
-    String getTableName();
-    void setTableName(String value);
-  }
-
-  /**
-   * Create a map of information that describes how to write pipeline output to BigQuery. This map
-   * is passed to the {@link WriteWindowedToBigQuery} constructor to write team score sums and
-   * includes information about window start time.
-   *
-   * @return a new mutable map from BigQuery column name ("team", "total_score", "window_start")
-   *     to the column's type and the function that extracts its value
-   */
-  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
-      configureWindowedTableWrite() {
-    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfig =
-        new HashMap<>();
-    tableConfig.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfig.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    // The cast relies on the elements having been windowed into interval (fixed) windows.
-    tableConfig.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> {
-              IntervalWindow w = (IntervalWindow) c.window();
-              return WINDOW_START_FORMATTER.print(w.start());
-            }));
-    return tableConfig;
-  }
-
-
-  /**
-   * Run a batch pipeline to do windowed analysis of the data.
-   *
-   * @param args command-line flags, parsed into {@link Options}
-   */
-  // [START DocInclude_HTSMain]
-  public static void main(String[] args) throws Exception {
-    // Begin constructing a pipeline configured by commandline flags.
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    // Parse the filter bounds once, up front; the lambdas below capture these final values.
-    final Instant stopMinTimestamp =
-        new Instant(MINUTE_FORMATTER.parseMillis(options.getStopMin()));
-    final Instant startMinTimestamp =
-        new Instant(MINUTE_FORMATTER.parseMillis(options.getStartMin()));
-
-    // Read 'gaming' events from a text file.
-    pipeline.apply(TextIO.Read.from(options.getInput()))
-      // Parse the incoming data.
-      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
-
-      // Filter out data before and after the given times so that it is not included
-      // in the calculations. As we collect data in batches (say, by day), the batch for the day
-      // that we want to analyze could potentially include some late-arriving data from the previous
-      // day. If so, we want to weed it out. Similarly, if we include data from the following day
-      // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
-      // that fall after the time period we want to analyze.
-      // Note that both comparisons are strict, so events exactly on a bound are dropped.
-      // [START DocInclude_HTSFilters]
-      .apply("FilterStartTime", Filter.byPredicate(
-          (GameActionInfo gInfo)
-              -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
-      .apply("FilterEndTime", Filter.byPredicate(
-          (GameActionInfo gInfo)
-              -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
-      // [END DocInclude_HTSFilters]
-
-      // [START DocInclude_HTSAddTsAndWindow]
-      // Add an element timestamp based on the event log, and apply fixed windowing.
-      .apply("AddEventTimestamps",
-             WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
-      .apply(Window.named("FixedWindowsTeam")
-          .<GameActionInfo>into(FixedWindows.of(
-                Duration.standardMinutes(options.getWindowDuration()))))
-      // [END DocInclude_HTSAddTsAndWindow]
-
-      // Extract and sum teamname/score pairs from the event data.
-      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
-      .apply("WriteTeamScoreSums",
-        new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
-            configureWindowedTableWrite()));
-
-
-    pipeline.run();
-  }
-  // [END DocInclude_HTSMain]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
deleted file mode 100644
index 4185376..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/LeaderBoard.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game;
-
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
-import 
com.google.cloud.dataflow.examples.complete.game.utils.WriteWindowedToBigQuery;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-
-/**
- * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
- * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
- * data using fixed windows; use of custom timestamps and event-time processing; generation of
- * early/speculative results; using .accumulatingFiredPanes() to do cumulative processing of late-
- * arriving data.
- *
- * <p> This pipeline processes an unbounded stream of 'game events'. The calculation of the team
- * scores uses fixed windowing based on event time (the time of the game play event), not
- * processing time (the time that an event is processed by the pipeline). The pipeline calculates
- * the sum of scores per team, for each window. By default, the team scores are calculated using
- * one-hour windows.
- *
- * <p> In contrast-- to demo another windowing option-- the user scores are calculated using a
- * global window, which periodically (every ten minutes) emits cumulative user score sums.
- *
- * <p> In contrast to the previous pipelines in the series, which used static, finite input data,
- * here we're using an unbounded data source, which lets us provide speculative results, and allows
- * handling of late data, at much lower latency. We can use the early/speculative results to keep a
- * 'leaderboard' updated in near-realtime. Our handling of late data lets us generate correct
- * results, e.g. for 'team prizes'. We're now outputting window results as they're
- * calculated, giving us much lower latency than with the previous batch examples.
- *
- * <p> Run {@link com.google.cloud.dataflow.examples.complete.game.injector.Injector} to generate
- * pubsub data for this pipeline.  The Injector documentation provides more detail on how to do
- * this.
- *
- * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
- * like this:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --dataset=YOUR-DATASET
- *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
- * }
- * </pre>
- * where the BigQuery dataset you specify must already exist.
- * The PubSub topic you specify should be the same topic to which the Injector is publishing.
- *
- * <p> Note that {@link #main} always switches the options to streaming mode and sets the runner
- * to {@code DataflowPipelineRunner} after parsing the flags, so a {@code --runner} value given on
- * the command line is overridden.
- */
-public class LeaderBoard extends HourlyTeamScore {
-
-  // Name of the Pub/Sub message attribute that carries the event timestamp.
-  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-
-  // Formatter for the time columns written to BigQuery. Declared final because it is shared,
-  // read-only state. The zone is named by its full ID rather than the deprecated three-letter
-  // abbreviation "PST".
-  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
-          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/Los_Angeles")));
-  static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
-  static final Duration TEN_MINUTES = Duration.standardMinutes(10);
-
-
-  /**
-   * Options supported by {@link LeaderBoard}.
-   */
-  interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {
-
-    @Description("Pub/Sub topic to read from")
-    @Validation.Required
-    String getTopic();
-    void setTopic(String value);
-
-    @Description("Numeric value of fixed window duration for team analysis, in minutes")
-    @Default.Integer(60)
-    Integer getTeamWindowDuration();
-    void setTeamWindowDuration(Integer value);
-
-    @Description("Numeric value of allowed data lateness, in minutes")
-    @Default.Integer(120)
-    Integer getAllowedLateness();
-    void setAllowedLateness(Integer value);
-
-    @Description("Prefix used for the BigQuery table names")
-    @Default.String("leaderboard")
-    String getTableName();
-    void setTableName(String value);
-  }
-
-  /**
-   * Create a map of information that describes how to write pipeline output to BigQuery. This map
-   * is used to write team score sums and includes event timing information.
-   *
-   * @return a new mutable map from BigQuery column name ("team", "total_score", "window_start",
-   *     "processing_time", "timing") to the column's type and the function that extracts its value
-   */
-  protected static Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>>
-      configureWindowedTableWrite() {
-
-    Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
-        new HashMap<>();
-    tableConfigure.put("team",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER",
-            c -> c.element().getValue()));
-    // The cast relies on the elements having been windowed into interval (fixed) windows.
-    tableConfigure.put("window_start",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>("STRING",
-            c -> {
-              IntervalWindow w = (IntervalWindow) c.window();
-              return TIMESTAMP_FORMATTER.print(w.start());
-            }));
-    // Wall-clock time at which the row is produced, not the event time.
-    tableConfigure.put("processing_time",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> TIMESTAMP_FORMATTER.print(Instant.now())));
-    tableConfigure.put("timing",
-        new WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> c.pane().getTiming().toString()));
-    return tableConfigure;
-  }
-
-  /**
-   * Create a map of information that describes how to write pipeline output to BigQuery. This map
-   * is used to write user score sums.
-   *
-   * @return the map produced by {@code configureBigQueryWrite()} with an added "processing_time"
-   *     column
-   */
-  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
-      configureGlobalWindowBigQueryWrite() {
-
-    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
-        configureBigQueryWrite();
-    tableConfigure.put("processing_time",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
-            "STRING", c -> TIMESTAMP_FORMATTER.print(Instant.now())));
-    return tableConfigure;
-  }
-
-
-  /**
-   * Run a streaming pipeline that writes windowed team scores and periodically updated user
-   * scores to BigQuery.
-   *
-   * @param args command-line flags, parsed into {@link Options}
-   */
-  public static void main(String[] args) throws Exception {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    // Enforce that this pipeline is always run in streaming mode.
-    options.setStreaming(true);
-    // For example purposes, allow the pipeline to be easily cancelled instead of running
-    // continuously. This replaces whatever runner was selected on the command line.
-    options.setRunner(DataflowPipelineRunner.class);
-    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
-    Pipeline pipeline = Pipeline.create(options);
-
-    // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
-    // data elements, and parse the data.
-    PCollection<GameActionInfo> gameEvents = pipeline
-        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
-        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));
-
-    // [START DocInclude_WindowAndTrigger]
-    // Extract team/score pairs from the event stream, using hour-long windows by default.
-    gameEvents
-        .apply(Window.named("LeaderboardTeamFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getTeamWindowDuration())))
-          // We will get early (speculative) results as well as cumulative
-          // processing of late data.
-          .triggering(
-            AfterWatermark.pastEndOfWindow()
-            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(FIVE_MINUTES))
-            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(TEN_MINUTES)))
-          .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
-          .accumulatingFiredPanes())
-        // Extract and sum teamname/score pairs from the event data.
-        .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
-        // Write the results to BigQuery.
-        .apply("WriteTeamScoreSums",
-               new WriteWindowedToBigQuery<KV<String, Integer>>(
-                  options.getTableName() + "_team", configureWindowedTableWrite()));
-    // [END DocInclude_WindowAndTrigger]
-
-    // [START DocInclude_ProcTimeTrigger]
-    // Extract user/score pairs from the event stream using processing time, via global windowing.
-    // Get periodic updates on all users' running scores.
-    gameEvents
-        .apply(Window.named("LeaderboardUserGlobalWindow")
-          .<GameActionInfo>into(new GlobalWindows())
-          // Get periodic results every ten minutes.
-              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
-                  .plusDelayOf(TEN_MINUTES)))
-              .accumulatingFiredPanes()
-              .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
-        // Extract and sum username/score pairs from the event data.
-        .apply("ExtractUserScore", new ExtractAndSumScore("user"))
-        // Write the results to BigQuery.
-        .apply("WriteUserScoreSums",
-               new WriteToBigQuery<KV<String, Integer>>(
-                  options.getTableName() + "_user", configureGlobalWindowBigQueryWrite()));
-    // [END DocInclude_ProcTimeTrigger]
-
-    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
-    // command line.
-    PipelineResult result = pipeline.run();
-    dataflowUtils.waitToFinish(result);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
deleted file mode 100644
index 4cad16d..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/README.md
+++ /dev/null
@@ -1,119 +0,0 @@
-
-# 'Gaming' examples
-
-
-This directory holds a series of example Dataflow pipelines in a simple 'mobile
-gaming' domain. They all require Java 8.  Each pipeline successively introduces
-new concepts, and gives some examples of using Java 8 syntax in constructing
-Dataflow pipelines. Other than usage of Java 8 lambda expressions, the concepts
-that are used apply equally well in Java 7.
-
-In the gaming scenario, many users play, as members of different teams, over
-the course of a day, and their actions are logged for processing. Some of the
-logged game events may be late-arriving, if users play on mobile devices and go
-transiently offline for a period.
-
-The scenario includes not only "regular" users, but "robot users", which have a
-higher click rate than the regular users, and may move from team to team.
-
-The first two pipelines in the series use pre-generated batch data samples. The
-last two pipelines read from a [PubSub](https://cloud.google.com/pubsub/)
-topic input.  For these examples, you will also need to run the
-`injector.Injector` program, which generates and publishes the gaming data to
-PubSub. The javadocs for each pipeline have more detailed information on how to
-run that pipeline.
-
-All of these pipelines write their results to BigQuery table(s).
-
-
-## The pipelines in the 'gaming' series
-
-### UserScore
-
-The first pipeline in the series is `UserScore`. This pipeline does batch
-processing of data collected from gaming events. It calculates the sum of
-scores per user, over an entire batch of gaming data (collected, say, for each
-day). The batch processing will not include any late data that arrives after
-the day's cutoff point.
-
-### HourlyTeamScore
-
-The next pipeline in the series is `HourlyTeamScore`. This pipeline also
-processes data collected from gaming events in batch. It builds on `UserScore`,
-but uses [fixed windows](https://cloud.google.com/dataflow/model/windowing), by
-default an hour in duration. It calculates the sum of scores per team, for each
-window, optionally allowing specification of two timestamps before and after
-which data is filtered out. This allows a model where late data collected after
-the intended analysis window can be included in the analysis, and any late-
-arriving data prior to the beginning of the analysis window can be removed as
-well.
-
-By using windowing and adding element timestamps, we can do finer-grained
-analysis than with the `UserScore` pipeline — we're now tracking scores for
-each hour rather than over the course of a whole day. However, our batch
-processing is high-latency, in that we don't get results from plays at the
-beginning of the batch's time period until the complete batch is processed.
-
-### LeaderBoard
-
-The third pipeline in the series is `LeaderBoard`. This pipeline processes an
-unbounded stream of 'game events' from a PubSub topic. The calculation of the
-team scores uses fixed windowing based on event time (the time of the game play
-event), not processing time (the time that an event is processed by the
-pipeline). The pipeline calculates the sum of scores per team, for each window.
-By default, the team scores are calculated using one-hour windows.
-
-In contrast — to demo another windowing option — the user scores are calculated
-using a global window, which periodically (every ten minutes) emits cumulative
-user score sums.
-
-In contrast to the previous pipelines in the series, which used static, finite
-input data, here we're using an unbounded data source, which lets us provide
-_speculative_ results, and allows handling of late data, at much lower latency.
-E.g., we could use the early/speculative results to keep a 'leaderboard'
-updated in near-realtime. Our handling of late data lets us generate correct
-results, e.g. for 'team prizes'. We're now outputting window results as they're
-calculated, giving us much lower latency than with the previous batch examples.
-
-### GameStats
-
-The fourth pipeline in the series is `GameStats`. This pipeline builds
-on the `LeaderBoard` functionality — supporting output of speculative and late
-data — and adds some "business intelligence" analysis: abuse detection. The
-pipeline derives the mean user score sum for a window, and uses
-that information to identify likely spammers/robots. (The injector is designed
-so that the "robots" have a higher click rate than the "real" users). The robot
-users are then filtered out when calculating the team scores.
-
-Additionally, user sessions are tracked: that is, we find bursts of user
-activity using session windows. Then, the mean session duration information is
-recorded in the context of subsequent fixed windowing. (This could be used to
-tell us what games are giving us greater user retention).
-
-### Running the PubSub Injector
-
-The `LeaderBoard` and `GameStats` example pipelines read unbounded data
-from a PubSub topic.
-
-Use the `injector.Injector` program to generate this data and publish to a
-PubSub topic. See the `Injector` javadocs for more information on how to run the
-injector. Set up the injector before you start one of these pipelines. Then,
-when you start the pipeline, pass as an argument the name of that PubSub topic.
-See the pipeline javadocs for the details.
-
-## Viewing the results in BigQuery
-
-All of the pipelines write their results to BigQuery.  `UserScore` and
-`HourlyTeamScore` each write one table, and `LeaderBoard` and
-`GameStats` each write two. The pipelines have default table names that
-you can override when you start up the pipeline if those tables already exist.
-
-Depending on the windowing intervals defined in a given pipeline, you may have
-to wait for a while (more than an hour) before you start to see results written
-to the BigQuery tables.
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java
 
b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java
deleted file mode 100644
index de06ce3..0000000
--- 
a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/UserScore.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game;
-
-import com.google.cloud.dataflow.examples.complete.game.utils.WriteToBigQuery;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.apache.avro.reflect.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class is the first in a series of four pipelines that tell a story in 
a 'gaming' domain.
- * Concepts: batch processing; reading input from Google Cloud Storage and 
writing output to
- * BigQuery; using standalone DoFns; use of the sum by key transform; examples 
of
- * Java 8 lambda syntax.
- *
- * <p> In this gaming scenario, many users play, as members of different 
teams, over the course of a
- * day, and their actions are logged for processing.  Some of the logged game 
events may be late-
- * arriving, if users play on mobile devices and go transiently offline for a 
period.
- *
- * <p> This pipeline does batch processing of data collected from gaming 
events. It calculates the
- * sum of scores per user, over an entire batch of gaming data (collected, 
say, for each day). The
- * batch processing will not include any late data that arrives after the 
day's cutoff point.
- *
- * <p> To execute this pipeline using the Dataflow service and static example 
input data, specify
- * the pipeline configuration like this:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --dataset=YOUR-DATASET
- * }
- * </pre>
- * where the BigQuery dataset you specify must already exist.
- *
- * <p> Optionally include the --input argument to specify a batch input file.
- * See the --input default value for example batch data file, or use {@link 
injector.Injector} to
- * generate your own batch data.
-  */
-public class UserScore {
-
-  /**
-   * Class to hold info about a game event.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class GameActionInfo {
-    @Nullable String user;
-    @Nullable String team;
-    @Nullable Integer score;
-    @Nullable Long timestamp;
-
-    public GameActionInfo() {}
-
-    public GameActionInfo(String user, String team, Integer score, Long timestamp) {
-      this.user = user;
-      this.team = team;
-      this.score = score;
-      this.timestamp = timestamp;
-    }
-
-    public String getUser() {
-      return this.user;
-    }
-    public String getTeam() {
-      return this.team;
-    }
-    public Integer getScore() {
-      return this.score;
-    }
-    public String getKey(String keyname) {
-      if (keyname.equals("team")) {
-        return this.team;
-      } else {  // return username as default
-        return this.user;
-      }
-    }
-    public Long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-
-  /**
-   * Parses the raw game event info into GameActionInfo objects. Each event 
line has the following
-   * format: username,teamname,score,timestamp_in_ms,readable_time
-   * e.g.:
-   * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
-   * The human-readable time string is not used here.
-   */
-  static class ParseEventFn extends DoFn<String, GameActionInfo> {
-
-    // Log and count parse errors.
-    private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
-    private final Aggregator<Long, Long> numParseErrors =
-        createAggregator("ParseErrors", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String[] components = c.element().split(",");
-      try {
-        String user = components[0].trim();
-        String team = components[1].trim();
-        Integer score = Integer.parseInt(components[2].trim());
-        Long timestamp = Long.parseLong(components[3].trim());
-        GameActionInfo gInfo = new GameActionInfo(user, team, score, 
timestamp);
-        c.output(gInfo);
-      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
-        numParseErrors.addValue(1L);
-        LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
-      }
-    }
-  }
-
-  /**
-   * A transform to extract key/score information from GameActionInfo, and sum 
the scores. The
-   * constructor arg determines whether 'team' or 'user' info is extracted.
-   */
-  // [START DocInclude_USExtractXform]
-  public static class ExtractAndSumScore
-      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, 
Integer>>> {
-
-    private final String field;
-
-    ExtractAndSumScore(String field) {
-      this.field = field;
-    }
-
-    @Override
-    public PCollection<KV<String, Integer>> apply(
-        PCollection<GameActionInfo> gameInfo) {
-
-      return gameInfo
-        .apply(MapElements
-            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), 
gInfo.getScore()))
-            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
-        .apply(Sum.<String>integersPerKey());
-    }
-  }
-  // [END DocInclude_USExtractXform]
-
-
-  /**
-   * Options supported by {@link UserScore}.
-   */
-  public static interface Options extends PipelineOptions {
-
-    @Description("Path to the data file(s) containing game data.")
-    // The default maps to two large Google Cloud Storage files (each ~12GB) 
holding two subsequent
-    // day's worth (roughly) of data.
-    @Default.String("gs://dataflow-samples/game/gaming_data*.csv")
-    String getInput();
-    void setInput(String value);
-
-    @Description("BigQuery Dataset to write tables to. Must already exist.")
-    @Validation.Required
-    String getDataset();
-    void setDataset(String value);
-
-    @Description("The BigQuery table name. Should not already exist.")
-    @Default.String("user_score")
-    String getTableName();
-    void setTableName(String value);
-  }
-
-  /**
-   * Create a map of information that describes how to write pipeline output 
to BigQuery. This map
-   * is passed to the {@link WriteToBigQuery} constructor to write user score 
sums.
-   */
-  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
-    configureBigQueryWrite() {
-    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure 
=
-        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
-    tableConfigure.put("user",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("STRING", c -> 
c.element().getKey()));
-    tableConfigure.put("total_score",
-        new WriteToBigQuery.FieldInfo<KV<String, Integer>>("INTEGER", c -> 
c.element().getValue()));
-    return tableConfigure;
-  }
-
-
-  /**
-   * Run a batch pipeline.
-   */
- // [START DocInclude_USMain]
-  public static void main(String[] args) throws Exception {
-    // Begin constructing a pipeline configured by commandline flags.
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    // Read events from a text file and parse them.
-    pipeline.apply(TextIO.Read.from(options.getInput()))
-      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
-      // Extract and sum username/score pairs from the event data.
-      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
-      .apply("WriteUserScoreSums",
-          new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
-                                                   configureBigQueryWrite()));
-
-    // Run the batch pipeline.
-    pipeline.run();
-  }
-  // [END DocInclude_USMain]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
deleted file mode 100644
index d47886d..0000000
--- a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/Injector.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-
-import com.google.common.collect.ImmutableMap;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import java.io.BufferedOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-
-
-/**
- * This is a generator that simulates usage data from a mobile game, and 
either publishes the data
- * to a pubsub topic or writes it to a file.
- *
- * <p> The general model used by the generator is the following. There is a 
set of teams with team
- * members. Each member is scoring points for their team. After some period, a 
team will dissolve
- * and a new one will be created in its place. There is also a set of 
'Robots', or spammer users.
- * They hop from team to team. The robots are set to have a higher 'click 
rate' (generate more
- * events) than the regular team members.
- *
- * <p> Each generated line of data has the following form:
- * username,teamname,score,timestamp_in_ms,readable_time
- * e.g.:
- * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
- *
- * <p> The Injector writes either to a PubSub topic, or a file. It will use 
the PubSub topic if
- * specified. It takes the following arguments:
- * {@code Injector project-name (topic-name|none) (filename|none)}.
- *
- * <p> To run the Injector in the mode where it publishes to PubSub, you will 
need to authenticate
- * locally using project-based service account credentials to avoid running 
over PubSub
- * quota.
- * See https://developers.google.com/identity/protocols/application-default-credentials
- * for more information on using service account credentials. Set the 
GOOGLE_APPLICATION_CREDENTIALS
- * environment variable to point to your downloaded service account 
credentials before starting the
- * program, e.g.:
- * {@code export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your/credentials-key.json}.
- * If you do not do this, then your injector will only run for a few minutes 
on your
- * 'user account' credentials before you will start to see quota error 
messages like:
- * "Request throttled due to user QPS limit being reached", and see this 
exception:
- * ".com.google.api.client.googleapis.json.GoogleJsonResponseException: 429 
Too Many Requests".
- * Once you've set up your credentials, run the Injector like this":
-  * <pre>{@code
- * Injector <project-name> <topic-name> none
- * }
- * </pre>
- * The pubsub topic will be created if it does not exist.
- *
- * <p> To run the injector in write-to-file-mode, set the topic name to "none" 
and specify the
- * filename:
- * <pre>{@code
- * Injector <project-name> none <filename>
- * }
- * </pre>
- */
-class Injector {
-  private static Pubsub pubsub;
-  private static Random random = new Random();
-  private static String topic;
-  private static String project;
-  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-
-  // QPS ranges from 800 to 1000.
-  private static final int MIN_QPS = 800;
-  private static final int QPS_RANGE = 200;
-  // How long to sleep, in ms, between creation of the threads that make API 
requests to PubSub.
-  private static final int THREAD_SLEEP_MS = 500;
-
-  // Lists used to generate random team names.
-  private static final ArrayList<String> COLORS =
-      new ArrayList<String>(Arrays.asList(
-         "Magenta", "AliceBlue", "Almond", "Amaranth", "Amber",
-         "Amethyst", "AndroidGreen", "AntiqueBrass", "Fuchsia", "Ruby", "AppleGreen",
-         "Apricot", "Aqua", "ArmyGreen", "Asparagus", "Auburn", "Azure", "Banana",
-         "Beige", "Bisque", "BarnRed", "BattleshipGrey"));
-
-  private static final ArrayList<String> ANIMALS =
-      new ArrayList<String>(Arrays.asList(
-         "Echidna", "Koala", "Wombat", "Marmot", "Quokka", "Kangaroo", "Dingo", "Numbat", "Emu",
-         "Wallaby", "CaneToad", "Bilby", "Possum", "Cassowary", "Kookaburra", "Platypus",
-         "Bandicoot", "Cockatoo", "Antechinus"));
-
-  // The list of live teams.
-  private static ArrayList<TeamInfo> liveTeams = new ArrayList<TeamInfo>();
-
-  private static DateTimeFormatter fmt =
-    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
-        .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
-
-
-  // The total number of robots in the system.
-  private static final int NUM_ROBOTS = 20;
-  // Determines the chance that a team will have a robot team member.
-  private static final int ROBOT_PROBABILITY = 3;
-  private static final int NUM_LIVE_TEAMS = 15;
-  private static final int BASE_MEMBERS_PER_TEAM = 5;
-  private static final int MEMBERS_PER_TEAM = 15;
-  private static final int MAX_SCORE = 20;
-  private static final int LATE_DATA_RATE = 5 * 60 * 2;       // Every 10 
minutes
-  private static final int BASE_DELAY_IN_MILLIS = 5 * 60 * 1000;  // 5-10 
minute delay
-  private static final int FUZZY_DELAY_IN_MILLIS = 5 * 60 * 1000;
-
-  // The minimum time a 'team' can live.
-  private static final int BASE_TEAM_EXPIRATION_TIME_IN_MINS = 20;
-  private static final int TEAM_EXPIRATION_TIME_IN_MINS = 20;
-
-
-  /**
-   * A class for holding team info: the name of the team, when it started,
-   * and the current team members. Teams may but need not include one robot 
team member.
-   */
-  private static class TeamInfo {
-    String teamName;
-    long startTimeInMillis;
-    int expirationPeriod;
-    // The team might but need not include 1 robot. Will be non-null if so.
-    String robot;
-    int numMembers;
-
-    private TeamInfo(String teamName, long startTimeInMillis, String robot) {
-      this.teamName = teamName;
-      this.startTimeInMillis = startTimeInMillis;
-      // How long until this team is dissolved.
-      this.expirationPeriod = random.nextInt(TEAM_EXPIRATION_TIME_IN_MINS) +
-        BASE_TEAM_EXPIRATION_TIME_IN_MINS;
-      this.robot = robot;
-      // Determine the number of team members.
-      numMembers = random.nextInt(MEMBERS_PER_TEAM) + BASE_MEMBERS_PER_TEAM;
-    }
-
-    String getTeamName() {
-      return teamName;
-    }
-    String getRobot() {
-      return robot;
-    }
-
-    long getStartTimeInMillis() {
-      return startTimeInMillis;
-    }
-    long getEndTimeInMillis() {
-      return startTimeInMillis + (expirationPeriod * 60 * 1000);
-    }
-    String getRandomUser() {
-      int userNum = random.nextInt(numMembers);
-      return "user" + userNum + "_" + teamName;
-    }
-
-    int numMembers() {
-      return numMembers;
-    }
-
-    @Override
-    public String toString() {
-      return "(" + teamName + ", num members: " + numMembers() + ", starting at: "
-        + startTimeInMillis + ", expires in: " + expirationPeriod + ", robot: " + robot + ")";
-    }
-  }
-
-  /** Utility to grab a random element from an array of Strings. */
-  private static String randomElement(ArrayList<String> list) {
-    int index = random.nextInt(list.size());
-    return list.get(index);
-  }
-
-  /**
-   * Get and return a random team. If the selected team is too old w.r.t its 
expiration, remove
-   * it, replacing it with a new team.
-   */
-  private static TeamInfo randomTeam(ArrayList<TeamInfo> list) {
-    int index = random.nextInt(list.size());
-    TeamInfo team = list.get(index);
-    // If the selected team is expired, remove it and return a new team.
-    long currTime = System.currentTimeMillis();
-    if ((team.getEndTimeInMillis() < currTime) || team.numMembers() == 0) {
-      System.out.println("\nteam " + team + " is too old; replacing.");
-      System.out.println("start time: " + team.getStartTimeInMillis() +
-        ", end time: " + team.getEndTimeInMillis() +
-        ", current time:" + currTime);
-      removeTeam(index);
-      // Add a new team in its stead.
-      return (addLiveTeam());
-    } else {
-      return team;
-    }
-  }
-
-  /**
-   * Create and add a team. Possibly add a robot to the team.
-   */
-  private static synchronized TeamInfo addLiveTeam() {
-    String teamName = randomElement(COLORS) + randomElement(ANIMALS);
-    String robot = null;
-    // Decide if we want to add a robot to the team.
-    if (random.nextInt(ROBOT_PROBABILITY) == 0) {
-      robot = "Robot-" + random.nextInt(NUM_ROBOTS);
-    }
-    long currTime = System.currentTimeMillis();
-    // Create the new team.
-    TeamInfo newTeam = new TeamInfo(teamName, System.currentTimeMillis(), 
robot);
-    liveTeams.add(newTeam);
-    System.out.println("[+" + newTeam + "]");
-    return newTeam;
-  }
-
-  /**
-   * Remove a specific team.
-   */
-  private static synchronized void removeTeam(int teamIndex) {
-    TeamInfo removedTeam = liveTeams.remove(teamIndex);
-    System.out.println("[-" + removedTeam + "]");
-  }
-
-  /** Generate a user gaming event. */
-  private static String generateEvent(Long currTime, int delayInMillis) {
-    TeamInfo team = randomTeam(liveTeams);
-    String teamName = team.getTeamName();
-    String user;
-    int PARSE_ERROR_RATE = 900000;
-
-    String robot = team.getRobot();
-    // If the team has an associated robot team member...
-    if (robot != null) {
-      // Then use that robot for the message with some probability.
-      // Set this probability to higher than that used to select any of the 
'regular' team
-      // members, so that if there is a robot on the team, it has a higher 
click rate.
-      if (random.nextInt(team.numMembers() / 2) == 0) {
-        user = robot;
-      } else {
-        user = team.getRandomUser();
-      }
-    } else { // No robot.
-      user = team.getRandomUser();
-    }
-    String event = user + "," + teamName + "," + random.nextInt(MAX_SCORE);
-    // Randomly introduce occasional parse errors. You can see a custom 
counter tracking the number
-    // of such errors in the Dataflow Monitoring UI, as the example pipeline 
runs.
-    if (random.nextInt(PARSE_ERROR_RATE) == 0) {
-      System.out.println("Introducing a parse error.");
-      event = "THIS LINE REPRESENTS CORRUPT DATA AND WILL CAUSE A PARSE ERROR";
-    }
-    return addTimeInfoToEvent(event, currTime, delayInMillis);
-  }
-
-  /**
-   * Add time info to a generated gaming event.
-   */
-  private static String addTimeInfoToEvent(String message, Long currTime, int 
delayInMillis) {
-    String eventTimeString =
-        Long.toString((currTime - delayInMillis) / 1000 * 1000);
-    // Add a (redundant) 'human-readable' date string to make the data 
semantics more clear.
-    String dateString = fmt.print(currTime);
-    message = message + "," + eventTimeString + "," + dateString;
-    return message;
-  }
-
-  /**
-   * Publish 'numMessages' arbitrary events from live users with the provided 
delay, to a
-   * PubSub topic.
-   */
-  public static void publishData(int numMessages, int delayInMillis)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new ArrayList<>();
-
-    for (int i = 0; i < Math.max(1, numMessages); i++) {
-      Long currTime = System.currentTimeMillis();
-      String message = generateEvent(currTime, delayInMillis);
-      PubsubMessage pubsubMessage = new PubsubMessage()
-              .encodeData(message.getBytes("UTF-8"));
-      pubsubMessage.setAttributes(
-          ImmutableMap.of(TIMESTAMP_ATTRIBUTE,
-              Long.toString((currTime - delayInMillis) / 1000 * 1000)));
-      if (delayInMillis != 0) {
-        System.out.println(pubsubMessage.getAttributes());
-        System.out.println("late data for: " + message);
-      }
-      pubsubMessages.add(pubsubMessage);
-    }
-
-    PublishRequest publishRequest = new PublishRequest();
-    publishRequest.setMessages(pubsubMessages);
-    pubsub.projects().topics().publish(topic, publishRequest).execute();
-  }
-
-  /**
-   * Publish generated events to a file.
-   */
-  public static void publishDataToFile(String fileName, int numMessages, int 
delayInMillis)
-      throws IOException {
-    List<PubsubMessage> pubsubMessages = new ArrayList<>();
-    PrintWriter out = new PrintWriter(new OutputStreamWriter(
-        new BufferedOutputStream(new FileOutputStream(fileName, true)), 
"UTF-8"));
-
-    try {
-      for (int i = 0; i < Math.max(1, numMessages); i++) {
-        Long currTime = System.currentTimeMillis();
-        String message = generateEvent(currTime, delayInMillis);
-        out.println(message);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      if (out != null) {
-        out.flush();
-        out.close();
-      }
-    }
-  }
-
-
-  public static void main(String[] args)
-      throws GeneralSecurityException, IOException, InterruptedException {
-    if (args.length < 3) {
-      System.out.println("Usage: Injector project-name (topic-name|none) (filename|none)");
-      System.exit(1);
-    }
-    boolean writeToFile = false;
-    boolean writeToPubsub = true;
-    project = args[0];
-    String topicName = args[1];
-    String fileName = args[2];
-    // The Injector writes either to a PubSub topic, or a file. It will use 
the PubSub topic if
-    // specified; otherwise, it will try to write to a file.
-    if (topicName.equalsIgnoreCase("none")) {
-      writeToFile = true;
-      writeToPubsub = false;
-    }
-    if (writeToPubsub) {
-      // Create the PubSub client.
-      pubsub = InjectorUtils.getClient();
-      // Create the PubSub topic as necessary.
-      topic = InjectorUtils.getFullyQualifiedTopicName(project, topicName);
-      InjectorUtils.createTopic(pubsub, topic);
-      System.out.println("Injecting to topic: " + topic);
-    } else {
-      if (fileName.equalsIgnoreCase("none")) {
-        System.out.println("Filename not specified.");
-        System.exit(1);
-      }
-      System.out.println("Writing to file: " + fileName);
-    }
-    System.out.println("Starting Injector");
-
-    // Start off with some random live teams.
-    while (liveTeams.size() < NUM_LIVE_TEAMS) {
-      addLiveTeam();
-    }
-
-    // Publish messages at a rate determined by the QPS and Thread sleep 
settings.
-    for (int i = 0; true; i++) {
-      if (Thread.activeCount() > 10) {
-        System.err.println("I'm falling behind!");
-      }
-
-      // Decide if this should be a batch of late data.
-      final int numMessages;
-      final int delayInMillis;
-      if (i % LATE_DATA_RATE == 0) {
-        // Insert delayed data for one user (one message only)
-        delayInMillis = BASE_DELAY_IN_MILLIS + 
random.nextInt(FUZZY_DELAY_IN_MILLIS);
-        numMessages = 1;
-        System.out.println("DELAY(" + delayInMillis + ", " + numMessages + 
")");
-      } else {
-        System.out.print(".");
-        delayInMillis = 0;
-        numMessages = MIN_QPS + random.nextInt(QPS_RANGE);
-      }
-
-      if (writeToFile) { // Won't use threading for the file write.
-        publishDataToFile(fileName, numMessages, delayInMillis);
-      } else { // Write to PubSub.
-        // Start a thread to inject some data.
-        new Thread(){
-          public void run() {
-            try {
-              publishData(numMessages, delayInMillis);
-            } catch (IOException e) {
-              System.err.println(e);
-            }
-          }
-        }.start();
-      }
-
-      // Wait before creating another injector thread.
-      Thread.sleep(THREAD_SLEEP_MS);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e5c7cf88/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java b/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
deleted file mode 100644
index 55982df..0000000
--- a/examples/src/main/java8/com/google/cloud/dataflow/examples/complete/game/injector/InjectorUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.examples.complete.game.injector;
-
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.util.Utils;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.PubsubScopes;
-import com.google.api.services.pubsub.model.Topic;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
-class InjectorUtils {
-
-  private static final String APP_NAME = "injector";
-
-  /**
-   * Builds a new Pubsub client and returns it.
-   */
-  public static Pubsub getClient(final HttpTransport httpTransport,
-                                 final JsonFactory jsonFactory)
-           throws IOException {
-      Preconditions.checkNotNull(httpTransport);
-      Preconditions.checkNotNull(jsonFactory);
-      GoogleCredential credential =
-          GoogleCredential.getApplicationDefault(httpTransport, jsonFactory);
-      if (credential.createScopedRequired()) {
-          credential = credential.createScoped(PubsubScopes.all());
-      }
-      if (credential.getClientAuthentication() != null) {
-        System.out.println("\n***Warning! You are not using service account credentials to "
-          + "authenticate.\nYou need to use service account credentials for this example,"
-          + "\nsince user-level credentials do not have enough pubsub quota,\nand so you will run "
-          + "out of PubSub quota very quickly.\nSee "
-          + "https://developers.google.com/identity/protocols/application-default-credentials.");
-        System.exit(1);
-      }
-      HttpRequestInitializer initializer =
-          new RetryHttpInitializerWrapper(credential);
-      return new Pubsub.Builder(httpTransport, jsonFactory, initializer)
-              .setApplicationName(APP_NAME)
-              .build();
-  }
-
-  /**
-   * Builds a new Pubsub client with default HttpTransport and
-   * JsonFactory and returns it.
-   */
-  public static Pubsub getClient() throws IOException {
-      return getClient(Utils.getDefaultTransport(),
-                       Utils.getDefaultJsonFactory());
-  }
-
-
-  /**
-   * Returns the fully qualified topic name for Pub/Sub.
-   */
-  public static String getFullyQualifiedTopicName(
-          final String project, final String topic) {
-      return String.format("projects/%s/topics/%s", project, topic);
-  }
-
-  /**
-   * Create a topic if it doesn't exist.
-   */
-  public static void createTopic(Pubsub client, String fullTopicName)
-      throws IOException {
-    try {
-        client.projects().topics().get(fullTopicName).execute();
-    } catch (GoogleJsonResponseException e) {
-      if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
-        Topic topic = client.projects().topics()
-                .create(fullTopicName, new Topic())
-                .execute();
-        System.out.printf("Topic %s was created.\n", topic.getName());
-      }
-    }
-  }
-}


Reply via email to