This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new b1d9a82 [BEAM-7390] Add code snippet for Sample
     new dfc814a Merge pull request #10174 from davidcavazos/sample-code
b1d9a82 is described below

commit b1d9a820a7a5dd9c977c8ae5977b827e75b99efa
Author: David Cavazos <dcava...@google.com>
AuthorDate: Tue Nov 19 15:22:46 2019 -0800

    [BEAM-7390] Add code snippet for Sample
---
 .../snippets/transforms/aggregation/sample.py | 69 ++++++++++++++++++++++
 .../snippets/transforms/aggregation/sample_test.py | 63 ++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py
new file mode 100644
index 0000000..d5abc37
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample.py
@@ -0,0 +1,69 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +from __future__ import absolute_import +from __future__ import print_function + + +def sample_fixed_size_globally(test=None): + # [START sample_fixed_size_globally] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + sample = ( + pipeline + | 'Create produce' >> beam.Create([ + '🍓 Strawberry', + '🥕 Carrot', + '🍆 Eggplant', + '🍅 Tomato', + '🥔 Potato', + ]) + | 'Sample N elements' >> beam.combiners.Sample.FixedSizeGlobally(3) + | beam.Map(print) + ) + # [END sample_fixed_size_globally] + if test: + test(sample) + + +def sample_fixed_size_per_key(test=None): + # [START sample_fixed_size_per_key] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + samples_per_key = ( + pipeline + | 'Create produce' >> beam.Create([ + ('spring', '🍓'), + ('spring', '🥕'), + ('spring', '🍆'), + ('spring', '🍅'), + ('summer', '🥕'), + ('summer', '🍅'), + ('summer', '🌽'), + ('fall', '🥕'), + ('fall', '🍅'), + ('winter', '🍆'), + ]) + | 'Samples per key' >> beam.combiners.Sample.FixedSizePerKey(3) + | beam.Map(print) + ) + # [END sample_fixed_size_per_key] + if test: + test(samples_per_key) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py new file mode 100644 index 0000000..22cd656 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/sample_test.py @@ -0,0 +1,63 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.examples.snippets.util import assert_matches_stdout +from apache_beam.testing.test_pipeline import TestPipeline + +from . import sample + + +def check_sample(actual): + expected = '''[START sample] +['🥕 Carrot', '🍆 Eggplant', '🍅 Tomato'] +[END sample]'''.splitlines()[1:-1] + # The sampled elements are non-deterministic, so check the sample size. + assert_matches_stdout(actual, expected, lambda elements: len(elements)) + + +def check_samples_per_key(actual): + expected = '''[START samples_per_key] +('spring', ['🍓', '🥕', '🍆']) +('summer', ['🥕', '🍅', '🌽']) +('fall', ['🥕', '🍅']) +('winter', ['🍆']) +[END samples_per_key]'''.splitlines()[1:-1] + # The sampled elements are non-deterministic, so check the sample size. + assert_matches_stdout(actual, expected, lambda pair: (pair[0], len(pair[1]))) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.sample.print', str) +class SampleTest(unittest.TestCase): + def test_sample_fixed_size_globally(self): + sample.sample_fixed_size_globally(check_sample) + + def test_sample_fixed_size_per_key(self): + sample.sample_fixed_size_per_key(check_samples_per_key) + + +if __name__ == '__main__': + unittest.main()