Repository: beam Updated Branches: refs/heads/master fb85d84dc -> db4b0939a
Add support for Python's native type hint types in Beam's type hint annotations. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc699ece Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc699ece Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc699ece Branch: refs/heads/master Commit: cc699ece9e4321c3460c2aab04c74fa086c7a3cd Parents: fb85d84 Author: Chuan Yu Foo <cy...@google.com> Authored: Mon Jul 31 17:10:45 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue Aug 8 23:16:01 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/typehints/decorators.py | 18 +- .../typehints/native_type_compatibility.py | 164 +++++++++++++++++++ .../typehints/native_type_compatibility_test.py | 92 +++++++++++ .../typehints/typed_pipeline_test.py | 27 ++- sdks/python/apache_beam/typehints/typehints.py | 4 +- sdks/python/setup.py | 1 + 6 files changed, 300 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/apache_beam/typehints/decorators.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py index 6ed388a..d5954e2 100644 --- a/sdks/python/apache_beam/typehints/decorators.py +++ b/sdks/python/apache_beam/typehints/decorators.py @@ -86,6 +86,7 @@ defined, or before importing a module containing type-hinted functions. import inspect import types +from apache_beam.typehints import native_type_compatibility from apache_beam.typehints import typehints from apache_beam.typehints.typehints import check_constraint from apache_beam.typehints.typehints import CompositeTypeHintError @@ -347,13 +348,22 @@ def with_input_types(*positional_hints, **keyword_hints): for all received function arguments. """ + converted_positional_hints = ( + native_type_compatibility.convert_to_beam_types(positional_hints)) + converted_keyword_hints = ( + native_type_compatibility.convert_to_beam_types(keyword_hints)) + del positional_hints + del keyword_hints + def annotate(f): if isinstance(f, types.FunctionType): - for t in list(positional_hints) + list(keyword_hints.values()): + for t in (list(converted_positional_hints) + + list(converted_keyword_hints.values())): validate_composite_type_param( t, error_msg_prefix='All type hint arguments') - get_type_hints(f).set_input_types(*positional_hints, **keyword_hints) + get_type_hints(f).set_input_types(*converted_positional_hints, + **converted_keyword_hints) return f return annotate @@ -410,7 +420,8 @@ def with_output_types(*return_type_hint, **kwargs): "order to specify multiple return types, use the 'Tuple' " "type-hint.") - return_type_hint = return_type_hint[0] + return_type_hint = native_type_compatibility.convert_to_beam_type( + return_type_hint[0]) validate_composite_type_param( return_type_hint, @@ -420,6 +431,7 @@ def with_output_types(*return_type_hint, **kwargs): def annotate(f): get_type_hints(f).set_output_types(return_type_hint) return f + return annotate http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/apache_beam/typehints/native_type_compatibility.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py new file mode 100644 index 0000000..d88f933 --- /dev/null +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -0,0 +1,164 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module to convert Python's native typing types to Beam types.""" + +import collections +import typing +from apache_beam.typehints import typehints + +# Describes an entry in the type map in convert_to_beam_type. +# match is a function that takes a user type and returns whether the conversion +# should trigger. +# arity is the expected arity of the user type. -1 means it's variadic. +# beam_type is the Beam type the user type should map to. +_TypeMapEntry = collections.namedtuple( + '_TypeMapEntry', ['match', 'arity', 'beam_type']) + + +def _get_arg(typ, index): + """Returns the index-th argument to the given type.""" + return typ.__args__[index] + + +def _len_arg(typ): + """Returns the length of the arguments to the given type.""" + try: + return len(typ.__args__) + except AttributeError: + # For Any type, which takes no arguments. + return 0 + + +def _safe_issubclass(derived, parent): + """Like issubclass, but swallows TypeErrors. + + This is useful for when either parameter might not actually be a class, + e.g. typing.Union isn't actually a class. + + Args: + derived: As in issubclass. + parent: As in issubclass. + + Returns: + issubclass(derived, parent), or False if a TypeError was raised. + """ + try: + return issubclass(derived, parent) + except TypeError: + return False + + +def _match_issubclass(match_against): + return lambda user_type: _safe_issubclass(user_type, match_against) + + +def _match_same_type(match_against): + # For Union types. They can't be compared with isinstance either, so we + # have to compare their types directly. + return lambda user_type: type(user_type) == type(match_against) + + +def _match_is_named_tuple(user_type): + return (_safe_issubclass(user_type, typing.Tuple) and + hasattr(user_type, '_field_types')) + + +def convert_to_beam_type(typ): + """Convert a given typing type to a Beam type. + + Args: + typ: typing type. + + Returns: + The given type converted to a Beam type as far as we can do the conversion. + + Raises: + ValueError: The type was malformed. + """ + + type_map = [ + _TypeMapEntry( + match=_match_same_type(typing.Any), + arity=0, + beam_type=typehints.Any), + _TypeMapEntry( + match=_match_issubclass(typing.Dict), + arity=2, + beam_type=typehints.Dict), + _TypeMapEntry( + match=_match_issubclass(typing.List), + arity=1, + beam_type=typehints.List), + _TypeMapEntry( + match=_match_issubclass(typing.Set), + arity=1, + beam_type=typehints.Set), + # NamedTuple is a subclass of Tuple, but it needs special handling. + # We just convert it to Any for now. + # This MUST appear before the entry for the normal Tuple. + _TypeMapEntry( + match=_match_is_named_tuple, arity=0, beam_type=typehints.Any), + _TypeMapEntry( + match=_match_issubclass(typing.Tuple), + arity=-1, + beam_type=typehints.Tuple), + _TypeMapEntry( + match=_match_same_type(typing.Union), + arity=-1, + beam_type=typehints.Union) + ] + + # Find the first matching entry. + matched_entry = next((entry for entry in type_map if entry.match(typ)), None) + if not matched_entry: + # No match: return original type. + return typ + + if matched_entry.arity == -1: + arity = _len_arg(typ) + else: + arity = matched_entry.arity + if _len_arg(typ) != arity: + raise ValueError('expecting type %s to have arity %d, had arity %d ' + 'instead' % (str(typ), arity, _len_arg(typ))) + typs = [convert_to_beam_type(_get_arg(typ, i)) for i in xrange(arity)] + if arity == 0: + # Nullary types (e.g. Any) don't accept empty tuples as arguments. + return matched_entry.beam_type + elif arity == 1: + # Unary types (e.g. Set) don't accept 1-tuples as arguments + return matched_entry.beam_type[typs[0]] + else: + return matched_entry.beam_type[tuple(typs)] + + +def convert_to_beam_types(args): + """Convert the given list or dictionary of args to Beam types. + + Args: + args: Either an iterable of types, or a dictionary where the values are + types. + + Returns: + If given an iterable, a list of converted types. If given a dictionary, + a dictionary with the same keys, and values which have been converted. + """ + if isinstance(args, dict): + return {k: convert_to_beam_type(v) for k, v in args.iteritems()} + else: + return [convert_to_beam_type(v) for v in args] http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/apache_beam/typehints/native_type_compatibility_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py new file mode 100644 index 0000000..d0cafe1 --- /dev/null +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Test for Beam type compatibility library.""" + +import typing +import unittest + +from apache_beam.typehints import typehints +from apache_beam.typehints import native_type_compatibility + +_TestNamedTuple = typing.NamedTuple('_TestNamedTuple', + [('age', int), ('name', bytes)]) +_TestFlatAlias = typing.Tuple[bytes, float] +_TestNestedAlias = typing.List[_TestFlatAlias] + + +class _TestClass(object): + pass + + +class NativeTypeCompatibilityTest(unittest.TestCase): + + def test_convert_to_beam_type(self): + test_cases = [ + ('raw bytes', bytes, bytes), + ('raw int', int, int), + ('raw float', float, float), + ('any', typing.Any, typehints.Any), + ('simple dict', typing.Dict[bytes, int], + typehints.Dict[bytes, int]), + ('simple list', typing.List[int], typehints.List[int]), + ('simple optional', typing.Optional[int], typehints.Optional[int]), + ('simple set', typing.Set[float], typehints.Set[float]), + ('simple unary tuple', typing.Tuple[bytes], + typehints.Tuple[bytes]), + ('simple union', typing.Union[int, bytes, float], + typehints.Union[int, bytes, float]), + ('namedtuple', _TestNamedTuple, typehints.Any), + ('test class', _TestClass, _TestClass), + ('test class in list', typing.List[_TestClass], + typehints.List[_TestClass]), + ('complex tuple', typing.Tuple[bytes, typing.List[typing.Tuple[ + bytes, typing.Union[int, bytes, float]]]], + typehints.Tuple[bytes, typehints.List[typehints.Tuple[ + bytes, typehints.Union[int, bytes, float]]]]), + ('flat alias', _TestFlatAlias, typehints.Tuple[bytes, float]), + ('nested alias', _TestNestedAlias, + typehints.List[typehints.Tuple[bytes, float]]), + ('complex dict', + typing.Dict[bytes, typing.List[typing.Tuple[bytes, _TestClass]]], + typehints.Dict[bytes, typehints.List[typehints.Tuple[ + bytes, _TestClass]]]) + ] + + for test_case in test_cases: + # Unlike typing types, Beam types are guaranteed to compare equal. + description = test_case[0] + typing_type = test_case[1] + beam_type = test_case[2] + self.assertEqual( + native_type_compatibility.convert_to_beam_type(typing_type), + beam_type, description) + + def test_convert_to_beam_types(self): + typing_types = [bytes, typing.List[bytes], + typing.List[typing.Tuple[bytes, int]], + typing.Union[int, typing.List[int]]] + beam_types = [bytes, typehints.List[bytes], + typehints.List[typehints.Tuple[bytes, int]], + typehints.Union[int, typehints.List[int]]] + self.assertEqual( + native_type_compatibility.convert_to_beam_types(typing_types), + beam_types) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index c81ef32..58274f3 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,9 @@ """Unit tests for the type-hint objects and decorators.""" import inspect +import typing import unittest - import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints @@ -98,6 +98,31 @@ class MainInputTest(unittest.TestCase): [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn)) +class NativeTypesTest(unittest.TestCase): + + def test_good_main_input(self): + @typehints.with_input_types(typing.Tuple[str, int]) + def munge((s, i)): + return (s + 's', i * 2) + result = [('apple', 5), ('pear', 3)] | beam.Map(munge) + self.assertEqual([('apples', 10), ('pears', 6)], sorted(result)) + + def test_bad_main_input(self): + @typehints.with_input_types(typing.Tuple[str, str]) + def munge((s, i)): + return (s + 's', i * 2) + with self.assertRaises(typehints.TypeCheckError): + [('apple', 5), ('pear', 3)] | beam.Map(munge) + + def test_bad_main_output(self): + @typehints.with_input_types(typing.Tuple[int, int]) + @typehints.with_output_types(typing.Tuple[str, str]) + def munge((a, b)): + return (str(a), str(b)) + with self.assertRaises(typehints.TypeCheckError): + [(5, 4), (3, 2)] | beam.Map(munge) | 'Again' >> beam.Map(munge) + + class SideInputTest(unittest.TestCase): def _run_repeat_test(self, repeat): http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/apache_beam/typehints/typehints.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index cc430be..6039e0e 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -498,7 +498,7 @@ UnionConstraint = UnionHint.UnionConstraint class OptionalHint(UnionHint): """An Option type-hint. Optional[X] accepts instances of X or None. - The Optional[X] factory function proxies to Union[X, None] + The Optional[X] factory function proxies to Union[X, type(None)] """ def __getitem__(self, py_type): @@ -507,7 +507,7 @@ class OptionalHint(UnionHint): raise TypeError('An Option type-hint only accepts a single type ' 'parameter.') - return Union[py_type, None] + return Union[py_type, type(None)] class TupleHint(CompositeTypeHint): http://git-wip-us.apache.org/repos/asf/beam/blob/cc699ece/sdks/python/setup.py ---------------------------------------------------------------------- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index da82466..c13da8e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -104,6 +104,7 @@ REQUIRED_PACKAGES = [ 'oauth2client>=2.0.1,<4.0.0', 'protobuf>=3.2.0,<=3.3.0', 'pyyaml>=3.12,<4.0.0', + 'typing>=3.6.0,<3.7.0', ] REQUIRED_SETUP_PACKAGES = [