Hi,
I have a pipeline with a PCollection of dicts in Python, and I'd like to apply
a schema to it for use with SqlTransform.
The schema is defined as follows:

import typing
import apache_beam as beam

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)

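Beam builds the schema from the NamedTuple's field names and type annotations. Below is a stdlib-only sketch (no Beam needed) showing what those annotations resolve to. It also shows that typing.NamedTuple does not check value types when a row is constructed. So a mismatched value would presumably go unnoticed in RowSchema(**x) and only fail once a coder tries to encode it, which fits the traceback further down. FIELD_TYPES and unchecked are illustrative names only.

```python
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


# The declared field types, i.e. what a schema-aware consumer reads.
FIELD_TYPES = typing.get_type_hints(RowSchema)

# typing.NamedTuple records annotations but does not enforce them:
# this succeeds even though colA is declared as str.
unchecked = RowSchema(colA=1, colB=None)

print(FIELD_TYPES)
print(unchecked)  # RowSchema(colA=1, colB=None)
```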
The code that ingests the PCollection of dicts and attempts to apply the schema
is:

pcol = (
    p
    | 'read from BQ' >> beam.io.ReadFromBigQuery(
        gcs_location="gs://example_location",
        query=query,  # Reads only the columns defined in the schema
        use_standard_sql=True)
    | 'ToRow' >> beam.Map(
        lambda x: RowSchema(**x)).with_output_types(RowSchema)
    # | SqlTransform(...)
)

However, it results in the following error:

  File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
    return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']

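In the traceback, a coder calls value.encode('utf-8') on an int. That suggests at least one column comes back from BigQuery as an integer even though RowSchema declares it as str. The sketch below is a rough, stdlib-only debugging aid for checking a few sample dicts from the read against the declared types. find_type_mismatches is a hypothetical helper, not part of Beam, and it only handles plain classes and Optional[...] annotations.

```python
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


def find_type_mismatches(record: dict, schema_cls: type) -> list:
    """Hypothetical helper: return the names of fields in `record` whose
    values do not match the types declared on `schema_cls`.

    Only plain classes and Optional[...] annotations are supported;
    a missing key is treated as None."""
    mismatches = []
    for name, declared in typing.get_type_hints(schema_cls).items():
        value = record.get(name)
        # Optional[X] is Union[X, None]; unpack it into its member types.
        if typing.get_origin(declared) is typing.Union:
            allowed = tuple(typing.get_args(declared))
        else:
            allowed = (declared,)
        if not isinstance(value, allowed):
            mismatches.append(name)
    return mismatches


print(find_type_mismatches({'colA': 'a1', 'colB': None}, RowSchema))  # []
print(find_type_mismatches({'colA': 1, 'colB': 'b1'}, RowSchema))  # ['colA']
```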
I've tested that if I instead build the input in memory with beam.Create, as
follows, the code does work:

pcol = (
    p
    | "Create" >> beam.Create(
        [{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
    # | SqlTransform(...)
)

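The in-memory dicts in this test contain only str and None values, which matches RowSchema. The BigQuery dicts apparently do not. The sketch below assumes the offending column is numeric in BigQuery but should still be treated as a string downstream. Under that assumption, a hypothetical to_row_schema function normalizes each dict before building the tuple. It also assumes colA is never NULL, since None is passed through unchanged.

```python
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


def to_row_schema(record: dict) -> RowSchema:
    """Hypothetical converter: build a RowSchema from a query-result dict,
    casting every non-None value to str so it matches the declared fields.
    Keys not in RowSchema are ignored."""
    values = {}
    for name in RowSchema._fields:
        value = record.get(name)
        # Keep None (nullable colB); stringify anything else, e.g. ints.
        values[name] = None if value is None else str(value)
    return RowSchema(**values)


print(to_row_schema({'colA': 1, 'colB': None}))  # RowSchema(colA='1', colB=None)
```

In the pipeline it would take the place of the lambda, e.g. beam.Map(to_row_schema).with_output_types(RowSchema). If the column is genuinely numeric, declaring that field as int in RowSchema may be the cleaner option.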
What can I do to apply the schema and enable SqlTransform on a PCollection of
dicts?
The structure I tried to use is based on the following references:
* https://beam.apache.org/documentation/programming-guide/#inferring-schemas
* https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.sql.html
I've also checked the io.gcp.bigquery reference
(https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html).
I noticed it has a schema implementation, but only for writes to BigQuery,
so I couldn't find a way to avoid receiving the input as a PCollection of dicts.
I also found this example
(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py)
using a dynamic schema, which wouldn't be a valid approach for my use case as
far as I understand it.
Any help with this issue would be greatly appreciated. Thanks!