Hello all, I am working on adding type hints to my pipeline, and ran into an issue with PTransforms that produce multiple, tagged outputs.
My class looks like this:

    @with_input_types(mytype.Data)
    @with_output_types(mytype.KeyedData)
    class DenormalizeData(ptransform.PTransform):
      MAIN = 'denormalized'
      SKIPPED = functions.DenormalizeData.SKIPPED

      def expand(self, pcol: mytype.Data) -> mytype.KeyedPriceData:
        return (pcol
                | 'Denormalize PriceData' >> core.ParDo(
                    functions.DenormalizeData()).with_outputs(
                        self.SKIPPED, main=self.MAIN))

Where functions.DenormalizeData is a core.DoFn.

From what I can tell, the type checking code at
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/ptransform.py#L429
attempts to access pvalue_.element_type. But in this case, the pvalue is a DoOutputsTuple
(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pvalue.py#L239),
which overrides __getattr__ to look up tag names. Since element_type is not a valid tag, I get the following partial stack:

      File "apache_beam_2_17_0/apache_beam/transforms/ptransform.py", line 401, in type_check_inputs_or_outputs
        if pvalue_.element_type is None:
      File "apache_beam_2_17_0/apache_beam/pvalue.py", line 241, in __getattr__
        return self[tag]
      File "apache_beam_2_17_0/apache_beam/pvalue.py", line 256, in __getitem__
        tag, self._main_tag, self._tags))
    ValueError: Tag 'element_type' is neither the main tag 'denormalized' nor any of the tags ('skipped',)

Is my diagnosis correct? Is this a known issue? Can type hints exist on DoOutputsTuples?

Thank you for your time and help.

Best,
Joshua

--
Joshua Harrison | Software Engineer | 404-433-0242
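For reference, the mechanism described above can be reproduced without Beam. The sketch below is only a stand-in: the class TaggedOutputs and the helper describe_lookup are hypothetical names made up for illustration, not Beam's actual code. It assumes only what the traceback shows, namely that attribute access falls through to a tag lookup that raises ValueError for unknown tags.

```python
class TaggedOutputs:
    """Hypothetical stand-in for a tagged-output container (not Beam's code)."""

    def __init__(self, main_tag, tags):
        self._main_tag = main_tag
        self._tags = tuple(tags)
        # Placeholder values standing in for the per-tag PCollections.
        self._outputs = {t: 'output<%s>' % t for t in (main_tag,) + self._tags}

    def __getattr__(self, tag):
        # Called only when normal attribute lookup fails.
        if tag.startswith('__') and tag.endswith('__'):
            raise AttributeError(tag)  # keep dunder lookups well-behaved
        return self[tag]

    def __getitem__(self, tag):
        if tag not in self._outputs:
            raise ValueError(
                'Tag %r is neither the main tag %r nor any of the tags %r'
                % (tag, self._main_tag, self._tags))
        return self._outputs[tag]


def describe_lookup(outputs, name):
    """Return the looked-up value, or the name of the exception raised."""
    try:
        # The default is only used on AttributeError, so a ValueError
        # raised inside __getattr__ still propagates.
        return getattr(outputs, name, None)
    except ValueError as exc:
        return type(exc).__name__


outputs = TaggedOutputs('denormalized', ['skipped'])
print(describe_lookup(outputs, 'skipped'))       # a valid tag resolves
print(describe_lookup(outputs, 'element_type'))  # not a tag, so ValueError
```

In this sketch, reading any attribute that is not a tag raises ValueError rather than AttributeError, so a getattr call with a default does not help either. A type check that reads element_type would therefore probably need to test for the tuple type first; whether Beam should handle it that way is an open question here, not something the code above establishes.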
