Repository: beam-site Updated Branches: refs/heads/asf-site 496f24014 -> 3cafa86a0
Add Python snippets to Mobile Gaming. Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bf62dc99 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bf62dc99 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bf62dc99 Branch: refs/heads/asf-site Commit: bf62dc99069178c440b867a69042b05dda0cc439 Parents: 496f240 Author: Hadar Hod <had...@google.com> Authored: Fri Apr 21 09:36:57 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue May 2 18:23:32 2017 -0700 ---------------------------------------------------------------------- src/get-started/mobile-gaming-example.md | 155 +++++++++++++++++++++++++- 1 file changed, 153 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/bf62dc99/src/get-started/mobile-gaming-example.md ---------------------------------------------------------------------- diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md index 7519d3b..5c97274 100644 --- a/src/get-started/mobile-gaming-example.md +++ b/src/get-started/mobile-gaming-example.md @@ -10,9 +10,21 @@ redirect_from: /use/mobile-gaming-example/ * TOC {:toc} +<nav class="language-switcher"> + <strong>Adapt for:</strong> + <ul> + <li data-type="language-java">Java SDK</li> + <li data-type="language-py">Python SDK</li> + </ul> +</nav> + This section provides a walkthrough of a series of example Apache Beam pipelines that demonstrate more complex functionality than the basic [WordCount]({{ site.baseurl }}/get-started/wordcount-example) examples. The pipelines in this section process data from a hypothetical game that users play on their mobile phones. 
The pipelines demonstrate processing at increasing levels of complexity; the first pipeline, for example, shows how to run a batch analysis job to obtain relatively simple score data, while the later pipelines use Beam's windowing and triggers features to provide low-latency data analysis and more complex intelligence about users' play patterns. -> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7. +{:.language-java} +> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7. + +{:.language-py} +> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Every time a user plays an instance of our hypothetical mobile game, they generate a data event. Each data event consists of the following information: @@ -44,8 +56,12 @@ The Mobile Game example pipelines vary in complexity, from simple batch analysis The `UserScore` pipeline is the simplest example for processing mobile game data. `UserScore` determines the total score per user over a finite data set (for example, one day's worth of scores stored on the game server).
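As a rough illustration of the input to `UserScore`, each raw event might arrive as one comma-separated line that gets parsed into a record carrying the fields listed above. The sketch below is hypothetical (the exact field layout and the `parse_game_event` name are assumptions, not taken from the example pipelines); it only shows the dict shape that the Python transforms later in this page index into (`info['score']`, `element['timestamp']`):

```py
# Hypothetical sketch: parse one raw 'user,team,score,timestamp_ms' line into
# the dict shape that the Python transforms on this page index into.
# The CSV field layout here is an assumption for illustration.
def parse_game_event(line):
    parts = [p.strip() for p in line.split(',')]
    if len(parts) < 4:
        return None  # drop malformed records rather than failing the pipeline
    try:
        return {
            'user': parts[0],
            'team': parts[1],
            'score': int(parts[2]),
            'timestamp': int(parts[3]),  # event time in milliseconds
        }
    except ValueError:
        return None

event = parse_game_event('user0_AmberDingo,AmberDingo,18,1447719060000')
```

Returning `None` for malformed lines (instead of raising) matches the lenient parsing style the full examples use, where bad records are counted and skipped.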
Pipelines like `UserScore` are best run periodically after all relevant data has been gathered. For example, `UserScore` could run as a nightly job over data gathered during that day. +{:.language-java} > **Note:** See [UserScore on > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java) > for the complete example pipeline program. +{:.language-py} +> **Note:** See [UserScore on GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py) for the complete example pipeline program. + ### What Does UserScore Do? In a day's worth of scoring data, each user ID may have multiple records (if the user plays more than one instance of the game during the analysis window), each with their own score value and timestamp. If we want to determine the total score over all the instances a user plays during the day, our pipeline will need to group all the records together per individual user. @@ -99,6 +115,28 @@ public static class ExtractAndSumScore } ``` +```py +class ExtractAndSumScore(beam.PTransform): + """A transform to extract key/score information and sum the scores. + The constructor argument `field` determines whether 'team' or 'user' info is + extracted. + """ + def __init__(self, field): + super(ExtractAndSumScore, self).__init__() + self.field = field + + def expand(self, pcoll): + return (pcoll + | beam.Map(lambda info: (info[self.field], info['score'])) + | beam.CombinePerKey(sum_ints)) + +def configure_bigquery_write(): + return [ + ('user', 'STRING', lambda e: e[0]), + ('total_score', 'INTEGER', lambda e: e[1]), + ] +``` + `ExtractAndSumScore` is written to be more general, in that you can pass in the field by which you want to group the data (in the case of our game, by unique user or unique team). This means we can re-use `ExtractAndSumScore` in other pipelines that group score data by team, for example. 
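Outside of Beam, the "key by a configurable field, then sum per key" shape of `ExtractAndSumScore` can be sketched with plain Python. This is a hedged stand-in for `beam.Map` followed by `beam.CombinePerKey(sum)`, using hypothetical in-memory data, not part of the example itself:

```py
from collections import defaultdict

def extract_and_sum(events, field):
    # Mimics ExtractAndSumScore: key each event dict by `field`
    # ('user' or 'team') and sum the score values per key.
    totals = defaultdict(int)
    for info in events:
        totals[info[field]] += info['score']
    return dict(totals)

events = [
    {'user': 'alice', 'team': 'AmberDingo', 'score': 5},
    {'user': 'bob',   'team': 'AmberDingo', 'score': 3},
    {'user': 'alice', 'team': 'AmberDingo', 'score': 2},
]
by_user = extract_and_sum(events, 'user')  # {'alice': 7, 'bob': 3}
by_team = extract_and_sum(events, 'team')  # {'AmberDingo': 10}
```

The same re-use property holds here as in the pipeline: one function serves both the per-user and the per-team analyses because the grouping field is a parameter.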
Here's the main method of `UserScore`, showing how we apply all three steps of the pipeline: @@ -123,6 +161,25 @@ public static void main(String[] args) throws Exception { } ``` +```py +def run(argv=None): + """Main entry point; defines and runs the user_score pipeline.""" + + ... + + pipeline_options = PipelineOptions(pipeline_args) + p = beam.Pipeline(options=pipeline_options) + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) # Read events from a file and parse them. + | UserScore() + | WriteToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) + + result = p.run() + result.wait_until_finish() +``` + ### Working with the Results `UserScore` writes the data to a BigQuery table (called `user_score` by default). With the data in the BigQuery table, we might perform a further interactive analysis, such as querying for a list of the N top-scoring users for a given day. @@ -153,8 +210,12 @@ The `HourlyTeamScore` pipeline expands on the basic batch analysis principles us Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). The pipeline reads a fixed data set from a file, and writes the results to a Google Cloud BigQuery table, just like `UserScore`. +{:.language-java} > **Note:** See [HourlyTeamScore on > GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java) > for the complete example pipeline program. +{:.language-py} +> **Note:** See [HourlyTeamScore on GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py) for the complete example pipeline program. + ### What Does HourlyTeamScore Do? `HourlyTeamScore` calculates the total score per team, per hour, in a fixed data set (such as one day's worth of data). 
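The per-hour grouping rests on fixed (tumbling) windows that tile event time end to end. The assignment arithmetic can be sketched as follows; this is an illustration of the concept, not Beam's actual `FixedWindows` implementation:

```py
def fixed_window_start(timestamp_ms, window_duration_min=60):
    # Fixed windows tile event time end to end; an event belongs to the
    # window whose start is the nearest window boundary at or before
    # its timestamp.
    window_size_ms = window_duration_min * 60 * 1000
    return timestamp_ms - (timestamp_ms % window_size_ms)

# Two events 20 minutes apart within the same hour land in the same window.
w1 = fixed_window_start(1447719060000)
w2 = fixed_window_start(1447719060000 + 20 * 60 * 1000)
```

Because the window start is a pure function of the event timestamp, per-team sums computed within each window are reproducible regardless of when the batch job actually runs.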
@@ -184,7 +245,13 @@ Notice that as processing time advances, the sums are now _per window_; each win Beam's windowing feature uses the [intrinsic timestamp information]({{ site.baseurl }}/documentation/programming-guide/#pctimestamps) attached to each element of a `PCollection`. Because we want our pipeline to window based on _event time_, we **must first extract the timestamp** that's embedded in each data record and apply it to the corresponding element in the `PCollection` of score data. Then, the pipeline can **apply the windowing function** to divide the `PCollection` into logical windows. -Here's the code, which shows how `HourlyTeamScore` uses the [WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java) and [Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java) transforms to perform these operations: +{:.language-java} +`HourlyTeamScore` uses the [WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java) and [Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java) transforms to perform these operations. + +{:.language-py} +`HourlyTeamScore` uses the `FixedWindows` transform, found in [window.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py), to perform these operations. + +The following code shows this: ```java // Add an element timestamp based on the event log, and apply fixed windowing. @@ -194,6 +261,17 @@ Here's the code, which shows how `HourlyTeamScore` uses the [WithTimestamps](htt FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))) ``` +```py +# Add an element timestamp based on the event log, and apply fixed windowing.
+# Convert element['timestamp'] into seconds as expected by TimestampedValue. +| 'AddEventTimestamps' >> beam.Map( + lambda element: TimestampedValue( + element, element['timestamp'] / 1000.0)) +# Convert window_duration into seconds as expected by FixedWindows. +| 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows( + size=self.window_duration * 60)) +``` + Notice that the transforms the pipeline uses to specify the windowing are distinct from the actual data processing transforms (such as `ExtractAndSumScore`). This functionality provides you some flexibility in designing your Beam pipeline, in that you can run existing transforms over datasets with different windowing characteristics. #### Filtering Based On Event Time @@ -215,6 +293,13 @@ The following code shows how `HourlyTeamScore` uses the `Filter` transform to fi -> gInfo.getTimestamp() < stopMinTimestamp.getMillis())) ``` +```py +| 'FilterStartTime' >> beam.Filter( + lambda element: element['timestamp'] > start_min_filter) +| 'FilterEndTime' >> beam.Filter( + lambda element: element['timestamp'] < end_min_filter) +``` + #### Calculating Score Per Team, Per Window `HourlyTeamScore` uses the same `ExtractAndSumScore` transform as the `UserScore` pipeline, but passes a different key (team, as opposed to user). Also, because the pipeline applies `ExtractAndSumScore` _after_ applying fixed-time 1-hour windowing to the input data, the data gets grouped by both team _and_ window.
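Putting the event-time filtering and the windowed, per-team sum together outside of Beam, the core of `HourlyTeamScore` can be sketched in plain Python (the function name and the in-memory representation are illustrative, not part of the example):

```py
from collections import defaultdict

def hourly_team_totals(events, start_ms, stop_ms, window_ms=3600 * 1000):
    # Sketch of HourlyTeamScore's core: weed out events outside the analysis
    # period, then sum scores keyed by (team, fixed-window start).
    totals = defaultdict(int)
    for e in events:
        if not start_ms < e['timestamp'] < stop_ms:
            continue  # mirrors the FilterStartTime / FilterEndTime steps
        window_start = e['timestamp'] - e['timestamp'] % window_ms
        totals[(e['team'], window_start)] += e['score']
    return dict(totals)
```

Keying by the `(team, window_start)` pair is what makes the output "per team, per hour": the same team accumulates a separate total in each hour-long window its events fall into.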
You can see the full sequence of transforms in `HourlyTeamScore`'s main method: @@ -266,6 +351,68 @@ public static void main(String[] args) throws Exception { } ``` +```py +class HourlyTeamScore(beam.PTransform): + def __init__(self, start_min, stop_min, window_duration): + super(HourlyTeamScore, self).__init__() + self.start_min = start_min + self.stop_min = stop_min + self.window_duration = window_duration + + def expand(self, pcoll): + start_min_filter = string_to_timestamp(self.start_min) + end_min_filter = string_to_timestamp(self.stop_min) + + return ( + pcoll + | 'ParseGameEvent' >> beam.ParDo(ParseEventFn()) + # Filter out data before and after the given times so that it is not + # included in the calculations. As we collect data in batches (say, by + # day), the batch for the day that we want to analyze could potentially + # include some late-arriving data from the previous day. If so, we want + # to weed it out. Similarly, if we include data from the following day + # (to scoop up late-arriving events from the day we're analyzing), we + # need to weed out events that fall after the time period we want to + # analyze. + | 'FilterStartTime' >> beam.Filter( + lambda element: element['timestamp'] > start_min_filter) + | 'FilterEndTime' >> beam.Filter( + lambda element: element['timestamp'] < end_min_filter) + # Add an element timestamp based on the event log, and apply fixed + # windowing. + # Convert element['timestamp'] into seconds as expected by + # TimestampedValue. + | 'AddEventTimestamps' >> beam.Map( + lambda element: TimestampedValue( + element, element['timestamp'] / 1000.0)) + # Convert window_duration into seconds as expected by FixedWindows. + | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows( + size=self.window_duration * 60)) + # Extract and sum teamname/score pairs from the event data. 
+ | 'ExtractTeamScore' >> ExtractAndSumScore('team')) + + +def run(argv=None): + """Main entry point; defines and runs the hourly_team_score pipeline.""" + ... + + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + p = beam.Pipeline(options=pipeline_options) + pipeline_options.view_as(SetupOptions).save_main_session = True + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + | HourlyTeamScore( + known_args.start_min, known_args.stop_min, known_args.window_duration) + | WriteWindowedToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) + + result = p.run() + result.wait_until_finish() +``` + ### Limitations As written, `HourlyTeamScore` still has a limitation: @@ -275,6 +422,8 @@ As written, `HourlyTeamScore` still has a limitation: ## LeaderBoard: Streaming Processing with Real-Time Game Data +> **Note:** This example currently exists in Java only. + One way we can help address the latency issue present in the `UserScore` and `HourlyTeamScore` pipelines is by reading the score data from an unbounded source. The `LeaderBoard` pipeline introduces streaming processing by reading the game score data from an unbounded source that produces an infinite amount of data, rather than from a file on the game server. The `LeaderBoard` pipeline also demonstrates how to process game score data with respect to both _processing time_ and _event time_. `LeaderBoard` outputs data about both individual user scores and about team scores, each with respect to a different time frame. @@ -405,6 +554,8 @@ Taken together, these processing strategies let us address the latency and compl ## GameStats: Abuse Detection and Usage Analysis +> **Note:** This example currently exists in Java only. 
+ While `LeaderBoard` demonstrates how to use basic windowing and triggers to perform low-latency and flexible data analysis, we can use more advanced windowing techniques to perform more comprehensive analysis. This might include some calculations designed to detect system abuse (like spam) or to gain insight into user behavior. The `GameStats` pipeline builds on the low-latency functionality in `LeaderBoard` to demonstrate how you can use Beam to perform this kind of advanced analysis. Like `LeaderBoard`, `GameStats` reads data from an unbounded source. It is best thought of as an ongoing job that provides insight into the game as users play.
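One simple form such abuse detection can take is flagging users whose totals sit far above the population average. The sketch below is in Python even though the `GameStats` example itself is currently Java-only, and the 2.5x weight is an illustrative threshold, not a value taken from the example:

```py
from statistics import mean

def flag_suspected_spammers(user_totals, weight=2.5):
    # Flag users whose total score is far above the global average;
    # the `weight` multiplier is an illustrative threshold.
    if not user_totals:
        return set()
    global_mean = mean(user_totals.values())
    return {u for u, s in user_totals.items() if s > weight * global_mean}

scores = {'alice': 12, 'bob': 15, 'carol': 11, 'robot-2': 500}
spammers = flag_suspected_spammers(scores)  # {'robot-2'}
```

In a streaming pipeline this kind of per-key aggregate-versus-global-mean comparison would be computed within windows, so the flagged set adapts as play patterns change.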