As of today, Beam Python streaming pipelines do not yet support writing to GCS, which explains https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python .
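In the meantime, BigQuery is a supported streaming sink from Python. Below is a minimal sketch of that path; the project, subscription, and table names are placeholders, and the schema assumes the {"rownumber": ...} payload from your example:

import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder names -- substitute your own project/subscription/table.
SUBSCRIPTION = 'projects/my-project/subscriptions/my-subscription'
TABLE = 'my-project:my_dataset.my_table'

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | 'Read From Pub Sub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
     | 'Parse JSON' >> beam.Map(json.loads)  # payload bytes -> dict
     | 'Write To BigQuery' >> beam.io.WriteToBigQuery(
           TABLE,
           schema='rownumber:INTEGER',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))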
You are right - id_label and timestamp_attribute do not work on the Direct runner yet, as per https://issues.apache.org/jira/browse/BEAM-4275. I checked with a few folks and that seems to be the current status, but you can still give them a try on the Dataflow runner.

You may also find the following examples helpful:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py (streaming pipeline)
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py (batch pipeline)
https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o

On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <matthew.dar...@carfinance247.co.uk> wrote:

> Hi Valentyn,
>
> Thank you for your reply. I'm already using the with_attributes=True
> option; however, this returns the attributes property of the JSON, i.e.:
>
>> {
>>     "attributes": {
>>         "source": "python"
>>     }
>> }
>
> My pipeline currently looks like this (id_label is commented out when
> running directly, as it causes a NotImplementedError):
>
> messages = (p
>     | 'Read From Pub Sub' >> ReadFromPubSub(
>           subscription=known_args.input_subscription,
>           with_attributes=True)
>           # ,id_label='message_id')
>     | 'Parse JSON' >> beam.Map(format_message_element)
>
> My function to parse the message looks like this:
>
> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>     messagedict = json.loads(message.data)
>     rownumber = messagedict['rownumber']
>     fullmessage = {'data': json.dumps(message.data),
>                    'rownumber': int(rownumber),
>                    'attributes': json.dumps(message.attributes),
>                    'timestamp': float(timestamp)}
>
>     logging.info(message.attributes)
>     logging.info(message)
>
>     return (rownumber, fullmessage)
>
> I'm aware there are the id_label and timestamp_attribute parameters for
> the ReadFromPubSub method; however, these don't seem to work with the
> direct runner, as per
> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
> which makes testing somewhat difficult.
>
> My full code is attached. When running above 2.9.0 of the SDK, I can't
> get past the windowing function, due to an issue that appears related to
> this
> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
> and this
> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
> as I was receiving the following error when running on 2.12.0:
>
> Cannot convert GlobalWindow to
> apache_beam.utils.windowed_value._IntervalWindowBase [while running
> 'generatedPtransform-150']
>
> On 2.9.0, when running on the local runner, I receive the following
> output from the logging.info calls in format_message_element:
>
> INFO:root:{u'source': u'python'}
> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>
> I was expecting the messageId and publishTime as part of the object
> returned, but as you can see, there's nothing there for those attributes.
>
> (The code does not quite map correctly to the BigQuery table, so it fails
> inserts at that point, which I'm currently trying to resolve!)
>
> Kind regards,
>
> Matthew
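For reference, a cleaned-up sketch of the parse step quoted above. It avoids calling json.dumps on the raw bytes payload (which double-encodes it), and the comments spell out what the thread establishes: with with_attributes=True a PubsubMessage only exposes data and attributes, so messageId and publishTime are not available from the Python SDK at this point. The subscription name is a placeholder:

import json
import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub


def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
    # message is a PubsubMessage: only .data (bytes) and .attributes (dict)
    # are populated; messageId and publishTime are not exposed here.
    messagedict = json.loads(message.data)
    rownumber = int(messagedict['rownumber'])
    fullmessage = {
        'data': message.data.decode('utf-8'),  # decode rather than re-dump
        'rownumber': rownumber,
        'attributes': json.dumps(message.attributes),
        # Element timestamp assigned by the source -- not Pub/Sub publishTime
        # unless timestamp_attribute is set on ReadFromPubSub.
        'timestamp': float(timestamp),
    }
    logging.info(message.attributes)
    return (rownumber, fullmessage)


# Usage inside a pipeline (subscription name is a placeholder):
# messages = (p
#     | 'Read From Pub Sub' >> ReadFromPubSub(
#           subscription='projects/my-project/subscriptions/my-sub',
#           with_attributes=True)
#     | 'Parse JSON' >> beam.Map(format_message_element))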
> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>
> Hi Matthew,
>
> Welcome to Beam!
>
> Looking at the Python PubSub IO API, you should be able to access the id
> and timestamp by setting `with_attributes=True` when using the
> `ReadFromPubSub` PTransform, see [1, 2].
>
> [1] https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
> [2] https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>
> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <matthew.dar...@carfinance247.co.uk> wrote:
>
>> Good morning,
>>
>> I'm very new to Beam, and pretty new to Python, so please first accept
>> my apologies for any obvious misconceptions/mistakes in the following.
>>
>> I am currently trying to develop a sample pipeline in Python to pull
>> messages from Pub/Sub and then write them to either files in cloud
>> storage or to BigQuery. The ultimate goal is to utilise the pipeline for
>> real-time streaming of event data to BigQuery (with various
>> transformations), but also to store the raw messages long term in files
>> in cloud storage.
>>
>> At the moment, I'm simply trying to parse the message to get the Pub/Sub
>> messageId and publishTime in order to be able to write them into the
>> output. The JSON of my Pub/Sub message looks like this:
>>
>> [
>>   {
>>     "ackId": "BCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIICBQFfH1xU1t1Xl8aB1ENGXJ8Zyc_XxcIB0BTeFVaEQx6bVxXOFcMEHF8YXZpWhUIA0FTfXeq5cveluzJNksxIbvE8KxfeqqmgfhiZho9XxJLLD5-PT5FQV5AEkw2C0RJUytDCypYEU4",
>>     "message": {
>>       "attributes": {
>>         "source": "python"
>>       },
>>       "data": "eyJyb3dudW1iZXIiOiAyfQ==",
>>       "messageId": "619310330691403",
>>       "publishTime": "2019-07-12T08:27:58.522Z"
>>     }
>>   }
>> ]
>>
>> According to the documentation
>> <https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html>,
>> the Pub/Sub message payload returns the *data* and *attributes*
>> properties. Is there simply no way of retrieving the messageId and
>> publishTime, or are these exposed somewhere else? If not, is their
>> inclusion on the roadmap, and are they available when using Java? (I
>> have zero Java experience, hence reaching for Python first.)
>>
>> Kind regards,
>>
>> Matthew
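To round out the Dataflow-runner suggestion above, here is a hedged sketch of how id_label and timestamp_attribute would be passed. Both name message *attributes* that the publisher must set; the attribute names below ('message_id', 'event_time') are assumptions for illustration, not values from the thread:

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Also pass --runner=DataflowRunner, --project, etc.; per BEAM-4275 these
# parameters are not implemented on the Direct runner.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    messages = (p
        | 'Read From Pub Sub' >> ReadFromPubSub(
              subscription='projects/my-project/subscriptions/my-sub',  # placeholder
              with_attributes=True,
              # Attribute used as a unique record id for deduplication:
              id_label='message_id',
              # Attribute used as the element timestamp (milliseconds since
              # the Unix epoch, or an RFC 3339 string):
              timestamp_attribute='event_time'))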