Repository: beam
Updated Branches:
  refs/heads/master b80aac5e3 -> e9cd41165


Implement combiner lifting for direct runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0de5cf87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0de5cf87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0de5cf87

Branch: refs/heads/master
Commit: 0de5cf875aaef9e987561371a0fa56c875ce45c1
Parents: b80aac5
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Jan 31 11:41:09 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Jan 31 15:35:21 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/direct_runner.py | 15 +++-
 .../runners/direct/helper_transforms.py         | 77 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index dc2668d..28dc012 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -40,6 +40,17 @@ class DirectRunner(PipelineRunner):
   def __init__(self):
     self._cache = None
 
+  def apply_CombinePerKey(self, transform, pcoll):
+    """Applies a CombinePerKey through the lifted, pre-combining transform.
+
+    Args:
+      transform: the transform being applied; its fn, args and kwargs
+        attributes are forwarded to LiftedCombinePerKey.
+      pcoll: the input the transform is applied to.
+
+    Returns:
+      The result of piping pcoll into LiftedCombinePerKey, or the result of
+      transform.expand(pcoll) when NotImplementedError is raised.
+    """
+    # TODO: Move imports to top. Pipeline <-> Runner dependency causes problems
+    # with resolving imports when they are at top.
+    # pylint: disable=wrong-import-position
+    from apache_beam.runners.direct.helper_transforms import (
+        LiftedCombinePerKey)
+    try:
+      return pcoll | LiftedCombinePerKey(
+        transform.fn, transform.args, transform.kwargs)
+    except NotImplementedError:
+      # The lifted implementation rejected this combine; fall back to the
+      # transform's own expansion.
+      return transform.expand(pcoll)
+
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
@@ -90,10 +101,6 @@ class DirectRunner(PipelineRunner):
       self._cache = BufferingInMemoryCache()
     return self._cache.pvalue_cache
 
-  def apply(self, transform, input):  # pylint: disable=redefined-builtin
-    """Runner callback for a pipeline.apply call."""
-    return transform.expand(input)
-
 
 class BufferingInMemoryCache(object):
   """PValueCache wrapper for buffering bundles until a PValue is fully computed.

http://git-wip-us.apache.org/repos/asf/beam/blob/0de5cf87/sdks/python/apache_beam/runners/direct/helper_transforms.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
new file mode 100644
index 0000000..340db75
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import itertools
+
+import apache_beam as beam
+from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.internal.util import ArgumentPlaceholder
+
+
+class LiftedCombinePerKey(beam.PTransform):
+  """A CombinePerKey implementation that pre-combines before grouping.
+
+  The expansion pipes its input through PartialGroupByKeyCombiningValues,
+  then GroupByKey, then FinishCombine.
+  """
+  def __init__(self, combine_fn, args, kwargs):
+    """Curries args and kwargs into combine_fn and stores the result.
+
+    Raises:
+      NotImplementedError: if any positional argument or keyword value is an
+        ArgumentPlaceholder.
+    """
+    for extra_arg in itertools.chain(args, kwargs.values()):
+      if isinstance(extra_arg, ArgumentPlaceholder):
+        # This isn't implemented in dataflow either...
+        raise NotImplementedError('Deferred CombineFn side inputs.')
+    self._combine_fn = beam.transforms.combiners.curry_combine_fn(
+        combine_fn, args, kwargs)
+
+  def expand(self, pcoll):
+    """Chains the pre-combine, group-by-key and finish-combine stages."""
+    combine_fn = self._combine_fn
+    precombined = pcoll | beam.ParDo(
+        PartialGroupByKeyCombiningValues(combine_fn))
+    grouped = precombined | beam.GroupByKey()
+    return grouped | beam.ParDo(FinishCombine(combine_fn))
+
+
+class PartialGroupByKeyCombiningValues(beam.DoFn):
+  """Aggregates values into a per-key-window cache.
+
+  As bundles are in-memory-sized, we don't bother flushing until the very end.
+  """
+  def __init__(self, combine_fn):
+    self._combine_fn = combine_fn
+
+  def start_bundle(self, context):
+    # Begin each bundle with an empty cache; a (key, window) entry that is
+    # missing gets a fresh accumulator the first time it is looked up.
+    self._cache = collections.defaultdict(self._combine_fn.create_accumulator)
+
+  def process(self, context):
+    k, vi = context.element
+    # Fold the value into the accumulator of every window in context.windows.
+    for w in context.windows:
+      self._cache[k, w] = self._combine_fn.add_input(self._cache[k, w], vi)
+
+  def finish_bundle(self, context):
+    # Emit one (key, accumulator) pair per cached (key, window) entry, tagged
+    # with that single window and with w.end as the second WindowedValue
+    # argument (presumably the element timestamp -- confirm against
+    # WindowedValue's signature).
+    for (k, w), va in self._cache.items():
+      yield WindowedValue((k, va), w.end, (w,))
+
+
+class FinishCombine(beam.DoFn):
+  """Merges partially combined results.
+
+  Each input element is unpacked as a (key, accumulators) pair; the
+  accumulators are merged and the extracted output is emitted with the key.
+  """
+  def __init__(self, combine_fn):
+    self._combine_fn = combine_fn
+
+  def process(self, context):
+    k, vs = context.element
+    # Merge first, then extract, so the final output is computed once per
+    # element from the fully merged accumulator.
+    merged = self._combine_fn.merge_accumulators(vs)
+    return [(k, self._combine_fn.extract_output(merged))]

Reply via email to