Also, see https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py which exercises both the PubSub and BigQuery IOs.
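For illustration, the core shape of that example - a streaming read from
Pub/Sub feeding a BigQuery write - is roughly the following. This is a
minimal sketch, not the leader_board code itself; the project,
subscription, table and schema names are placeholders:

    import json

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import (PipelineOptions,
                                                      StandardOptions)

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True  # unbounded source

    with beam.Pipeline(options=options) as p:
        (p
         # Placeholder subscription; messages are JSON like {"rownumber": 1}.
         | 'Read' >> ReadFromPubSub(
             subscription='projects/my-project/subscriptions/my-sub')
         | 'Parse' >> beam.Map(json.loads)
         # Placeholder table and schema.
         | 'Write' >> beam.io.WriteToBigQuery(
             'my-project:my_dataset.my_table',
             schema='rownumber:INTEGER',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
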
On Fri, Jul 19, 2019 at 12:31 PM Pablo Estrada <pabl...@google.com> wrote:

> Beam 2.14.0 will include support for writing files in the fileio module
> (the support will include GCS, local files, HDFS). It will also support
> streaming. The transform is still marked as experimental and is likely
> to receive improvements - but you can check it out for your pipelines
> and see if it helps you : )
> Best
> -P.
>
> On Fri, Jul 19, 2019 at 12:24 PM Valentyn Tymofieiev
> <valen...@google.com> wrote:
>
>> As of today, Beam Python streaming does not support writing to GCS
>> yet, which explains
>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>
>> You are right - id_label and timestamp_attribute do not work on the
>> Direct runner yet, as per
>> https://issues.apache.org/jira/browse/BEAM-4275. I checked with a few
>> folks and that seems to be the current status, but you can still give
>> them a try on the Dataflow runner.
>>
>> You may also find the following examples helpful:
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
>> (streaming pipeline)
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
>> (batch pipeline)
>> https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
>>
>> On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin
>> <matthew.dar...@carfinance247.co.uk> wrote:
>>
>>> Hi Valentyn,
>>>
>>> Thank you for your reply. I'm already using the with_attributes=True
>>> option; however, this returns only the attributes property of the
>>> JSON, i.e.:
>>>
>>>> {
>>>>   "attributes": {
>>>>     "source": "python"
>>>>   }
>>>> }
>>>
>>> My pipeline currently looks like this (id_label is commented out when
>>> running directly, as it causes a NotImplementedError):
>>>
>>> messages = (p
>>>     | 'Read From Pub Sub' >> ReadFromPubSub(
>>>         subscription=known_args.input_subscription,
>>>         with_attributes=True)  # ,id_label='message_id'
>>>     | 'Parse JSON' >> beam.Map(format_message_element))
>>>
>>> My function to parse the message looks like this:
>>>
>>> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>>>     messagedict = json.loads(message.data)
>>>     rownumber = messagedict['rownumber']
>>>     fullmessage = {'data': json.dumps(message.data),
>>>                    'rownumber': int(rownumber),
>>>                    'attributes': json.dumps(message.attributes),
>>>                    'timestamp': float(timestamp)}
>>>
>>>     logging.info(message.attributes)
>>>     logging.info(message)
>>>
>>>     return (rownumber, fullmessage)
>>>
>>> I'm aware there are the id_label and timestamp_attribute parameters
>>> for the ReadFromPubSub method; however, these don't seem to work with
>>> the direct runner, as per
>>> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
>>> which makes testing somewhat difficult.
>>>
>>> My full code is attached. When running above 2.9.0 of the SDK I can't
>>> get past the windowing function, due to an issue that appears related
>>> to
>>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>> and
>>> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
>>> as I was receiving the following error when running on 2.12.0:
>>>
>>> Cannot convert GlobalWindow to
>>> apache_beam.utils.windowed_value._IntervalWindowBase [while running
>>> 'generatedPtransform-150']
>>>
>>> On 2.9.0, when running on the local runner, I receive the following
>>> output from the logging.info calls in format_message_element:
>>>
>>> INFO:root:{u'source': u'python'}
>>> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>>>
>>> I was expecting the messageId and publishTime as part of the returned
>>> object, but as you can see there's nothing there for those attributes.
>>>
>>> (The code does not quite map correctly to the BigQuery table, so
>>> inserts fail at that point, which I'm currently trying to resolve!)
>>>
>>> Kind regards,
>>>
>>> Matthew
>>>
>>> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>>>
>>> Hi Matthew,
>>>
>>> Welcome to Beam!
>>>
>>> Looking at the Python PubSub IO API, you should be able to access the
>>> id and timestamp by setting `with_attributes=True` when using the
>>> `ReadFromPubSub` PTransform, see [1, 2].
>>>
>>> [1] https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
>>> [2] https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>>>
>>> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin
>>> <matthew.dar...@carfinance247.co.uk> wrote:
>>>
>>> Good morning,
>>>
>>> I'm very new to Beam, and pretty new to Python, so please first accept
>>> my apologies for any obvious misconceptions/mistakes in the following.
>>>
>>> I am currently trying to develop a sample pipeline in Python to pull
>>> messages from Pub/Sub and then write them either to files in cloud
>>> storage or to BigQuery. The ultimate goal is to use the pipeline for
>>> real-time streaming of event data to BigQuery (with various
>>> transformations), but also to store the raw messages long term in
>>> files in cloud storage.
>>>
>>> At the moment, I'm simply trying to parse the message to get the
>>> Pub/Sub messageId and publishTime in order to be able to write them
>>> into the output. The JSON of my Pub/Sub message looks like this:
>>>
>>> [
>>>   {
>>>     "ackId": "BCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIICBQFfH1xU1t1Xl8aB1ENGXJ8Zyc_XxcIB0BTeFVaEQx6bVxXOFcMEHF8YXZpWhUIA0FTfXeq5cveluzJNksxIbvE8KxfeqqmgfhiZho9XxJLLD5-PT5FQV5AEkw2C0RJUytDCypYEU4",
>>>     "message": {
>>>       "attributes": {
>>>         "source": "python"
>>>       },
>>>       "data": "eyJyb3dudW1iZXIiOiAyfQ==",
>>>       "messageId": "619310330691403",
>>>       "publishTime": "2019-07-12T08:27:58.522Z"
>>>     }
>>>   }
>>> ]
>>>
>>> According to the documentation
>>> <https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html>
>>> the PubSub message payload returns the *data* and *attributes*
>>> properties; is there simply no way of retrieving the messageId and
>>> publishTime, or are these exposed somewhere else?
>>> If not, will the inclusion of these be on the roadmap, and are they
>>> available if using Java? (I have zero Java experience, hence reaching
>>> for Python first.)
>>>
>>> Kind regards,
>>>
>>> Matthew
>>>
>>> [Attachment ApacheBeamDataflow-To-BigQuery2.py (6559 bytes) was
>>> removed from this message by the recipient's mail filter.]
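On the thread's core question - getting messageId and publishTime in
Python - the PubsubMessage returned by with_attributes=True in the SDK
versions discussed here carried only data and attributes, which matches
the logging output Matthew saw. Newer SDK releases expose message_id and
publish_time fields on PubsubMessage; that is an assumption to verify
against your installed SDK, but on such a version the parse function
could be reworked along these lines:

    import json
    import logging

    import apache_beam as beam

    def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
        # `message` is a PubsubMessage (requires with_attributes=True).
        body = json.loads(message.data)
        fullmessage = {
            'rownumber': int(body['rownumber']),
            'attributes': json.dumps(message.attributes),
            # By default ReadFromPubSub stamps each element with the
            # Pub/Sub publish time, so TimestampParam carries it too.
            'timestamp': float(timestamp),
            # These two fields are an assumption: they exist only on
            # newer SDKs, not the 2.9-2.13 versions discussed above.
            'message_id': message.message_id,
            'publish_time': str(message.publish_time),
        }
        logging.info(fullmessage)
        return (body['rownumber'], fullmessage)
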
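Separately, for the fileio transform Pablo mentions, a hedged sketch of
a streaming Pub/Sub-to-files pipeline built on it follows. The bucket
and subscription names are placeholders, and since the transform was
experimental in 2.14.0, details may have shifted in later releases.
Note that WriteToFiles needs non-global windows in streaming, hence the
WindowInto step:

    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import (PipelineOptions,
                                                      StandardOptions)
    from apache_beam.transforms import window

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (p
         # Placeholder subscription.
         | 'Read' >> ReadFromPubSub(
             subscription='projects/my-project/subscriptions/my-sub')
         | 'Decode' >> beam.Map(lambda b: b.decode('utf-8'))
         # Non-global windows so the file writes can be triggered.
         | 'Window' >> beam.WindowInto(window.FixedWindows(60))
         # Placeholder GCS path; TextSink writes one element per line.
         | 'Write' >> fileio.WriteToFiles(
             path='gs://my-bucket/raw/',
             sink=lambda dest: fileio.TextSink()))
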