If I understand correctly, you want to return something like
ParDo(...).with_outputs(...)? That is, only return the schema if
explicitly asked for?

In this case, you could pass a boolean parameter to your PTransform
constructor, and your expand method could look like

    def expand(self, p):
        ...
        if self._return_schema:
            return tuples, schema
        else:
            return tuples

You would use it like

    result = p | MyPTransform()

or

    result, schema = p | MyPTransform(return_schema=True)
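
A minimal sketch of how the flag changes the shape of the result. To keep it
runnable without Beam installed, plain lists stand in for PCollections, the
class deliberately does not subclass beam.PTransform, and expand() is called
directly; the row data and the processing inside expand() are made up for
illustration.

```python
# Stand-in sketch: plain lists play the role of PCollections, and this class
# does not subclass beam.PTransform, so it runs without Beam installed.
class MyPTransform:
    def __init__(self, return_schema=False):
        # Hypothetical flag; it only controls the shape of expand()'s result.
        self._return_schema = return_schema

    def expand(self, rows):
        # Placeholder processing; a real transform would build both outputs
        # from its sub-transforms here.
        tuples = [tuple(row.values()) for row in rows]
        schema = list(rows[0].keys()) if rows else []
        if self._return_schema:
            return tuples, schema
        return tuples


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

# Default: only the main output comes back.
result = MyPTransform().expand(rows)

# Opt-in: the caller unpacks the main output and the schema.
result_with_schema, schema = MyPTransform(return_schema=True).expand(rows)
```

In a real pipeline the class would subclass beam.PTransform and be applied
with the | operator rather than by calling expand() yourself; the point here
is only that callers who do not ask for the schema never have to unpack a
tuple.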


On Wed, Jul 12, 2017 at 11:32 AM, Dmitry Demeshchuk
<dmi...@postmates.com> wrote:
> Thanks, Robert!
>
> If I make it a dictionary, can I somehow specify the main (default) label?
> Or, if it's a tuple, will the first element be the default one?
>
> On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw <rober...@google.com>
> wrote:
>>
>> You could return a (tuples, pcoll_schema) tuple from your expand method.
>> Make this a dictionary for more explicit labeling.
>>
>>
>> On Tuesday, July 11, 2017, Dmitry Demeshchuk <dmi...@postmates.com> wrote:
>>>
>>> Hi list,
>>>
>>> I'm trying to make a SQL PTransform return the rows as the main output
>>> and the ordered list of columns as a tagged output.
>>>
>>> This is what my expand() function looks like:
>>>
>>>     def expand(self):
>>>         pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
>>>         pcoll_s3_prefix = (pcoll.pipeline
>>>             | 'S3Prefix' >> beam.Create([self.s3_prefix]))
>>>         pcoll_manifest = (pcoll_query
>>>             | 'Unload' >> beam.ParDo(
>>>                 self.UnloadFromRedshift(self.dsn, self.aws_config,
>>>                                         self.delimiter, self.null_string),
>>>                 s3_prefix=AsSingleton(pcoll_s3_prefix)))
>>>         pcoll_schema = (pcoll_query
>>>             | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
>>>         tuples = (pcoll_manifest
>>>             | 'GetS3Files' >> beam.ParDo(
>>>                 self.ReadRedshiftUnloadedFiles(self.aws_config))
>>>             | 'AntiFusion' >> beam.GroupByKey()
>>>             | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
>>>             | 'ParseCSV' >> beam.ParDo(
>>>                 self.CSVLineToTuple(self.delimiter, self.null_string),
>>>                 columns=AsList(pcoll_schema)))
>>>         return tuples
>>>
>>> Is there any way for me to add pcoll_schema as a tagged output here,
>>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
>>>
>>> Thanks!
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>
>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
