Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 4b584ca26 -> a60b58a94
Insert a shuffle before write finalization

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4602c954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4602c954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4602c954

Branch: refs/heads/python-sdk
Commit: 4602c954caeda24cc7ce762c2fd9995aa4c04c1a
Parents: 4b584ca
Author: Charles Chen <c...@google.com>
Authored: Wed Sep 14 12:14:34 2016 -0700
Committer: Charles Chen <c...@google.com>
Committed: Wed Sep 14 14:52:08 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4602c954/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index b269ae5..ecb8a70 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1028,9 +1028,12 @@ class WriteImpl(ptransform.PTransform):
                                       AsSingleton(init_result_coll)))
     else:
       min_shards = 1
-      write_result_coll = pcoll | core.ParDo('write_bundles',
-                                             _WriteBundleDoFn(), self.sink,
-                                             AsSingleton(init_result_coll))
+      write_result_coll = (pcoll | core.ParDo('write_bundles',
+                                              _WriteBundleDoFn(), self.sink,
+                                              AsSingleton(init_result_coll))
+                           | core.Map(lambda x: (None, x))
+                           | core.GroupByKey()
+                           | core.FlatMap(lambda x: x[1]))
 
     return do_once | core.FlatMap(
         'finalize_write', _finalize_write,