It looks like what you want is to join your input stream with the computed
averages. It might look something like this:

    PCollection<KV<DeviceId, RawEvent>> inputEvents =
        ...apply(Window.into(SlidingWindows....));
    PCollection<KV<DeviceId, Double>> avgTemps =
        inputEvents.apply(Mean.perKey());
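
To fill that in a bit: Mean.perKey() expects numeric values, so you'd first
pull the temperature out into a KV<DeviceId, Double>. A rough, untested
sketch (extractTemp() is a hypothetical helper for parsing your JSON payload,
and DeviceId/RawEvent are just placeholder types); humidity would work the
same way:

    // inputEvents is the sliding-windowed collection from above.
    // Extract just the numeric temperature per device; extractTemp() is a
    // placeholder for however you parse your JSON payload.
    PCollection<KV<DeviceId, Double>> deviceTemps =
        inputEvents.apply(MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptor.of(DeviceId.class), TypeDescriptors.doubles()))
            .via(kv -> KV.of(kv.getKey(), extractTemp(kv.getValue()))));

    // Mean.perKey() then yields one average per device per sliding window.
    PCollection<KV<DeviceId, Double>> avgTemps = deviceTemps.apply(Mean.perKey());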

I don't want to recreate all of the docs on this thread, so I will just
point to CoGroupByKey [1] that you would use to join these on DeviceId;
the pattern looks something like this, where I've left out lots of
boilerplate:

    PCollection<KV<DeviceId, CoGbkResult>> joined = ...
    PCollection<KV<DeviceId, EventWithAvg>> result =
        joined.apply(ParDo.of(<function to pull out the joined results>));
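
If it helps, the boilerplate I'm leaving out is roughly the following. Again
just a sketch: eventTag/avgTag are names I made up, and EventWithAvg stands
in for whatever your joined record ends up being (CoGroupByKey, CoGbkResult,
TupleTag and KeyedPCollectionTuple come from the SDK's join package):

    // TupleTags let CoGroupByKey tell the two inputs apart.
    final TupleTag<RawEvent> eventTag = new TupleTag<RawEvent>() {};
    final TupleTag<Double> avgTag = new TupleTag<Double>() {};

    PCollection<KV<DeviceId, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(eventTag, inputEvents)
            .and(avgTag, avgTemps)
            .apply(CoGroupByKey.create());

    PCollection<KV<DeviceId, EventWithAvg>> result = joined.apply(ParDo.of(
        new DoFn<KV<DeviceId, CoGbkResult>, KV<DeviceId, EventWithAvg>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            CoGbkResult grouped = c.element().getValue();
            Double avg = grouped.getOnly(avgTag);         // one average per key per window
            for (RawEvent event : grouped.getAll(eventTag)) {  // every event in that window
              c.output(KV.of(c.element().getKey(), new EventWithAvg(event, avg)));
            }
          }
        }));

Because both inputs carry the same sliding windows, the join happens per
window, which is what gives each event the average of its own window.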

Kenn

[1] https://beam.apache.org/documentation/programming-guide/#cogroupbykey

On Fri, Jan 26, 2018 at 6:23 AM, Steiner Patrick <
[email protected]> wrote:

> Hi Kenn,
>
> thanks again for responding.
>
> Let me try to explain better what I'm looking for. For simplicity,
> let's take a simpler example.
>
> Message 1  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
> avg_temp=10}"
> Message 2  "4711" : "{temp=12}"  will become  "4711" : "{temp=12,
> avg_temp=11}"
> Message 3  "4711" : "{temp=14}"  will become  "4711" : "{temp=14,
> avg_temp=12}"
> Message 4  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
> avg_temp=11.5}"
>
> So for each incoming message I would like to append the current window's
> "avg_temp", all with a sliding window.
> So if we say the window is 2 seconds and we receive one message per
> second, my sample would change to
>
>
> Message 1  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
> avg_temp=10}"
> Message 2  "4711" : "{temp=12}"  will become  "4711" : "{temp=12,
> avg_temp=11}"
> Message 3  "4711" : "{temp=14}"  will become  "4711" : "{temp=14,
> avg_temp=13}"
> Message 4  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
> avg_temp=12}"
>
> Does this explain what I plan?
>
> Thanks again for your help
>
> Patrick
>
>
> Kenneth Knowles wrote:
>
>
>
> On Thu, Jan 25, 2018 at 3:31 AM, Steiner Patrick <
> [email protected]> wrote:
>
>> Hi Kenn, all,
>>
>> You are right, sliding windows seem to be exactly what I need. Thanks
>> for that pointer.
>>
>> Where I'm still in need of expert advice is how to structure the data
>> within my PCollection. From PubSub I do read all data as a JSON-String
>>
>> Format 1: "{ humidity=42.700001, temp=20.700001}"
>>
>> currently I'm just extending the JSON-String with the deviceID of the
>> sender and the timestamp
>>
>> Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF,
>> humidity=42.700001, temp=20.700001}"
>>
>> I guess it would make sense to use the deviceID as the key of a key/value
>> pair, so I can group by deviceID?
>>
>> Format 3: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
>> temp=20.700001}"
>>
>
> Yup, this is the right basic setup for just about anything you'll want to
> do.
>
>
>
>> What I'm still "missing" is an idea of how to apply "Mean" to each
>> "humidity" and "temp", so that as a result I can create something like
>>
>> Format 4: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
>> temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
>>
>
> Do you just want to calculate the averages and have one record output
> summarizing the device/window? Or do you want to keep all the original
> records and annotate them with the avg for their window, in other words
> basically doing the first calculation and then joining it with your
> original stream?
>
> Kenn
>
>
>
>
>> Happy to take advice or a pointer in the right direction.
>>
>> Thanks
>>
>> Patrick
>>
>>
>>
>> Kenneth Knowles wrote:
>>
>> Hi Patrick,
>>
>> It sounds like you want to read about event-time windowing:
>> https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> In particular, when you say "the last 5 minutes" I would ask what the
>> point of reference is. Your needs may be served by fixed or sliding windows
>> of five minutes. These are included with the SDK, and documented here:
>> https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
>>
>> Hope that gets you started,
>>
>> Kenn
>>
>> On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I'm in the process of porting work that I have done based on JBoss
>>> Technology ( ActiveMQ, Drools, etc ) to GCloud.
>>>
>>> The scenario is a simple IoT example with devices sending their values
>>> via MQTT to PubSub and getting received by Dataflow for processing.
>>>
>>> So far, while learning GCloud features from scratch, I was able to
>>> receive the data and write it to a BigQuery Table. It's all documented at
>>> https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
>>>
>>> What's working is, that I receive a JSON String ( e.g. {
>>> humidity=42.700001, temp=20.700001} ) via PubSub and extend it to
>>> {timestamp=1516549776, deviceID=esp8266_D608CF, humidity=42.700001,
>>> temp=20.700001} via DataFlow.
>>>
>>> Where I have no clue is the following: I want to calculate for every
>>> "deviceID" the average value for "humidity" and  "temp" for the last 5
>>> minutes.
>>>
>>> Currently my simple Pipeline is
>>>
>>>     p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
>>>          .apply(ParDo.of(new FormatMessageAsTableRowFn()))
>>>          .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
>>>                   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>                   .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
>>>
>>> Does anyone have advice or a pointer to docs on how I should proceed?
>>>
>>> Patrick
>>>
>>
>>
>>
>
>
