Hi Luke,

Thanks for your answer.
I was studying the state/timer approach. What doesn't convince me is that I would have to use a global window on the main input; otherwise I could lose some states when I don't receive state events for a while. By using side inputs, I keep two independent windowing strategies: global for the states, and fixed (or whatever) for the measurements. Do you see another way to overcome this?

Regards

On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:

> Only data along the main input edge causes that DoFn to be executed again.
> Side inputs don't cause main inputs to be reprocessed. The trigger on the
> side input controls when the side input data becomes available and is
> updated.
>
> You could choose to have a generator that produces an event on the main
> input every hour and you could use it to look at the side input and compute
> all the outputs that you want.
>
> I do think that a solution that uses state and timers would likely fit
> more naturally to solve the problem. This blog post [1] is a good starting
> point.
>
> 1: https://beam.apache.org/blog/timely-processing/
>
> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <[email protected]>
> wrote:
>
>> Hi all,
>>
>> I have a question regarding triggers on side inputs. I will try to explain
>> my doubt with an example.
>>
>> Suppose we have a data stream of sensor events as follows:
>>
>> - id, timestamp, type, value
>>
>> Where:
>>
>> - id is the sensor id
>> - timestamp is the event timestamp
>> - type is one of these values: measurement, new-state
>> - value is a conditional field: if the type is 'measurement', it is a
>>   real number with the measured value; if the type is 'new-state', it is
>>   a string with the new sensor state.
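To make the event schema above concrete, a hypothetical plain-Java model of one event could look like the sketch below (it is not part of the original question). The names `SensorEvent` and `EventType`, the comma-separated input, and epoch-millisecond timestamps are all assumptions; the sketch only shows how `type` decides the meaning of `value`.

```java
// Hypothetical plain-Java model of the "id, timestamp, type, value" events.
// Assumes comma-separated fields and epoch-millisecond timestamps.
enum EventType { MEASUREMENT, NEW_STATE }

final class SensorEvent {
    final String id;
    final long timestamp;
    final EventType type;
    final double measurement; // only meaningful when type == MEASUREMENT
    final String state;       // only meaningful when type == NEW_STATE

    private SensorEvent(String id, long timestamp, EventType type, double measurement, String state) {
        this.id = id;
        this.timestamp = timestamp;
        this.type = type;
        this.measurement = measurement;
        this.state = state;
    }

    static SensorEvent measurement(String id, long timestamp, double value) {
        return new SensorEvent(id, timestamp, EventType.MEASUREMENT, value, null);
    }

    static SensorEvent newState(String id, long timestamp, String state) {
        return new SensorEvent(id, timestamp, EventType.NEW_STATE, Double.NaN, state);
    }

    // The type field decides how the value field is interpreted.
    static SensorEvent parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        String id = parts[0].trim();
        long timestamp = Long.parseLong(parts[1].trim());
        String type = parts[2].trim();
        String value = parts[3].trim();
        switch (type) {
            case "measurement":
                return measurement(id, timestamp, Double.parseDouble(value));
            case "new-state":
                return newState(id, timestamp, value);
            default:
                throw new IllegalArgumentException("unknown type: " + type);
        }
    }
}
```

Using one class with two factory methods keeps the sketch short; separate classes per event type would make the conditional `value` field explicit in the type system.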
>>
>> We want to produce aggregated data on a time basis (for example, per
>> hour) as follows:
>>
>> Report:
>>
>> {
>>   'timestamp': 'report timestamp',
>>   'sensors': {
>>     'sensorId1': {
>>       'stateA': 'sum of measures in stateA',
>>       'stateB': 'sum of measures in stateB',
>>       'stateC': 'sum of measures in stateC',
>>       ....
>>     }
>>   }
>> }
>>
>> The state at a given timestamp X is the last known state of that sensor
>> at that moment. We could have late events (at most 15 minutes late).
>>
>> I thought of this solution:
>>
>> 1) Create a side input with the states (in a global window):
>>
>> Window<KV<String, SensorEvent>> sideInputWindow =
>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .accumulatingFiredPanes();
>>
>> PCollectionView<Map<String, Iterable<TreeMap<Long, EventState>>>> sensorStates =
>>     ....
>>
>> The map has an entry for every sensor, and the TreeMap has an entry for
>> every new state in the last 15 minutes, plus one more: the previous state,
>> which we need in case we didn't receive any state change in the last
>> 15 minutes.
>>
>> 2) Then, aggregate the measurements based on the side input:
>>
>> Window<KV<String, SensorEvent>> mainInputWindow = fixed windows of one
>> hour, with 15 minutes of allowed lateness and accumulating fired panes.
>>
>> The solution does not work as I expected in this scenario (rows are in
>> processing-time order):
>>
>> Timestamp  Sensor  Type         Value  Actual output  Expected output
>> t0         s1      new-state    A      s1 {A: 0}      s1 {A: 0}
>> t1         s1      measurement  10     s1 {A: 10}     s1 {A: 10}
>> t3         s1      measurement  20     s1 {A: 30}     s1 {A: 30}
>> t2         s1      new-state    B      No output      s1 {A: 10, B: 20}
>>
>> I assumed that a new firing of the side input would force a new firing of
>> the main input.
>> So when t2 arrives, it fires the side input's trigger, producing the new
>> state, and then the main input would be recomputed, producing the expected
>> value. Is my assumption wrong?
>>
>> Thank you
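Following Luke's state-and-timers suggestion, the per-sensor logic could be modeled in plain Java roughly as below. This is a sketch under stated assumptions and not Beam API: events are keyed by sensor id, timestamps are epoch milliseconds, reports cover fixed event-time hours, and an hour is reported only once the watermark passes its end plus the 15 minutes of allowed lateness. In Beam, the two buffers would loosely correspond to per-key state cells (for example a `BagState`) and `onWatermark` to an event-time `Timer` callback. Because the last known state is carried from one hour into the next explicitly, this is one possible way to keep the main input in a global window without losing states between hours. `SensorReportProcessor` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Beam API) of one sensor's state-and-timer logic.
// Assumptions: epoch-millisecond timestamps, fixed event-time hours, and an
// hour is reported once the watermark passes its end + allowed lateness.
final class SensorReportProcessor {
    static final long HOUR_MS = 60L * 60 * 1000;
    static final long ALLOWED_LATENESS_MS = 15L * 60 * 1000;

    private static final class Measurement {
        final long timestamp;
        final double value;

        Measurement(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Would be per-key state in Beam: state changes and unreported measurements.
    private final TreeMap<Long, String> stateChanges = new TreeMap<>();
    private final List<Measurement> pending = new ArrayList<>();
    private long nextHourStart; // start of the earliest hour not yet reported

    SensorReportProcessor(long firstHourStart) {
        this.nextHourStart = firstHourStart;
    }

    void onNewState(long timestamp, String state) {
        // Always recorded: even a late state change can affect later hours.
        stateChanges.put(timestamp, state);
    }

    void onMeasurement(long timestamp, double value) {
        // Measurements for an hour that was already reported are dropped.
        if (timestamp >= nextHourStart) {
            pending.add(new Measurement(timestamp, value));
        }
    }

    // Plays the role of an event-time timer callback.
    // Returns hourStart -> (state -> sum of measurements in that state).
    Map<Long, Map<String, Double>> onWatermark(long watermark) {
        Map<Long, Map<String, Double>> reports = new TreeMap<>();
        while (nextHourStart + HOUR_MS + ALLOWED_LATENESS_MS <= watermark) {
            long start = nextHourStart;
            long end = start + HOUR_MS;
            Map<String, Double> sums = new TreeMap<>();
            // Include the state in effect at the start of the hour and every state entered during it.
            Map.Entry<Long, String> atStart = stateChanges.floorEntry(start);
            if (atStart != null) {
                sums.put(atStart.getValue(), 0.0);
            }
            for (String s : stateChanges.subMap(start, true, end, false).values()) {
                sums.putIfAbsent(s, 0.0);
            }
            // Attribute each measurement to the last state at or before its event time.
            for (Measurement m : pending) {
                if (m.timestamp < end) {
                    Map.Entry<Long, String> current = stateChanges.floorEntry(m.timestamp);
                    if (current != null) {
                        sums.merge(current.getValue(), m.value, Double::sum);
                    }
                }
            }
            reports.put(start, sums);
            // Drop reported measurements and every state change older than the
            // one still in effect at the end of the hour (the carry-over).
            pending.removeIf(m -> m.timestamp < end);
            Long carry = stateChanges.lowerKey(end);
            if (carry != null) {
                stateChanges.headMap(carry, false).clear();
            }
            nextHourStart = end;
        }
        return reports;
    }
}
```

Replaying the table rows with t0, t1, t2 and t3 as 0, 1, 2 and 3 minutes (arriving in the order t0, t1, t3, t2) gives no report while the watermark is at 1:00, then A = 10 and B = 20 for the first hour once it reaches 1:15, which matches the Expected column. Dropping measurements that arrive after their hour has been reported, while still recording late state changes, is a choice of this sketch rather than something Beam prescribes.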
