Hi Luke,

Thank you for your reply.

I have a question about the approach you are suggesting. Is it not possible to re-window without using a message queue? I mean:

EventStream -> ParDo(GlobalWindow State and Timers) -> apply(CalendarWindow) -> Additional processing

If that is possible, I think there is a solution without timers, because every time a new event is ingested into the global window, I would be in one of these scenarios:

* measurement event (get the state and write the event + state)
* new-state event (update the state based on the new sensor state, and resend all the affected events)

So every time the sensor state changes, I could send new records downstream, forcing the calendar window's triggers to fire again. I would need two states in the global-window transform: one for the 'sensor-states' and another for the events within the allowed lateness.

I'm not sure that what I'm thinking is possible; surely there is a good reason for the hacky approach that I'm not seeing here.

Thanks a lot for your help.

Regards

On Fri, Oct 9, 2020 at 10:53 PM Luke Cwik <[email protected]> wrote:

> If you don't have any watermark based triggers then using the global
> window state and timers makes sense and you can rewindow into a different
> window after it. A hacky way to be able to rewindow into a different
> windowing strategy is to output the data to a message queue and ingest the
> data in the same pipeline. People have used this to create loops and work
> around some windowing issues like this.
>
> The graph would look like:
> EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output
> summary to Pubsub)
> Input summary from Pubsub -> Window.into(My favorite window strategy) ->
> Additional processing
>
> On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola <[email protected]>
> wrote:
>
>> Hi Luke,
>>
>> Thanks for your answer.
>>
>> I was studying the state/timer approach.
>> What doesn't convince me is the
>> fact that I would have to use a global window in the main input, otherwise
>> I could lose some states when I don't receive state events for a while. By
>> using side inputs I keep two independent window strategies, global for
>> states but fixed (or whatever) for the measurements. Do you see another way
>> to overcome this?
>>
>> Regards
>>
>> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:
>>
>>> Only data along the main input edge causes that DoFn to be executed
>>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>>> on the side input controls when the side input data becomes available and
>>> is updated.
>>>
>>> You could choose to have a generator that produces an event on the main
>>> input every hour and you could use it to look at the side input and compute
>>> all the outputs that you want.
>>>
>>> I do think that a solution that uses state and timers would likely fit
>>> more naturally to solve the problem. This blog[1] is a good starting point.
>>>
>>> 1: https://beam.apache.org/blog/timely-processing/
>>>
>>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>>> [email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a question regarding triggers in side inputs. I will try to
>>>> explain my question with an example.
>>>>
>>>> Suppose we have a data stream of sensor events as follows:
>>>>
>>>> - id, timestamp, type, value
>>>>
>>>> Where:
>>>>
>>>> - id is the sensor id
>>>> - timestamp is the event timestamp
>>>> - type is one of these values: measurement, new-state
>>>> - value is a conditional field following this rule: if the type is
>>>> 'measurement', it is a real number with the measured value; if the type
>>>> is 'new-state', it is a string with the new sensor state.
>>>>
>>>> We want to produce aggregated data in a time-based way (for example, by
>>>> the hour) as follows:
>>>>
>>>> Report:
>>>>
>>>> {
>>>>   'timestamp': 'report timestamp',
>>>>   'sensors': {
>>>>     'sensorId1': {
>>>>       'stateA': 'sum of measures in stateA',
>>>>       'stateB': 'sum of measures in stateB',
>>>>       'stateC': 'sum of measures in stateC',
>>>>       ....
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> The state at a given timestamp X is the last known state of that sensor
>>>> at that moment. We could have late events (suppose max 15' late).
>>>>
>>>> I thought of this solution:
>>>>
>>>> 1) Create a 'sideInput' with the states (in a global window)
>>>>
>>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows()) //
>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) //
>>>>         .accumulatingFiredPanes();
>>>>
>>>> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
>>>> sensorStates =
>>>> ....
>>>>
>>>> Where I have an entry in this map for every sensor, and the TreeMap has
>>>> an entry for every new state (in the last 15') + 1: we need the previous one
>>>> in case we didn't receive state changes in the last 15'.
>>>>
>>>> 2) Then, the solution aggregates values based on the sideInput
>>>>
>>>> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an
>>>> hour with allowed lateness 15' and accumulating fired panes.
>>>>
>>>> The solution is not working as I expected in this scenario (row order
>>>> is processing time order):
>>>>
>>>> Timestamp  Sensor  Type         Value  Output      Expected
>>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>>> t2         s1      new-state    B      No output   s1 {A:10 B:20}
>>>>
>>>> I assumed that a new fire in the 'sideInput' would force a new fire in
>>>> the 'mainInput'. So when t2 arrives, it fires a trigger of the sideInput
>>>> producing the new state, and then the mainInput will be recomputed
>>>> producing the expected value. Is my assumption wrong?
>>>>
>>>> Thank you
>>>>
>>>
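
The re-windowing idea from the top message (two per-sensor states, with the affected measurements re-sent whenever a state change arrives) can be checked against the t0..t3 scenario outside of Beam. What follows is a minimal plain-Java sketch, not Beam code: SensorStateSketch, Tagged and all method names are hypothetical, eviction of measurements older than the allowed lateness is left out, and it assumes at most one measurement per timestamp and a downstream step that treats a re-sent record as a replacement rather than as a new measurement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model of the per-sensor logic; none of these names are Beam APIs.
class SensorStateSketch {

    // A measurement tagged with the state assumed to be in effect at its timestamp.
    static final class Tagged {
        final long ts;
        final String state;
        final double value;

        Tagged(long ts, String state, double value) {
            this.ts = ts;
            this.state = state;
            this.value = value;
        }
    }

    // First state: timeline of state changes, keyed by event timestamp.
    private final TreeMap<Long, String> states = new TreeMap<>();
    // Second state: buffered measurements, keyed by event timestamp.
    // Eviction after the allowed lateness is omitted in this sketch.
    private final TreeMap<Long, Double> buffered = new TreeMap<>();

    // Last known state at or before ts, or "unknown" if none has been seen yet.
    String stateAt(long ts) {
        Map.Entry<Long, String> e = states.floorEntry(ts);
        return e == null ? "unknown" : e.getValue();
    }

    // Measurement event: buffer it and emit it with the best-known state.
    List<Tagged> onMeasurement(long ts, double value) {
        buffered.put(ts, value);
        List<Tagged> out = new ArrayList<>();
        out.add(new Tagged(ts, stateAt(ts), value));
        return out;
    }

    // New-state event: update the timeline, then re-emit every buffered
    // measurement between this change and the next known change.
    List<Tagged> onNewState(long ts, String state) {
        states.put(ts, state);
        Long next = states.higherKey(ts);
        NavigableMap<Long, Double> affected = (next == null)
                ? buffered.tailMap(ts, true)
                : buffered.subMap(ts, true, next, false);
        List<Tagged> out = new ArrayList<>();
        for (Map.Entry<Long, Double> m : affected.entrySet()) {
            out.add(new Tagged(m.getKey(), state, m.getValue()));
        }
        return out;
    }

    // Downstream model: the latest record per measurement timestamp wins,
    // then values are summed per state.
    static Map<String, Double> aggregate(List<Tagged> emitted) {
        Map<Long, Tagged> latest = new TreeMap<>();
        for (Tagged t : emitted) {
            latest.put(t.ts, t);
        }
        Map<String, Double> sums = new TreeMap<>();
        for (Tagged t : latest.values()) {
            sums.merge(t.state, t.value, Double::sum);
        }
        return sums;
    }
}
```

Feeding the four events in processing order (t0 new-state A, t1 measurement 10, t3 measurement 20, t2 new-state B) and aggregating everything emitted gives {A=30.0} before the late state change and {A=10.0, B=20.0} after it, which matches the Expected column. Whether the real downstream window can replace an earlier record like this depends on the accumulation mode and on how the aggregation is keyed, so that part remains an assumption of the sketch.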
