CoGroupByKey is for joining your input stream with the aggregation. It
isn't strictly necessary for joining the average humidity and the average
temperature, though: you could also compute both together by building a
compound Combine transform that does both at once. There are two ways:

1. Write your own CombineFn that calculates both averages together. I will
refer you to
https://beam.apache.org/documentation/programming-guide/#combine

2. Use a "composed combine", which is best documented in the javadoc,
here:
https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/transforms/CombineFns.html#compose--
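For the first option, the heart of such a CombineFn is a mergeable
accumulator that carries the sums and the count for both measurements at
once. A minimal plain-Java sketch of that accumulator (the Beam
boilerplate of extending Combine.CombineFn, plus a Coder for the
accumulator, is omitted; the field names are assumptions):

```java
// Sketch of the accumulator a "both averages" CombineFn would carry.
// Beam may combine partial results from different bundles, so the
// accumulator must support merging.
class AvgAccum {
    double humiditySum = 0;
    double tempSum = 0;
    long count = 0;

    // addInput(): fold one raw event into the running sums.
    void add(double humidity, double temp) {
        humiditySum += humidity;
        tempSum += temp;
        count++;
    }

    // mergeAccumulators(): fold a partial accumulator into this one.
    void merge(AvgAccum other) {
        humiditySum += other.humiditySum;
        tempSum += other.tempSum;
        count += other.count;
    }

    // extractOutput(): both averages fall out of the same accumulator.
    double avgHumidity() { return humiditySum / count; }
    double avgTemp() { return tempSum / count; }
}
```

Because add and merge commute, it doesn't matter how Beam partitions the
input into bundles; the final averages come out the same.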

I think the second one is probably a good fit and will look roughly like
this:

    Combine.perKey(
        CombineFns.compose()
            .with(RawEvent::getHumidity, Mean.of(), humidityTag)
            .with(RawEvent::getTemp, Mean.of(), tempTag))
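To make the semantics of that composed combine concrete: per key (and per
window, which is elided here), it produces both means over the grouped
values. A plain-Java sketch of the same computation, with RawEvent as a
hypothetical stand-in for whatever your pipeline parses out of PubSub:

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical event type standing in for your parsed PubSub payload.
class RawEvent {
    final String deviceId;
    final double humidity;
    final double temp;

    RawEvent(String deviceId, double humidity, double temp) {
        this.deviceId = deviceId;
        this.humidity = humidity;
        this.temp = temp;
    }
}

class ComposedAverages {
    // Per key, compute {avgHumidity, avgTemp} -- what Combine.perKey over
    // the composed mean fns yields for one window's worth of events.
    static Map<String, double[]> perKey(List<RawEvent> events) {
        Map<String, double[]> result = new HashMap<>();
        events.stream()
              .collect(Collectors.groupingBy(e -> e.deviceId))
              .forEach((id, evs) -> result.put(id, new double[] {
                  evs.stream().mapToDouble(e -> e.humidity).average().orElse(Double.NaN),
                  evs.stream().mapToDouble(e -> e.temp).average().orElse(Double.NaN)
              }));
        return result;
    }
}
```

In the real pipeline each average comes back out of the per-key
CoCombineResult by its TupleTag rather than by array index.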

Kenn

On Tue, Jan 30, 2018 at 4:31 AM, Steiner Patrick <
[email protected]> wrote:

> Thanks for the pointer to 'CoGroupByKey', which makes perfect sense and so
> does the usage of Mean.
>
> Still I have to bother you, as I'm probably still lacking some basic
> understanding of how ideal messages/data structures are supposed to look
> in a Beam pipeline.
>
> This is what I want to happen for each element in my original PCollection
> (read from PubSub):
>
>     original message -> "{humidity=42.700001, temp=20.700001}"
>     final message -> "deviceID" : "{timestamp=1516549776,
> humidity=42.700001, temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
>
> Where I'm stuck is: how do I calculate the average (Mean) for each
> datapoint in my original message?
>
> Since you pointed out the usage of 'CoGroupByKey', I assume I have to do
> something like the following visualization?
>
> https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo/blob/master/pictures/Mean_flow.png
>
> Am I on the right path?
>
>
> Thanks
>
> Patrick
>
> Kenneth Knowles wrote:
>
> It looks like what you want is to join your input stream with the computed
> averages. It might look something like this:
>
>     PCollection<KV<DeviceId, RawEvent>> inputEvents =
>         ...apply(Window.into(SlidingWindows...))
>     PCollection<KV<DeviceId, Double>> avgTemps =
>         inputEvents.apply(Mean.perKey())
>
> I don't want to recreate all of the docs on this thread, so I will just
> point to CoGroupByKey [1], which you would use to join these on DeviceId;
> the pattern looks something like this, where I've left out lots of
> boilerplate:
>
>     PCollection<KV<DeviceId, CoGbkResult>> joined = ...
>     PCollection<KV<DeviceId, EventWithAvg>> result =
> joined.apply(ParDo.of(<function to pull out the joined results>))
>
> Kenn
>
> [1] https://beam.apache.org/documentation/programming-guide/#cogroupbykey
>
>
>
>
>
> On Fri, Jan 26, 2018 at 6:23 AM, Steiner Patrick <
> [email protected]> wrote:
>
>> Hi Kenn,
>>
>> thanks again for responding.
>>
>> Let me try to explain better what I'm looking for. For simplicity's
>> sake, let's take a simpler example.
>>
>> Message 1  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
>> avg_temp=10}"
>> Message 2  "4711" : "{temp=12}"  will become  "4711" : "{temp=12,
>> avg_temp=11}"
>> Message 3  "4711" : "{temp=14}"  will become  "4711" : "{temp=14,
>> avg_temp=12}"
>> Message 4  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
>> avg_temp=11.5}"
>>
>> So for each incoming message I would like to append the current window's
>> "avg_temp", all with a sliding window.
>> So if we say the window is 2 seconds and we receive one message per
>> second, my sample would change to
>>
>>
>> Message 1  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
>> avg_temp=10}"
>> Message 2  "4711" : "{temp=12}"  will become  "4711" : "{temp=12,
>> avg_temp=11}"
>> Message 3  "4711" : "{temp=14}"  will become  "4711" : "{temp=14,
>> avg_temp=13}"
>> Message 4  "4711" : "{temp=10}"  will become  "4711" : "{temp=10,
>> avg_temp=12}"
>>
>> Does this explain what I plan?
>>
>> Thanks again for your help
>>
>> Patrick
>>
>>
>> Kenneth Knowles wrote:
>>
>>
>>
>> On Thu, Jan 25, 2018 at 3:31 AM, Steiner Patrick <
>> [email protected]> wrote:
>>
>>> Hi Kenn, all,
>>>
>>> you are right, sliding windows seems to be exactly what I need. Thanks
>>> for that pointer.
>>>
>>> Where I'm still in need of expert advice is how to structure the data
>>> within my PCollection. From PubSub I read all data as a JSON string
>>>
>>> Format 1: "{ humidity=42.700001, temp=20.700001}"
>>>
>>> Currently I'm just extending the JSON string with the sender's deviceID
>>> and the timestamp
>>>
>>> Format 2: "{timestamp=1516549776, deviceID=esp8266_D608CF,
>>> humidity=42.700001, temp=20.700001}"
>>>
>>> I guess it would make sense to use the deviceID as the key of a
>>> key/value pair, so I can group by "deviceID"?
>>>
>>> Format 3: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
>>> temp=20.700001}"
>>>
>>
>> Yup, this is the right basic setup for just about anything you'll want
>> to do.
>>
>>
>>
>>> What I'm still "missing" is an idea of how to apply "Mean" to
>>> "humidity" and "temp", so that as a result I can create something like
>>>
>>> Format 4: "esp8266_D608CF" : "{timestamp=1516549776, humidity=42.700001,
>>> temp=20.700001, avg_hum=<xyz>, avg_temp=<abc>}"
>>>
>>
>> Do you just want to calculate the averages and have one record output
>> summarizing the device/window? Or do you want to keep all the original
>> records and annotate them with the avg for their window, in other words
>> basically doing the first calculation and then joining it with your
>> original stream?
>>
>> Kenn
>>
>>
>>
>>
>>> Happy to take advice or a pointer in the right direction.
>>>
>>> Thanks
>>>
>>> Patrick
>>>
>>>
>>>
>>> Kenneth Knowles wrote:
>>>
>>> Hi Patrick,
>>>
>>> It sounds like you want to read about event-time windowing:
>>> https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> In particular, when you say "the last 5 minutes" I would ask what the
>>> point of reference is. Your needs may be served by fixed or sliding
>>> windows of five minutes. These are included with the SDK and documented
>>> here:
>>> https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
>>>
>>> Hope that gets you started,
>>>
>>> Kenn
>>>
>>> On Sun, Jan 21, 2018 at 8:29 AM, Steiner Patrick <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm in the process of porting work that I have done based on JBoss
>>>> technologies (ActiveMQ, Drools, etc.) to GCloud.
>>>>
>>>> The scenario is a simple IoT example with devices sending their values
>>>> via MQTT to PubSub and getting received by Dataflow for processing.
>>>>
>>>> So far, while learning GCloud features from scratch, I was able to
>>>> receive the data and write it to a BigQuery Table. It's all documented at
>>>> https://github.com/PatrickSteiner/Google_Cloud_IoT_Demo
>>>>
>>>> What's working is that I receive a JSON string (e.g.
>>>> {humidity=42.700001, temp=20.700001}) via PubSub and extend it to
>>>> {timestamp=1516549776, deviceID=esp8266_D608CF, humidity=42.700001,
>>>> temp=20.700001} via Dataflow.
>>>>
>>>> Where I have no clue is the following: I want to calculate for every
>>>> "deviceID" the average value for "humidity" and  "temp" for the last 5
>>>> minutes.
>>>>
>>>> Currently my simple Pipeline is
>>>>
>>>>     p.apply(PubsubIO.readMessagesWithAttributes().fromTopic(options.getPubSubTopic()))
>>>>         .apply(ParDo.of(new FormatMessageAsTableRowFn()))
>>>>         .apply(BigQueryIO.writeTableRows().to(tableSpec.toString())
>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
>>>>
>>>> Does anyone have advice or a pointer to docs on how I should proceed?
>>>>
>>>> Patrick
>>>>
>>>
>>>
>>>
>>
>>
>
>
