Also, see
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py
which involves both PubSub and BigQuery IOs.
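
For illustration, here is a minimal sketch of that same pattern (streaming
read from Pub/Sub, write to BigQuery); the project, topic, table, and schema
below are placeholders, not taken from the example:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# A minimal sketch of a streaming Pub/Sub -> BigQuery pipeline.
# All resource names and the schema are illustrative placeholders.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(
         topic='projects/my-project/topics/my-topic')
     | 'Parse' >> beam.Map(lambda data: json.loads(data.decode('utf-8')))
     | 'Write' >> beam.io.WriteToBigQuery(
         'my-project:my_dataset.my_table',
         schema='rownumber:INTEGER',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))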

On Fri, Jul 19, 2019 at 12:31 PM Pablo Estrada <pabl...@google.com> wrote:

> Beam 2.14.0 will include support for writing files via the fileio module
> (covering GCS, local files, and HDFS), including in streaming pipelines.
> The transform is still marked as experimental and is likely to receive
> improvements - but you can check it out for your pipelines and see if it
> helps you : )
> Best
> -P.
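>
> For illustration, a minimal streaming sketch using fileio (assuming Beam
> 2.14.0+; the subscription, bucket path, and window size are placeholders):
>
> import apache_beam as beam
> from apache_beam import window
> from apache_beam.io import fileio
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # Hedged sketch: WriteToFiles is experimental, so argument names may
> # change in later releases.
> options = PipelineOptions(streaming=True)
> with beam.Pipeline(options=options) as p:
>     (p
>      | 'Read' >> beam.io.ReadFromPubSub(
>          subscription='projects/my-project/subscriptions/my-sub')
>      | 'Decode' >> beam.Map(lambda b: b.decode('utf-8'))
>      | 'Window' >> beam.WindowInto(window.FixedWindows(60))
>      | 'Write' >> fileio.WriteToFiles(
>          path='gs://my-bucket/output/',
>          sink=lambda dest: fileio.TextSink()))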
>
> On Fri, Jul 19, 2019 at 12:24 PM Valentyn Tymofieiev <valen...@google.com>
> wrote:
>
>> As of today, Beam Python streaming does not support writing to GCS yet,
>> which explains
>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>
>> You are right - id_label and timestamp_attribute do not work on the
>> Direct runner yet, as per https://issues.apache.org/jira/browse/BEAM-4275.
>> I checked with a few folks and that seems to be the current status, but
>> you can still give them a try on the Dataflow runner.
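>>
>> For example, a minimal sketch of how they might be used on the Dataflow
>> runner (the attribute names 'message_id' and 'ts' are illustrative
>> assumptions, not required names):
>>
>> # Hedged sketch: per BEAM-4275 these options currently work on the
>> # Dataflow runner only.
>> messages = p | 'Read' >> beam.io.ReadFromPubSub(
>>     subscription=known_args.input_subscription,
>>     with_attributes=True,
>>     id_label='message_id',
>>     timestamp_attribute='ts')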
>>
>> You may also find the following examples helpful:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py
>> (streaming pipeline)
>>
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
>> (batch pipeline)
>>
>> https://stackoverflow.com/questions/46854167/dataflow-streaming-using-python-sdk-transform-for-pubsub-messages-to-bigquery-o
>>
>> On Mon, Jul 15, 2019 at 2:12 AM Matthew Darwin <
>> matthew.dar...@carfinance247.co.uk> wrote:
>>
>>> Hi Valentyn,
>>>
>>> Thank you for your reply. I'm already using the with_attributes=True
>>> option; however, this only returns the attributes property of the JSON,
>>> i.e.:
>>>
>>> {
>>>   "attributes": {
>>>     "source": "python"
>>>   }
>>> }
>>>
>>> My pipeline currently looks like this (id_label is commented out when
>>> running on the direct runner, as it causes a not-implemented error):
>>>
>>> messages = (p
>>>             | 'Read From Pub Sub' >> ReadFromPubSub(
>>>                 subscription=known_args.input_subscription,
>>>                 with_attributes=True)
>>>                 # ,id_label='message_id')
>>>             | 'Parse JSON' >> beam.Map(format_message_element))
>>>
>>> My function to parse the message looks like this:
>>>
>>> def format_message_element(message, timestamp=beam.DoFn.TimestampParam):
>>>     messagedict = json.loads(message.data)
>>>     rownumber = messagedict['rownumber']
>>>     fullmessage = {'data': json.dumps(message.data),
>>>                    'rownumber': int(rownumber),
>>>                    'attributes': json.dumps(message.attributes),
>>>                    'timestamp': float(timestamp)}
>>>
>>>     logging.info(message.attributes)
>>>     logging.info(message)
>>>
>>>     return (rownumber, fullmessage)
>>>
>>> I'm aware there are the id_label and timestamp_attribute parameters for
>>> the ReadFromPubSub method; however, these don't seem to work with the
>>> direct runner, as per
>>> https://issues.apache.org/jira/browse/BEAM-4275?jql=text%20~%20%22python%20id_label%22
>>> which makes testing somewhat difficult.
>>>
>>> My full code is attached. When running on SDK versions above 2.9.0, I
>>> can't get past the windowing function, due to an issue that appears
>>> related to
>>> https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python
>>> and
>>> https://stackoverflow.com/questions/55109403/apache-beam-python-sdk-upgrade-issue
>>> as I was receiving the following error when running on 2.12.0:
>>>
>>> Cannot convert GlobalWindow to
>>> apache_beam.utils.windowed_value._IntervalWindowBase [while running
>>> 'generatedPtransform-150']
>>>
>>> On 2.9.0, running on the local runner, I receive the following output
>>> from the logging.info calls in format_message_element:
>>>
>>> INFO:root:{u'source': u'python'}
>>> INFO:root:PubsubMessage({"rownumber": 1}, {u'source': u'python'})
>>>
>>> I was expecting the messageId and publishTime as part of the returned
>>> object, but as you can see, there's nothing there for those attributes.
>>>
>>> (The code does not quite map correctly to the BigQuery table, so the
>>> inserts fail at that point - something I'm currently trying to resolve!)
>>>
>>> Kind regards,
>>>
>>> Matthew
>>>
>>>
>>>
>>> On Fri, 2019-07-12 at 09:13 -0700, Valentyn Tymofieiev wrote:
>>>
>>> Hi Matthew,
>>>
>>> Welcome to Beam!
>>>
>>> Looking at the Python PubSub IO API, you should be able to access the id
>>> and timestamp by setting `with_attributes=True` when using the
>>> `ReadFromPubSub` PTransform; see [1, 2].
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L61
>>> [2]
>>> https://github.com/apache/beam/blob/0fce2b88660f52dae638697e1472aa108c982ae6/sdks/python/apache_beam/io/gcp/pubsub.py#L138
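>>>
>>> For instance, a minimal sketch of that usage (the subscription path is a
>>> placeholder):
>>>
>>> messages = (p
>>>             | beam.io.ReadFromPubSub(
>>>                 subscription='projects/my-project/subscriptions/my-sub',
>>>                 with_attributes=True)
>>>             | beam.Map(lambda msg: (msg.data, msg.attributes)))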
>>>
>>> On Fri, Jul 12, 2019 at 1:36 AM Matthew Darwin <
>>> matthew.dar...@carfinance247.co.uk> wrote:
>>>
>>> Good morning,
>>>
>>> I'm very new to Beam, and pretty new to Python so please first accept my
>>> apologies for any obvious misconceptions/mistakes in the following.
>>>
>>> I am currently trying to develop a sample pipeline in Python to pull
>>> messages from Pub/Sub and then write them either to files in cloud
>>> storage or to BigQuery. The ultimate goal is to use the pipeline for
>>> real-time streaming of event data to BigQuery (with various
>>> transformations), but also to store the raw messages long-term in files
>>> in cloud storage.
>>>
>>> At the moment, I'm simply trying to parse the message to get the Pub/Sub
>>> messageId and publishTime in order to be able to write them into the
>>> output. The JSON of my Pub/Sub message looks like this:
>>>
>>> [
>>>   {
>>>     "ackId":
>>> "BCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIICBQFfH1xU1t1Xl8aB1ENGXJ8Zyc_XxcIB0BTeFVaEQx6bVxXOFcMEHF8YXZpWhUIA0FTfXeq5cveluzJNksxIbvE8KxfeqqmgfhiZho9XxJLLD5-PT5FQV5AEkw2C0RJUytDCypYEU4",
>>>     "message": {
>>>       "attributes": {
>>>         "source": "python"
>>>       },
>>>       "data": "eyJyb3dudW1iZXIiOiAyfQ==",
>>>       "messageId": "619310330691403",
>>>       "publishTime": "2019-07-12T08:27:58.522Z"
>>>     }
>>>   }
>>> ]
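>>>
>>> For reference, the data field is base64-encoded JSON; a quick check in
>>> Python:
>>>
>>> import base64, json
>>> json.loads(base64.b64decode("eyJyb3dudW1iZXIiOiAyfQ=="))
>>> # -> {'rownumber': 2}
>>>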
>>> According to the documentation
>>> <https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.pubsub.html>
>>> the Pub/Sub message payload exposes only the *data* and *attributes*
>>> properties; is there simply no way of retrieving the messageId and
>>> publishTime, or are these exposed somewhere else? If not, is their
>>> inclusion on the roadmap, and are they available when using Java? (I
>>> have zero Java experience, hence reaching for Python first.)
>>>
>>> Kind regards,
>>>
>>> Matthew
>>>
>>>
>>>
>>>
>>> [Attachment ApacheBeamDataflow-To-BigQuery2.py (6559 bytes) was removed
>>> by the mail filter.]
>>>
>>
