Repository: beam-site
Updated Branches:
  refs/heads/asf-site 496f24014 -> 3cafa86a0


Add Python snippets to Mobile Gaming.


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bf62dc99
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bf62dc99
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bf62dc99

Branch: refs/heads/asf-site
Commit: bf62dc99069178c440b867a69042b05dda0cc439
Parents: 496f240
Author: Hadar Hod <had...@google.com>
Authored: Fri Apr 21 09:36:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 18:23:32 2017 -0700

----------------------------------------------------------------------
 src/get-started/mobile-gaming-example.md | 155 +++++++++++++++++++++++++-
 1 file changed, 153 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/bf62dc99/src/get-started/mobile-gaming-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md
index 7519d3b..5c97274 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -10,9 +10,21 @@ redirect_from: /use/mobile-gaming-example/
 * TOC
 {:toc}
 
+<nav class="language-switcher">
+  <strong>Adapt for:</strong> 
+  <ul>
+    <li data-type="language-java">Java SDK</li>
+    <li data-type="language-py">Python SDK</li>
+  </ul>
+</nav>
+
 This section provides a walkthrough of a series of example Apache Beam 
pipelines that demonstrate more complex functionality than the basic 
[WordCount]({{ site.baseurl }}/get-started/wordcount-example) examples. The 
pipelines in this section process data from a hypothetical game that users play 
on their mobile phones. The pipelines demonstrate processing at increasing 
levels of complexity; the first pipeline, for example, shows how to run a batch 
analysis job to obtain relatively simple score data, while the later pipelines 
use Beam's windowing and triggers features to provide low-latency data analysis 
and more complex intelligence about users' play patterns.
 
-> **Note**: These examples assume some familiarity with the Beam programming 
model. If you haven't already, we recommend familiarizing yourself with the 
programming model documentation and running a basic example pipeline before 
continuing. Note also that these examples use the Java 8 lambda syntax, and 
thus require Java 8. However, you can create pipelines with equivalent 
functionality using Java 7.
+{:.language-java}
+> **Note**: These examples assume some familiarity with the Beam programming 
model. If you haven't already, we recommend familiarizing yourself with the 
programming model documentation and running a basic example pipeline before 
continuing. Note also that these examples use the Java 8 lambda syntax, and 
thus require Java 8. However, you can create pipelines with equivalent 
functionality using Java 7. 
+
+{:.language-py}
+> **Note**: These examples assume some familiarity with the Beam programming 
model. If you haven't already, we recommend familiarizing yourself with the 
programming model documentation and running a basic example pipeline before 
continuing.
 
 Every time a user plays an instance of our hypothetical mobile game, they 
generate a data event. Each data event consists of the following information:
 
@@ -44,8 +56,12 @@ The Mobile Game example pipelines vary in complexity, from 
simple batch analysis
 
 The `UserScore` pipeline is the simplest example for processing mobile game 
data. `UserScore` determines the total score per user over a finite data set 
(for example, one day's worth of scores stored on the game server). Pipelines 
like `UserScore` are best run periodically after all relevant data has been 
gathered. For example, `UserScore` could run as a nightly job over data 
gathered during that day.
 
+{:.language-java}
 > **Note:** See [UserScore on 
 > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java)
 >  for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [UserScore on 
GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py)
 for the complete example pipeline program.
+
 ### What Does UserScore Do?
 
 In a day's worth of scoring data, each user ID may have multiple records (if 
the user plays more than one instance of the game during the analysis window), 
each with their own score value and timestamp. If we want to determine the 
total score over all the instances a user plays during the day, our pipeline 
will need to group all the records together per individual user.
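The grouping described above can be sketched in plain Python (outside of Beam) to show the intended result; the sample records here are hypothetical, not taken from the example data:

```py
# A few hypothetical score events; in the real pipeline these records are
# parsed from the game server's log files.
events = [
    {'user': 'alice', 'team': 'red',  'score': 12, 'timestamp': 1},
    {'user': 'bob',   'team': 'blue', 'score': 3,  'timestamp': 2},
    {'user': 'alice', 'team': 'red',  'score': 5,  'timestamp': 3},
]

def total_score_per_user(events):
  """Group all records by user and sum the score values, as UserScore does."""
  totals = {}
  for event in events:
    totals[event['user']] = totals.get(event['user'], 0) + event['score']
  return totals
```

For the sample above, `total_score_per_user(events)` yields `{'alice': 17, 'bob': 3}`: both of alice's game instances are combined into one total.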
@@ -99,6 +115,28 @@ public static class ExtractAndSumScore
 }
 ```
 
+```py
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda info: (info[self.field], info['score']))
+            | beam.CombinePerKey(sum_ints))
+
+def configure_bigquery_write():
+  return [
+      ('user', 'STRING', lambda e: e[0]),
+      ('total_score', 'INTEGER', lambda e: e[1]),
+  ]
+```
+
 `ExtractAndSumScore` is written to be more general, in that you can pass in 
the field by which you want to group the data (in the case of our game, by 
unique user or unique team). This means we can re-use `ExtractAndSumScore` in 
other pipelines that group score data by team, for example.
 
 Here's the main method of `UserScore`, showing how we apply all three steps of 
the pipeline:
@@ -123,6 +161,25 @@ public static void main(String[] args) throws Exception {
 }
 ```
 
+```py
+def run(argv=None):
+  """Main entry point; defines and runs the user_score pipeline."""
+  
+  ...
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input) # Read events from a file and parse them.
+   | UserScore()
+   | WriteToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+```
+
 ### Working with the Results
 
 `UserScore` writes the data to a BigQuery table (called `user_score` by 
default). With the data in the BigQuery table, we might perform a further 
interactive analysis, such as querying for a list of the N top-scoring users 
for a given day.
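As a sketch of the kind of follow-up analysis mentioned above, here is how a top-N selection over the resulting (user, total_score) rows might look in plain Python; the rows are hypothetical, not output from the example:

```py
import heapq

# Hypothetical rows as they might come back from the user_score table.
rows = [('alice', 170), ('bob', 30), ('carol', 420), ('dave', 95)]

def top_scoring_users(rows, n):
  """Return the n highest-scoring (user, total_score) pairs, best first."""
  return heapq.nlargest(n, rows, key=lambda row: row[1])
```

For example, `top_scoring_users(rows, 2)` returns `[('carol', 420), ('alice', 170)]`. In practice you would run an equivalent `ORDER BY ... LIMIT` query directly in BigQuery.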
@@ -153,8 +210,12 @@ The `HourlyTeamScore` pipeline expands on the basic batch 
analysis principles us
 
 Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run 
periodically after all the relevant data has been gathered (such as once per 
day). The pipeline reads a fixed data set from a file, and writes the results 
to a Google Cloud BigQuery table, just like `UserScore`.
 
+{:.language-java}
 > **Note:** See [HourlyTeamScore on 
 > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java)
 >  for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [HourlyTeamScore on 
GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py)
 for the complete example pipeline program.
+
 ### What Does HourlyTeamScore Do?
 
 `HourlyTeamScore` calculates the total score per team, per hour, in a fixed 
data set (such as one day's worth of data).
@@ -184,7 +245,13 @@ Notice that as processing time advances, the sums are now 
_per window_; each win
 
 Beam's windowing feature uses the [intrinsic timestamp information]({{ 
site.baseurl }}/documentation/programming-guide/#pctimestamps) attached to each 
element of a `PCollection`. Because we want our pipeline to window based on 
_event time_, we **must first extract the timestamp** that's embedded in each 
data record and apply it to the corresponding element in the `PCollection` of score 
data. Then, the pipeline can **apply the windowing function** to divide the 
`PCollection` into logical windows.
 
-Here's the code, which shows how `HourlyTeamScore` uses the 
[WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java)
 and 
[Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java)
 transforms to perform these operations:
+{:.language-java}
+`HourlyTeamScore` uses the 
[WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java)
 and 
[Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java)
 transforms to perform these operations.
+
+{:.language-py}
+`HourlyTeamScore` uses the `FixedWindows` transform, found in 
[window.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py),
 to perform these operations.
+
+The following code shows this: 
 
 ```java
 // Add an element timestamp based on the event log, and apply fixed windowing.
@@ -194,6 +261,17 @@ Here's the code, which shows how `HourlyTeamScore` uses 
the [WithTimestamps](htt
         
FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
 ```
 
+```py
+# Add an element timestamp based on the event log, and apply fixed windowing.
+# Convert element['timestamp'] into seconds as expected by TimestampedValue.
+| 'AddEventTimestamps' >> beam.Map(
+    lambda element: TimestampedValue(
+        element, element['timestamp'] / 1000.0))
+# Convert window_duration into seconds as expected by FixedWindows.
+| 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
+    size=self.window_duration * 60))
+```
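Conceptually, fixed windowing just maps each element's event timestamp to the window that contains it. A minimal pure-Python sketch of that assignment (purely illustrative, not the Beam API):

```py
def window_start(timestamp_secs, window_size_secs):
  """Return the start of the fixed window containing the given event time."""
  return timestamp_secs - (timestamp_secs % window_size_secs)

# With one-hour (3600-second) windows, an event at t=5400s lands in the
# window starting at t=3600s.
```

Here `window_start(5400, 3600)` is `3600`: every element whose timestamp falls in `[3600, 7200)` is grouped into the same one-hour window.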
+
 Notice that the transforms the pipeline uses to specify the windowing are 
distinct from the actual data processing transforms (such as 
`ExtractAndSumScore`). This functionality provides some flexibility in 
designing your Beam pipeline, in that you can run existing transforms over 
datasets with different windowing characteristics.
 
 #### Filtering Based On Event Time
@@ -215,6 +293,13 @@ The following code shows how `HourlyTeamScore` uses the 
`Filter` transform to fi
         -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
 ```
 
+```py
+| 'FilterStartTime' >> beam.Filter(
+    lambda element: element['timestamp'] > start_min_filter)
+| 'FilterEndTime' >> beam.Filter(
+    lambda element: element['timestamp'] < end_min_filter)
+```
+
 #### Calculating Score Per Team, Per Window
 
`HourlyTeamScore` uses the same `ExtractAndSumScore` transform as the 
`UserScore` pipeline, but passes a different key (team, as opposed to user). 
Also, because the pipeline applies `ExtractAndSumScore` _after_ applying 
fixed-time 1-hour windowing to the input data, the data gets grouped by both 
team _and_ window. You can see the full sequence of transforms in 
`HourlyTeamScore`'s main method:
@@ -266,6 +351,68 @@ public static void main(String[] args) throws Exception {
 }
 ```
 
+```py
+class HourlyTeamScore(beam.PTransform):
+  def __init__(self, start_min, stop_min, window_duration):
+    super(HourlyTeamScore, self).__init__()
+    self.start_min = start_min
+    self.stop_min = stop_min
+    self.window_duration = window_duration
+
+  def expand(self, pcoll):
+    start_min_filter = string_to_timestamp(self.start_min)
+    end_min_filter = string_to_timestamp(self.stop_min)
+
+    return (
+        pcoll
+        | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
+        # Filter out data before and after the given times so that it is not
+        # included in the calculations. As we collect data in batches (say, by
+        # day), the batch for the day that we want to analyze could potentially
+        # include some late-arriving data from the previous day. If so, we want
+        # to weed it out. Similarly, if we include data from the following day
+        # (to scoop up late-arriving events from the day we're analyzing), we
+        # need to weed out events that fall after the time period we want to
+        # analyze.
+        | 'FilterStartTime' >> beam.Filter(
+            lambda element: element['timestamp'] > start_min_filter)
+        | 'FilterEndTime' >> beam.Filter(
+            lambda element: element['timestamp'] < end_min_filter)
+        # Add an element timestamp based on the event log, and apply fixed
+        # windowing.
+        # Convert element['timestamp'] into seconds as expected by
+        # TimestampedValue.
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda element: TimestampedValue(
+                element, element['timestamp'] / 1000.0))
+        # Convert window_duration into seconds as expected by FixedWindows.
+        | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
+            size=self.window_duration * 60))
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractTeamScore' >> ExtractAndSumScore('team'))
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the hourly_team_score pipeline."""
+  ...
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input)
+   | HourlyTeamScore(
+       known_args.start_min, known_args.stop_min, known_args.window_duration)
+   | WriteWindowedToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+```
+
 ### Limitations
 
 As written, `HourlyTeamScore` still has a limitation:
@@ -275,6 +422,8 @@ As written, `HourlyTeamScore` still has a limitation:
 
 ## LeaderBoard: Streaming Processing with Real-Time Game Data
 
+> **Note:** This example currently exists in Java only.
+
 One way we can help address the latency issue present in the `UserScore` and 
`HourlyTeamScore` pipelines is by reading the score data from an unbounded 
source. The `LeaderBoard` pipeline introduces streaming processing by reading 
the game score data from an unbounded source that produces an infinite amount 
of data, rather than from a file on the game server.
 
 The `LeaderBoard` pipeline also demonstrates how to process game score data 
with respect to both _processing time_ and _event time_. `LeaderBoard` outputs 
data about both individual user scores and about team scores, each with respect 
to a different time frame.
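The distinction between processing time and event time can be sketched outside of Beam: even when events arrive out of order, grouping by event-time window produces the same sums. The events and window math below are hypothetical and purely illustrative:

```py
def sum_per_event_time_window(events, window_size_secs):
  """Sum scores per fixed event-time window, regardless of arrival order."""
  sums = {}
  for event in events:  # events are in *arrival* (processing-time) order
    start = event['timestamp'] - (event['timestamp'] % window_size_secs)
    sums[start] = sums.get(start, 0) + event['score']
  return sums

# A late event (timestamp 100) arriving after a newer one (timestamp 4000)
# is still credited to its own event-time window.
arrivals = [
    {'score': 5, 'timestamp': 4000},
    {'score': 2, 'timestamp': 100},   # late-arriving
]
```

With one-hour windows, `sum_per_event_time_window(arrivals, 3600)` yields `{3600: 5, 0: 2}`; in a streaming pipeline, triggers determine *when* each window's running sum is emitted as late data arrives.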
@@ -405,6 +554,8 @@ Taken together, these processing strategies let us address 
the latency and compl
 
 ## GameStats: Abuse Detection and Usage Analysis
 
+> **Note:** This example currently exists in Java only.
+
 While `LeaderBoard` demonstrates how to use basic windowing and triggers to 
perform low-latency and flexible data analysis, we can use more advanced 
windowing techniques to perform more comprehensive analysis. This might include 
some calculations designed to detect system abuse (like spam) or to gain 
insight into user behavior. The `GameStats` pipeline builds on the low-latency 
functionality in `LeaderBoard` to demonstrate how you can use Beam to perform 
this kind of advanced analysis.
 
 Like `LeaderBoard`, `GameStats` reads data from an unbounded source. It is 
best thought of as an ongoing job that provides insight into the game as users 
play.
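As a rough, hypothetical illustration of the kind of abuse detection `GameStats` performs, one simple heuristic flags users whose total score is far above the average. The threshold multiplier here is an assumption for illustration, not the value the example uses:

```py
def suspected_spammers(user_totals, multiplier=2.5):
  """Flag users whose total score exceeds multiplier * the mean score.

  `multiplier` is an illustrative choice, not GameStats' actual parameter.
  """
  if not user_totals:
    return []
  mean = sum(user_totals.values()) / float(len(user_totals))
  return sorted(u for u, s in user_totals.items() if s > multiplier * mean)
```

For example, `suspected_spammers({'alice': 10, 'bob': 12, 'robot': 500})` returns `['robot']`. The real pipeline computes this kind of statistic with Beam combiners over windowed data rather than over an in-memory dict.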
