If I understand correctly, you want to return something like
ParDo(...).with_outputs(...)? That is, only return the schema if
explicitly asked for?
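If so, you could add a boolean parameter to your PTransform's
constructor, and your expand method could look like this:

class MyPTransform(beam.PTransform):
    def __init__(self, return_schema=False):
        super().__init__()
        self._return_schema = return_schema

    def expand(self, pcoll):
        ...  # build `tuples` and `schema` as before
        if self._return_schema:
            return tuples, schema
        else:
            return tuples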
You would then apply it like

result = pcoll | MyPTransform()

or

result, schema = pcoll | MyPTransform(return_schema=True)
On Wed, Jul 12, 2017 at 11:32 AM, Dmitry Demeshchuk wrote:
> Thanks, Robert!
>
> If I make it a dictionary, can I somehow specify the main (default) label?
> Or, if it's a tuple, will the first element be the default one?
>
> On Tue, Jul 11, 2017 at 6:22 PM, Robert Bradshaw wrote:
>>
>> You could return a (tuples, pcoll_schema) tuple from your expand method.
>> Make this a dictionary for more explicit labeling.
>>
>>
>> On Tuesday, July 11, 2017, Dmitry Demeshchuk wrote:
>>>
>>> Hi list,
>>>
>>> I'm trying to make a SQL PTransform return the rows as the main output
>>> and the ordered list of columns as a tagged output.
>>>
>>> This is what my expand() function looks like:
>>>
>>> def expand(self, pcoll):
>>>     pcoll_query = pcoll.pipeline | 'Query' >> beam.Create([self.query])
>>>     pcoll_s3_prefix = pcoll.pipeline | 'S3Prefix' >> beam.Create([self.s3_prefix])
>>>     pcoll_manifest = (
>>>         pcoll_query
>>>         | 'Unload' >> beam.ParDo(
>>>             self.UnloadFromRedshift(self.dsn, self.aws_config,
>>>                                     self.delimiter, self.null_string),
>>>             s3_prefix=AsSingleton(pcoll_s3_prefix)))
>>>     pcoll_schema = (
>>>         pcoll_query
>>>         | 'Schema' >> ReadRedshiftQuerySchema(self.dsn))
>>>     tuples = (
>>>         pcoll_manifest
>>>         | 'GetS3Files' >> beam.ParDo(
>>>             self.ReadRedshiftUnloadedFiles(self.aws_config))
>>>         | 'AntiFusion' >> beam.GroupByKey()
>>>         | 'LoadDataFromS3' >> beam.ParDo(self.LoadFromS3(self.aws_config))
>>>         | 'ParseCSV' >> beam.ParDo(
>>>             self.CSVLineToTuple(self.delimiter, self.null_string),
>>>             columns=AsList(pcoll_schema)))
>>>     return tuples
>>>
>>> Is there any way for me to add pcoll_schema as a tagged output here,
>>> inside the CSVLineToTuple ParDo, or inside the expand() function itself?
>>>
>>> Thanks!
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.