Hi Luke,

Thanks for your answer.

I was studying the state/timer approach. What doesn't convince me is that
I would have to use a global window on the main input; otherwise I could
lose some states when I don't receive state events for a while. By using
side inputs I keep two independent windowing strategies: global for the
states and fixed (or whatever) for the measurements. Do you see another way
to overcome this?
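
To make the concern concrete, here is a minimal sketch in plain Java (no Beam
involved) of the lookup I need, whether it ends up living in a side input or
in per-key state. StateHistory and its method names are made up for
illustration only. It resolves "last known state at timestamp X" with a
TreeMap, and prunes old entries while always keeping one entry at or before
the horizon, which is the part I am afraid of losing with a non-global window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Beam API): state history of a single sensor. */
final class StateHistory {
    // event timestamp -> state name, kept sorted by timestamp
    private final TreeMap<Long, String> states = new TreeMap<>();

    /** Record a 'new-state' event; arrival order does not matter. */
    void addState(long timestamp, String state) {
        states.put(timestamp, state);
    }

    /** Last known state at the given timestamp, or null if none is known. */
    String stateAt(long timestamp) {
        Map.Entry<Long, String> entry = states.floorEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    /**
     * Drop entries older than the horizon, but keep the latest entry at or
     * before it, so a sensor that has been silent still resolves to a state.
     */
    void prune(long horizon) {
        Long keep = states.floorKey(horizon);
        if (keep != null) {
            states.headMap(keep, false).clear();
        }
    }

    /** Sum measurements (timestamp -> value) by the state in effect at each timestamp. */
    Map<String, Double> sumByState(Map<Long, Double> measurements) {
        Map<String, Double> sums = new TreeMap<>();
        for (Map.Entry<Long, Double> m : measurements.entrySet()) {
            String state = stateAt(m.getKey());
            if (state != null) {
                sums.merge(state, m.getValue(), Double::sum);
            }
        }
        return sums;
    }

    int size() {
        return states.size();
    }

    public static void main(String[] args) {
        StateHistory s1 = new StateHistory();
        s1.addState(0L, "A");                  // t0: new-state A
        Map<Long, Double> measurements = new HashMap<>();
        measurements.put(1L, 10.0);            // t1: measurement 10
        measurements.put(3L, 20.0);            // t3: measurement 20
        System.out.println(s1.sumByState(measurements));
        s1.addState(2L, "B");                  // t2 arrives late: new-state B
        System.out.println(s1.sumByState(measurements));
    }
}
```

The main method replays the scenario from my original message: once the late
t2 state is added, recomputing the sums splits the measurements between A and
B. The sketch says nothing about when Beam would trigger that recomputation,
which is exactly my open question.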

Regards



On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:

> Only data along the main input edge causes that DoFn to be executed again.
> Side inputs don't cause main inputs to be reprocessed. The trigger on the
> side input controls when the side input data becomes available and is
> updated.
>
> You could choose to have a generator that produces an event on the main
> input every hour and you could use it to look at the side input and compute
> all the outputs that you want.
>
> I do think that a solution that uses state and timers would likely fit
> more naturally to solve the problem. This blog[1] is a good starting point.
>
> 1: https://beam.apache.org/blog/timely-processing/
>
> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <[email protected]>
> wrote:
>
>> Hi all,
>>
>>
>> I have a question regarding triggers on side inputs. I will try to explain
>> my question with an example.
>>
>> Suppose we have a data stream of sensor events as follows:
>>
>>    - id, timestamp, type, value
>>
>> Where:
>>
>>    - id is the sensor id
>>    - timestamp is the event timestamp
>>    - type is one of these values: measurement, new-state
>>    - value is a conditional field following this rule: if the type is
>>    'measurement', it is a real number with the measured value; if the type
>>    is 'new-state', it is a string with the new sensor state.
>>
>> We want to produce aggregated data on a time basis (for example, hourly)
>> as follows:
>>
>> Report:
>>
>> {
>>   'timestamp': 'report timestamp',
>>   'sensors': {
>>     'sensorId1': {
>>       'stateA': 'sum of measures in stateA',
>>       'stateB': 'sum of measures in stateB',
>>       'stateC': 'sum of measures in stateC',
>>       ....
>>     }
>>   }
>> }
>>
>> The state at a given timestamp X is the last known state of that sensor
>> at that moment. We could have late events (suppose max 15' late).
>>
>>
>> I thought of this solution:
>>
>>
>> 1) Create a 'sideInput' with the states (in a global window)
>>
>>
>> Window<KV<String, SensorEvent>> sideInputWindow =
>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .accumulatingFiredPanes();
>>
>>
>> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
>> sensorStates =
>>
>> ....
>>
>>
>> Where the map has an entry for every sensor, and the TreeMap has an entry
>> for every new state in the last 15', plus the previous one, which we need
>> in case we didn't receive state changes in the last 15'.
>>
>>
>> 2) Then, the solution aggregates values based on the sideInput
>>
>>
>> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an hour
>> with allowed lateness 15' and accumulating fired panes.
>>
>>
>> The solution is not working as I expected in this scenario (row order is
>> processing time order):
>>
>>
>> Timestamp  Sensor  Type         Value  Output      Expected
>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>> t2         s1      new-state    B      No output   s1 {A:10 B:20}
>>
>>
>> I assumed that a new firing of the 'sideInput' would force a new firing of
>> the 'mainInput'. So when t2 arrives, it fires the sideInput trigger,
>> producing the new state, and then the mainInput would be recomputed,
>> producing the expected value. Is my assumption wrong?
>>
>>
>> Thank you
>>
>>
>>
>>
>
