Hello all,

I am working on adding type hints to my pipeline, and ran into an issue
with PTransforms that produce multiple, tagged outputs.

My class looks like this:

@with_input_types(mytype.Data)
@with_output_types(mytype.KeyedData)
class DenormalizeData(ptransform.PTransform):
  MAIN = 'denormalized'
  SKIPPED = functions.DenormalizeData.SKIPPED
  def expand(self, pcol: mytype.Data) -> mytype.KeyedPriceData:
    return (pcol
      | 'Denormalize PriceData' >> core.ParDo(
        functions.DenormalizeData()).with_outputs(
          self.SKIPPED, main=self.MAIN))


Here functions.DenormalizeData is a core.DoFn. From what I can tell, the
type checking code at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/ptransform.py#L429
attempts to access pvalue_.element_type. In this case, however, the pvalue
is a DoOutputsTuple
(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pvalue.py#L239),
which overrides __getattr__ to look up tag names. Since element_type is not
a valid tag, I get the following partial stack trace:

"apache_beam_2_17_0/apache_beam/transforms/ptransform.py", line 401, in type_check_inputs_or_outputs
    if pvalue_.element_type is None:
  File "apache_beam_2_17_0/apache_beam/pvalue.py", line 241, in __getattr__
    return self[tag]
  File "apache_beam_2_17_0/apache_beam/pvalue.py", line 256, in __getitem__
    tag, self._main_tag, self._tags))
ValueError: Tag 'element_type' is neither the main tag 'denormalized' nor any of the tags ('skipped',)
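
For illustration, the failure mode can be reproduced without Beam. The
following is a minimal sketch, not Beam's actual implementation:
TaggedOutputs is a hypothetical stand-in class that, like the
DoOutputsTuple in the trace, routes unknown attribute lookups through
__getattr__ into a tag lookup that raises ValueError.

```python
class TaggedOutputs:
    """Hypothetical stand-in for a tagged-output container (not Beam code)."""

    def __init__(self, main_tag, tags):
        self._main_tag = main_tag
        self._tags = tuple(tags)

    def __getattr__(self, tag):
        # __getattr__ runs only when normal attribute lookup fails, so every
        # attribute the class does not define is treated as a tag name.
        if tag.startswith('__'):
            raise AttributeError(tag)
        return self[tag]

    def __getitem__(self, tag):
        if tag != self._main_tag and tag not in self._tags:
            raise ValueError(
                "Tag '%s' is neither the main tag '%s' nor any of the tags %s"
                % (tag, self._main_tag, self._tags))
        # Placeholder value; a real container would return a PCollection.
        return 'output for %s' % tag


outputs = TaggedOutputs('denormalized', ['skipped'])
print(outputs.skipped)  # a valid tag resolves normally

try:
    outputs.element_type  # not a tag, so the lookup raises ValueError
except ValueError as err:
    print(err)
```

Because the error is a ValueError rather than an AttributeError, a guard
such as hasattr(outputs, 'element_type') or getattr(outputs,
'element_type', None) would not help in this sketch: both only swallow
AttributeError.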


Is my diagnosis correct? Is this a known issue? Can type hints exist on
DoOutputsTuples?

Thank you for your time and help.

Best,
Joshua

-- 
Joshua Harrison | Software Engineer | 404-433-0242
