Hi Patrick,

It sounds like you want to read about event-time windowing:
https://beam.apache.org/documentation/programming-guide/#windowing
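
The key idea there is that in event time, the timestamp carried with a reading decides which window it belongs to, not the time it happens to reach Dataflow. As a rough illustration, here is the fixed-window arithmetic. The helper is hypothetical and not Beam API, and it assumes epoch-second timestamps like the "timestamp" field you add:

```java
// Hypothetical plain-Java helper, NOT part of the Beam SDK: it only shows the
// arithmetic that decides which fixed event-time window a reading falls into.
// Assumes timestamps are epoch seconds, like the "timestamp" field in the
// enriched messages.
final class EventTimeWindowSketch {

    private EventTimeWindowSketch() {}

    /** Inclusive start of the fixed window of sizeSec that contains eventTimeSec. */
    static long fixedWindowStart(long eventTimeSec, long sizeSec) {
        // Math.floorMod keeps this correct for negative timestamps too.
        return eventTimeSec - Math.floorMod(eventTimeSec, sizeSec);
    }

    /** Exclusive end of the same window. */
    static long fixedWindowEnd(long eventTimeSec, long sizeSec) {
        return fixedWindowStart(eventTimeSec, sizeSec) + sizeSec;
    }
}
```

So a reading stamped 1516549776 lands in [1516549500, 1516549800) whether it arrives at once or minutes later. In Beam this only applies if that time is actually the element's event timestamp. One option is PubsubIO's withTimestampAttribute, assuming the devices send the time as a message attribute. Otherwise, elements read from PubsubIO carry the Pub/Sub publish time.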

In particular, when you say "the last 5 minutes", I would ask what the point
of reference is. Your needs may be served by fixed or sliding windows of
five minutes. These are included with the SDK and documented here:
https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
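
In Beam, a 5-minute average that refreshes every minute would look something like Window.into(SlidingWindows.of(Duration.standardMinutes(5)).every(Duration.standardMinutes(1))) followed by Mean.perKey() over KV<deviceID, value> pairs. You would apply it once for humidity and once for temp, or write a custom CombineFn for both. To show what that computes, here is a plain-Java sketch. It is hypothetical and not how the SDK implements it. It assigns each reading to every sliding window containing its timestamp, then averages per device and window:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch, NOT Beam API: mimics what sliding event-time
// windows plus a per-device mean would compute. Reading, SlidingAverageSketch
// and the method names are illustrative only. Timestamps are epoch seconds.
final class SlidingAverageSketch {

    static final class Reading {
        final String deviceId;
        final long timestampSec;
        final double humidity;
        final double temp;

        Reading(String deviceId, long timestampSec, double humidity, double temp) {
            this.deviceId = deviceId;
            this.timestampSec = timestampSec;
            this.humidity = humidity;
            this.temp = temp;
        }
    }

    /** Starts of all windows [start, start + sizeSec), start a multiple of periodSec, containing t. */
    static List<Long> windowStarts(long t, long sizeSec, long periodSec) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - Math.floorMod(t, periodSec); // latest start <= t
        for (long start = lastStart; start > t - sizeSec; start -= periodSec) {
            starts.add(start);
        }
        return starts;
    }

    /** Returns deviceId -> (windowStart -> {avgHumidity, avgTemp}). */
    static Map<String, Map<Long, double[]>> averages(
            List<Reading> readings, long sizeSec, long periodSec) {
        // Accumulate {sumHumidity, sumTemp, count} per device and window.
        Map<String, Map<Long, double[]>> sums = new TreeMap<>();
        for (Reading r : readings) {
            for (long start : windowStarts(r.timestampSec, sizeSec, periodSec)) {
                double[] acc = sums
                        .computeIfAbsent(r.deviceId, k -> new TreeMap<>())
                        .computeIfAbsent(start, k -> new double[3]);
                acc[0] += r.humidity;
                acc[1] += r.temp;
                acc[2] += 1;
            }
        }
        // Turn the sums into means.
        Map<String, Map<Long, double[]>> result = new TreeMap<>();
        for (Map.Entry<String, Map<Long, double[]>> device : sums.entrySet()) {
            Map<Long, double[]> perWindow = new TreeMap<>();
            for (Map.Entry<Long, double[]> w : device.getValue().entrySet()) {
                double[] acc = w.getValue();
                perWindow.put(w.getKey(), new double[] {acc[0] / acc[2], acc[1] / acc[2]});
            }
            result.put(device.getKey(), perWindow);
        }
        return result;
    }
}
```

Each reading lands in five overlapping windows (size / period), so you get a fresh 5-minute average per device every minute. FixedWindows.of(Duration.standardMinutes(5)) would instead give one non-overlapping average per 5 minutes.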

Hope that gets you started,

Kenn

On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick wrote:

> Hi,
>
> I'm in the process of porting work that I have done based on JBoss
> technology (ActiveMQ, Drools, etc.) to GCloud.
>
> The scenario is a simple IoT example: devices send their values via MQTT
> to PubSub, and Dataflow receives them for processing.
>
> So far, while learning GCloud features from scratch, I was able to receive
> the data and write it to a BigQuery table. It's all documented at
> https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
>
> What's working is that I receive a JSON string (e.g. {humidity=42.700001,
> temp=20.700001}) via PubSub and extend it to {timestamp=1516549776,
> deviceID=esp8266_D608CF, humidity=42.700001, temp=20.700001} via Dataflow.
>
> What I have no clue about is the following: for every "deviceID", I want
> to calculate the average "humidity" and "temp" over the last 5 minutes.
>
> Currently, my simple pipeline is:
>
>     p.apply(PubsubIO.readMessagesWithAttributes()
>                 .fromTopic(options.getPubSubTopic()))
>          .apply(ParDo.of(new FormatMessageAsTableRowFn()))
>          .apply(BigQueryIO.writeTableRows()
>                 .to(tableSpec.toString())
>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
>
> Does anyone have advice, or a pointer to documentation, on how I should
> proceed?
>
> Patrick
>
