This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new f51edc1  [BEAM-7390] Add code snippet for GroupByKey
     new d032994  Merge pull request #9926 from davidcavazos/groupbykey-code
f51edc1 is described below

commit f51edc10e1c724bdf113f84ab3b7283b9fabe19c
Author: David Cavazos <dcava...@google.com>
AuthorDate: Wed Oct 16 18:36:39 2019 -0700

    [BEAM-7390] Add code snippet for GroupByKey
---
 .../snippets/transforms/aggregation/groupbykey.py  | 47 +++++++++++++++++++
 .../transforms/aggregation/groupbykey_test.py      | 54 ++++++++++++++++++++++
 2 files changed, 101 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py
new file mode 100644
index 0000000..83e4f87
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py
@@ -0,0 +1,47 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def groupbykey(test=None):
+  # [START groupbykey]
+  import apache_beam as beam
+
+  with beam.Pipeline() as pipeline:
+    produce_counts = (
+        pipeline
+        | 'Create produce counts' >> beam.Create([
+            ('spring', '🍓'),
+            ('spring', '🥕'),
+            ('spring', '🍆'),
+            ('spring', '🍅'),
+            ('summer', '🥕'),
+            ('summer', '🍅'),
+            ('summer', '🌽'),
+            ('fall', '🥕'),
+            ('fall', '🍅'),
+            ('winter', '🍆'),
+        ])
+        | 'Group counts per produce' >> beam.GroupByKey()
+        | beam.Map(print)
+    )
+    # [END groupbykey]
+    if test:
+      test(produce_counts)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py
new file mode 100644
index 0000000..4d8283a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py
@@ -0,0 +1,54 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.testing.test_pipeline import TestPipeline
+
+from . import groupbykey
+
+
+def check_produce_counts(actual):
+  expected = '''[START produce_counts]
+('spring', ['🍓', '🥕', '🍆', '🍅'])
+('summer', ['🥕', '🍅', '🌽'])
+('fall', ['🥕', '🍅'])
+('winter', ['🍆'])
+[END produce_counts]'''.splitlines()[1:-1]
+  # The elements order is non-deterministic, so sort them first.
+  assert_matches_stdout(
+      actual, expected, lambda pair: (pair[0], sorted(pair[1])))
+
+
+@mock.patch('apache_beam.Pipeline', TestPipeline)
+@mock.patch(
+    'apache_beam.examples.snippets.transforms.aggregation.groupbykey.print',
+    str)
+class GroupByKeyTest(unittest.TestCase):
+  def test_groupbykey(self):
+    groupbykey.groupbykey(check_produce_counts)
+
+
+if __name__ == '__main__':
+  unittest.main()