This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f42d92e  Adding Python samples to the Stateful Processing post. (#8363)
f42d92e is described below

commit f42d92ea5cc51ed56b29df5e872fcb374f4f263e
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Mon Jun 10 10:04:51 2019 -0700

    Adding Python samples to the Stateful Processing post. (#8363)
    
    * Adding Python code to the Stateful Processing post.
    
    * Fixing indentation and removing erroneous comment.
    
    * Addressing comments
    
    * Improving samples to match CombiningValueState
    
    * Update userstate_test.py
    
    * Update 2017-02-13-stateful-processing.md
    
    * Addressing some comments
    
    * s/stage/step
    
    * removing prose changes
---
 sdks/python/apache_beam/transforms/trigger.py      |  13 ++-
 .../apache_beam/transforms/userstate_test.py       |   9 +-
 .../src/_posts/2017-02-13-stateful-processing.md   | 110 +++++++++++++++++----
 3 files changed, 99 insertions(+), 33 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/trigger.py 
b/sdks/python/apache_beam/transforms/trigger.py
index ddf4c24..f0d2a1c 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -67,8 +67,7 @@ __all__ = [
 
 
 class AccumulationMode(object):
-  """Controls what to do with data when a trigger fires multiple times.
-  """
+  """Controls what to do with data when a trigger fires multiple times."""
   DISCARDING = beam_runner_api_pb2.AccumulationMode.DISCARDING
   ACCUMULATING = beam_runner_api_pb2.AccumulationMode.ACCUMULATING
   # TODO(robertwb): Provide retractions of previous outputs.
@@ -78,10 +77,7 @@ class AccumulationMode(object):
 class _StateTag(with_metaclass(ABCMeta, object)):
   """An identifier used to store and retrieve typed, combinable state.
 
-  The given tag must be unique for this stage.  If CombineFn is None then
-  all elements will be returned as a list, otherwise the given CombineFn
-  will be applied (possibly incrementally and eagerly) when adding elements.
-  """
+  The given tag must be unique for this step."""
 
   def __init__(self, tag):
     self.tag = tag
@@ -98,7 +94,10 @@ class _ValueStateTag(_StateTag):
 
 
 class _CombiningValueStateTag(_StateTag):
-  """StateTag pointing to an element, accumulated with a combiner."""
+  """StateTag pointing to an element, accumulated with a combiner.
+
+  The given tag must be unique for this step. The given CombineFn will be
+  applied (possibly incrementally and eagerly) when adding elements."""
 
   # TODO(robertwb): Also store the coder (perhaps extracted from the 
combine_fn)
   def __init__(self, tag, combine_fn):
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py 
b/sdks/python/apache_beam/transforms/userstate_test.py
index 53a7e36..7a05c73 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -594,14 +594,13 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
 
   def test_index_assignment(self):
     class IndexAssigningStatefulDoFn(DoFn):
-      INDEX_STATE = BagStateSpec('index', VarIntCoder())
+      INDEX_STATE = CombiningValueStateSpec('index', sum)
 
       def process(self, element, state=DoFn.StateParam(INDEX_STATE)):
         unused_key, value = element
-        next_index, = list(state.read()) or [0]
-        yield (value, next_index)
-        state.clear()
-        state.add(next_index + 1)
+        current_index = state.read()
+        yield (value, current_index)
+        state.add(1)
 
     with TestPipeline() as p:
       test_stream = (TestStream()
diff --git a/website/src/_posts/2017-02-13-stateful-processing.md 
b/website/src/_posts/2017-02-13-stateful-processing.md
index f51b307..a778dde 100644
--- a/website/src/_posts/2017-02-13-stateful-processing.md
+++ b/website/src/_posts/2017-02-13-stateful-processing.md
@@ -28,6 +28,9 @@ efficiencies. In this post, I will guide you through stateful 
processing in
 Beam: how it works, how it fits in with the other features of the Beam model,
 what you might use it for, and what it looks like in code.
 
+**Note: This post has been updated in May of 2019, to include Python
+snippets!**
+
 <!--more-->
 
 > **Warning: new features ahead!**: This is a very new aspect of the Beam
@@ -278,7 +281,7 @@ new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, 
MyValue>>>() {
 
   // A state cell holding a single Integer per key+window
   @StateId("index")
-  private final StateSpec<ValueState<Integer>> indexSpec = 
+  private final StateSpec<ValueState<Integer>> indexSpec =
       StateSpecs.value(VarIntCoder.of());
 
   @ProcessElement
@@ -293,8 +296,14 @@ new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, 
MyValue>>>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+class IndexAssigningStatefulDoFn(DoFn):
+  INDEX_STATE = CombiningStateSpec('index', sum)
+
+  def process(self, element, index=DoFn.StateParam(INDEX_STATE)):
+    unused_key, value = element
+    current_index = index.read()
+    yield (value, current_index)
+    index.add(1)
 ```
 
 Let's dissect this:
@@ -371,8 +380,20 @@ class ModelFromEventsFn extends CombineFn<Event, Model, 
Model> {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+class ModelFromEventsFn(apache_beam.core.CombineFn):
+
+  def create_accumulator(self):
+    # Create a new empty model
+    return Model()
+
+  def add_input(self, model, input):
+    return model.update(input)
+
+  def merge_accumulators(self, accumulators):
+    # Custom merging logic
+
+  def extract_output(self, model):
+    return model
 ```
 
 Now you have a way to compute the model of a particular user for a window as
@@ -407,8 +428,24 @@ PCollection<KV<UserId, Prediction>> predictions = events
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+# Events is a collection of (user, event) pairs.
+events = (p | ReadFromEventSource() | beam.WindowInto(....))
+
+user_models = beam.pvalue.AsDict(
+                  events
+                  | beam.core.CombinePerKey(ModelFromEventsFn()))
+
+def event_prediction(user_event, models):
+  user = user_event[0]
+  event = user_event[1]
+
+  # Retrieve the model calculated for this user
+  model = models[user]
+
+  return (user, model.prediction(event))
+
+# Predictions is a collection of (user, prediction) pairs.
+predictions = events | beam.Map(event_prediction, user_models)
 ```
 
 In this pipeline, there is just one model emitted by the `Combine.perKey(...)`
@@ -441,8 +478,15 @@ PCollectionView<Map<UserId, Model>> userModels = events
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+events = ...
+
+user_models = beam.pvalue.AsDict(
+                  events
+                  | beam.WindowInto(GlobalWindows(),
+                      trigger=trigger.AfterAll(
+                          trigger.AfterCount(1),
+                          trigger.AfterProcessingTime(1)))
+                  | beam.CombinePerKey(ModelFromEventsFn()))
 ```
 
 This is often a pretty nice tradeoff between latency and cost: If a huge flood
@@ -499,8 +543,31 @@ new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Watch this space!
+class ModelStatefulFn(beam.DoFn):
+
+  PREVIOUS_PREDICTION = BagStateSpec('previous_pred_state', PredictionCoder())
+  MODEL_STATE = CombiningValueStateSpec('model_state',
+                                        ModelCoder(),
+                                        ModelFromEventsFn())
+
+  def process(self,
+              user_event,
+              previous_pred_state=beam.DoFn.StateParam(PREVIOUS_PREDICTION),
+              model_state=beam.DoFn.StateParam(MODEL_STATE)):
+    user = user_event[0]
+    event = user_event[1]
+    model = model_state.read()
+    previous_prediction = previous_pred_state.read()
+
+    new_prediction = model.prediction(event)
+    model_state.add(event)
+
+    if (previous_prediction is None
+        or self.should_output_prediction(
+            previous_prediction, new_prediction)):
+      previous_pred_state.clear()
+      previous_pred_state.add(new_prediction)
+      yield (user, new_prediction)
 ```
 
 Let's walk through it,
@@ -510,20 +577,21 @@ Let's walk through it,
    the prediction output previously.
  - Access to the two state cells by annotation in the `@ProcessElement` method
    is as before.
- - You read the current model via `modelState.read()`. Because state is also
-   per-key-and-window, this is a model just for the UserId of the Event
+ - You read the current model via `modelState.read()`.
+   per-key-and-window, this is a model just for the UserId of the Event 
    currently being processed.
  - You derive a new prediction `model.prediction(event)` and compare it against
-   the last one you output, accessed via `previousPredicationState.read()`.
+   the last one you output, accessed via
+   `previousPredicationState.read()`.
  - You then update the model `model.update()` and write it via
-   `modelState.write(...)`. It is perfectly fine to mutate the value you pulled
-   out of state as long as you also remember to write the mutated value, in the
-   same way you are encouraged to mutate `CombineFn` accumulators.
+   `modelState.write(...)`. It is perfectly fine to mutate the value
+   you pulled out of state as long as you also remember to write the mutated
+   value, in the same way you are encouraged to mutate `CombineFn` 
accumulators.
  - If the prediction has changed a significant amount since the last time you
-   output, you emit it via `context.output(...)` and save the prediction using
-   `previousPredictionState.write(...)`. Here the decision is relative to the
-   prior prediction output, not the last one computed - realistically you might
-   have some complex conditions here.
+   output, you emit it via `context.output(...)` and
+   save the prediction using `previousPredictionState.write(...)`.
+   Here the decision is relative to the prior prediction output, not the last
+   one computed - realistically you might have some complex conditions here.
 
 Most of the above is just talking through Java! But before you go out and
 convert all of your pipelines to use stateful processing, I want to go over

Reply via email to