As of today, Beam Python streaming does not support writing to GCS yet,
which  explains
https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
 .

You are right - id_label and timestamp_attribute does not work on Direct
runner yet as per https://issues.apache.org/jira/browse/BEAM-4275, I
checked with a few folks and that seems to be the current status, but you
can still give them a try on Dataflow runner.

You may also find the following examples helpful:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
(streaming pipeline).
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
(batch
pipeline)
https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
.

On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
matthew.dar...@carfinance247.co.uk> wrote:

> Hi Valentyn,
>
> Thank you for your reply. I'm already using the with_attributes=True
> option, however this returns the attributes property of the JSON, i.e :-
>
> {
>>       "attributes": {
>>         "source": "python"
>>       }
>>
>
>
> My pipeline currently looks like this (id_label is commented out when
> running directly, as it causes a not implemented error):-
>
> messages = (p
> | 'Read From Pub Sub' >> ReadFromPubSub(subscription
> =known_args.input_subscription,with_attributes=True)
> #,id_label='message_id')
> | 'Parse JSON' >> beam.Map(format_message_element)
>
> My function to parse the message looks like this:-
>
> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
> messagedict = json.loads(message.data)
> rownumber = messagedict['rownumber']
> fullmessage = {'data' : json.dumps(message.data),
> 'rownumber' : int(rownumber),
> 'attributes' : json.dumps(message.attributes),
> 'timestamp' : float(timestamp)}
>
> logging.info(message.attributes)
> logging.info(message)
>
> return (rownumber, fullmessage)
>
> I'm aware there are the id_label and with_timestamp parameters for the
> ReadFromPubSub method, however, these don't seem to work with the direct
> runner, as per
> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
> which makes testing somewhat difficult.
>
> My full code is attached, when running above 2.9.0 of the SDK I can't get
> passed the windowing function, due to an issue that appears related to this
> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
> and this
> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
> as I was receiving the following error when running on 2.12.0:
>
> Cannot convert GlobalWindow to
> apache_beam.utils.windowed_value._IntervalWindowBase [while running
> 'generatedPtransform-150']
>
> On 2.9.0 when running on the local runner, I receive the following output
> from the logging.info calls in format_message_element:
>
> INFO:root:{u'source': u'python'}
> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>
> I was expecting the messageId and publishTime as part of the object
> returned; but as you can see there's nothing there for those attributes.
>
> (The code does not quite map correctly to the BigQuery table so it fails
> inserts at that point, which I'm currently trying to resolve!)
>
> Kind regards,
>
> Matthew
>
>
>
> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>
> *This message originated from outside your organization*
> ------------------------------
> Hi Matthew,
>
> Welcome to Beam!
>
> Looking at Python PubSub IO API, you should be able to access id and
> timestamp by setting `with_attributes=True` when using `ReadFromPubSub`
> PTransform, see [1,2].
>
> [1]
> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
> [2]
> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
>
> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <
> matthew.dar...@carfinance247.co.uk> wrote:
>
> Good morning,
>
> I'm very new to Beam, and pretty new to Python so please first accept my
> apologies for any obvious misconceptions/mistakes in the following.
>
> I am currently trying to develop a sample pipeline in Python to pull
> messages from Pub/Sub and then write them to either files in cloud storage
> or to BigQuery. The ultimate goal will be to utilise the pipeline for real
> time streaming of event data to BigQuery (with various transformations) but
> also to store the raw messages long term in files in cloud storage.
>
> At the moment, I'm simply trying to parse the message to get the PubSub
> messageId and publishTime in order to be able to write them into the
> output. The json of my PubSub message looks like this:-
>
> [
>   {
>     "ackId":
> "BCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIICBQFfH1xU1t1Xl8aB1ENGXJ8Zyc_XxcIB0BTeFVaEQx6bVxXOFcMEHF8YXZpWhUIA0FTfXeq5cveluzJNksxIbvE8KxfeqqmgfhiZho9XxJLLD5-PT5FQV5AEkw2C0RJUytDCypYEU4",
>     "message": {
>       "attributes": {
>         "source": "python"
>       },
>       "data": "eyJyb3dudW1iZXIiOiAyfQ==",
>       "messageId": "619310330691403",
>       "publishTime": "2019-07-12T08:27:58.522Z"
>     }
>   }
> ]
> According to the documentation
> <https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html>
> the PubSub message payload returns the *data* and *attributes*
> properties; is there simply no way of retrieving the messageId and
> publishTime, or are these exposed somewhere else? If not, will the
> inclusion of these be in the roadmap, and are they available if using Java
> (I have zero Java experience hence why reaching for Python first).
>
> Kind regards,
>
> Matthew
>
>
>
>
> ---------- Forwarded message ----------
> From: Domain postMaster address <postmas...@carfinance247.co.uk>
> To: "user@beam.apache.org" <user@beam.apache.org>
> Cc:
> Bcc:
> Date: Mon, 15 Jul 2019 09:12:15 +0000
> Subject: ApacheBeamDataflow-To-BigQuery2_py was removed from this message
>
> [image: Logo]
>   We removed a file from this message
>
> Your organization's email policy doesn't permit this type of file. If you
> need it, please contact your administrator.
>
>
> File Details
>
> *ApacheBeamDataflow-To-BigQuery2.py* (6559 bytes)
>
>
>   © 2003 - 2019 Mimecast Services Limited.
>
>
>

Reply via email to