Hi Patrick,

It sounds like you want to read about event-time windowing: https://beam.apache.org/documentation/programming-guide/#windowing

In particular, when you say "the last 5 minutes" I would ask what the point of reference is. Your needs may be served by fixed or sliding windows of five minutes. These are included with the SDK, and documented here: https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions

Hope that gets you started,

Kenn

On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick <[email protected]> wrote:

> Hi,
>
> I'm in the process of porting work that I have done based on JBoss
> Technology ( ActiveMQ, Drools, etc ) to GCloud.
>
> The scenario is a simple IoT example with devices sending their values via
> MQTT to PubSub and getting received by Dataflow for processing.
>
> So far, while learning GCloud features from scratch, I was able to receive
> the data and write it to a BigQuery Table. It's all documented at
> https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
>
> What's working is, that I receive a JSON String ( e.g.
> {humidity=42.700001, temp=20.700001} ) via PubSub and extend it to
> {timestamp=1516549776, deviceID=esp8266_D608CF, humidity=42.700001,
> temp=20.700001} via DataFlow.
>
> Where I have no clue is the following: I want to calculate for every
> "deviceID" the average value for "humidity" and "temp" for the last 5
> minutes.
>
> Currently my simple Pipeline is
>
>     p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
>      .apply(ParDo.of(new FormatMessageAsTableRowFn()))
>      .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
>          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
>
> Anyone an advice or pointer to docu how I need to proceed?
>
> Patrick
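To make the "point of reference" question in the reply concrete, here is a minimal plain-Java sketch of what fixed five-minute event-time windows compute for this data. It is not Beam API and not part of the original pipeline: the names `FixedWindowAverage`, `Reading`, `windowStart` and `averages` are hypothetical, and it assumes the `timestamp` field is in epoch seconds, as the example value 1516549776 suggests. Each reading is assigned to the window containing its own timestamp, and the averages are then taken per device and per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowAverage {
    // Window size: five minutes, in seconds (assumes epoch-second timestamps).
    public static final long WINDOW_SECONDS = 5 * 60;

    /** One device reading; fields mirror the JSON keys in the original message. */
    public static final class Reading {
        final String deviceId;
        final long timestamp;
        final double humidity;
        final double temp;

        public Reading(String deviceId, long timestamp, double humidity, double temp) {
            this.deviceId = deviceId;
            this.timestamp = timestamp;
            this.humidity = humidity;
            this.temp = temp;
        }
    }

    /** Start of the fixed window that contains the given event timestamp. */
    public static long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, WINDOW_SECONDS);
    }

    /**
     * Groups readings by (deviceId, window) and averages them.
     * Result key: "deviceId@windowStart"; value: {avgHumidity, avgTemp}.
     */
    public static Map<String, double[]> averages(List<Reading> readings) {
        // Accumulator per key: {sumHumidity, sumTemp, count}.
        Map<String, double[]> sums = new TreeMap<>();
        for (Reading r : readings) {
            String key = r.deviceId + "@" + windowStart(r.timestamp);
            double[] acc = sums.computeIfAbsent(key, k -> new double[3]);
            acc[0] += r.humidity;
            acc[1] += r.temp;
            acc[2] += 1;
        }
        Map<String, double[]> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            double[] a = e.getValue();
            result.put(e.getKey(), new double[] {a[0] / a[2], a[1] / a[2]});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        // Illustrative values only; the first two fall in the same window.
        readings.add(new Reading("esp8266_D608CF", 1516549776L, 42.0, 20.0));
        readings.add(new Reading("esp8266_D608CF", 1516549790L, 44.0, 22.0));
        readings.add(new Reading("esp8266_D608CF", 1516549800L, 50.0, 25.0));
        for (Map.Entry<String, double[]> e : averages(readings).entrySet()) {
            System.out.println(e.getKey() + " humidity=" + e.getValue()[0]
                    + " temp=" + e.getValue()[1]);
        }
    }
}
```

With fixed windows each reading contributes to exactly one average. If "the last 5 minutes" should instead be re-evaluated more often than every five minutes, sliding windows would place each reading in several overlapping windows; in a real pipeline the SDK's provided windowing functions linked above do this assignment for you.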
