Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 4b584ca26 -> a60b58a94


Insert a shuffle before write finalization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4602c954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4602c954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4602c954

Branch: refs/heads/python-sdk
Commit: 4602c954caeda24cc7ce762c2fd9995aa4c04c1a
Parents: 4b584ca
Author: Charles Chen <c...@google.com>
Authored: Wed Sep 14 12:14:34 2016 -0700
Committer: Charles Chen <c...@google.com>
Committed: Wed Sep 14 14:52:08 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4602c954/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py 
b/sdks/python/apache_beam/io/iobase.py
index b269ae5..ecb8a70 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1028,9 +1028,12 @@ class WriteImpl(ptransform.PTransform):
                                       AsSingleton(init_result_coll)))
     else:
       min_shards = 1
-      write_result_coll = pcoll | core.ParDo('write_bundles',
-                                             _WriteBundleDoFn(), self.sink,
-                                             AsSingleton(init_result_coll))
+      write_result_coll = (pcoll | core.ParDo('write_bundles',
+                                              _WriteBundleDoFn(), self.sink,
+                                              AsSingleton(init_result_coll))
+                           | core.Map(lambda x: (None, x))
+                           | core.GroupByKey()
+                           | core.FlatMap(lambda x: x[1]))
     return do_once | core.FlatMap(
         'finalize_write',
         _finalize_write,

Reply via email to