Thank you for the clarification, Luke. I implemented this solution and it works.
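The code below is a minimal plain-Java sketch of the global-window approach from the Oct 13 message quoted below. Per-sensor state keeps the state history and the buffered measurements. Each measurement is emitted tagged with its last known state, and a late new-state event re-emits the measurements it now covers. It is not the implementation mentioned above and does not use Beam; it only simulates the per-key logic. All names (StatefulRelabel, SensorProcessor, onMeasurement, onNewState, aggregate) are hypothetical. It assumes that downstream, a re-emitted measurement replaces its earlier version (here keyed by timestamp); a plain accumulating sum would count it twice. Expiring buffered measurements after the allowed lateness is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Plain-Java simulation of the per-sensor global-window logic; all names are hypothetical. */
public class StatefulRelabel {

    /** A measurement tagged with the sensor state it was attributed to. */
    static final class Tagged {
        final long timestamp;
        final double value;
        final String state; // null if no state was known yet

        Tagged(long timestamp, double value, String state) {
            this.timestamp = timestamp;
            this.value = value;
            this.state = state;
        }
    }

    /** Stands in for the two pieces of state kept for one sensor key. */
    static final class SensorProcessor {
        private final TreeMap<Long, String> states = new TreeMap<>();       // "sensor-states"
        private final TreeMap<Long, Double> measurements = new TreeMap<>(); // events within allowed lateness

        private String stateAt(long timestamp) {
            Map.Entry<Long, String> e = states.floorEntry(timestamp);
            return e == null ? null : e.getValue();
        }

        /** Measurement event: buffer it, then emit it with the last known state. */
        List<Tagged> onMeasurement(long timestamp, double value) {
            measurements.put(timestamp, value);
            return Collections.singletonList(new Tagged(timestamp, value, stateAt(timestamp)));
        }

        /** New-state event: record it and re-emit the buffered measurements it now covers. */
        List<Tagged> onNewState(long timestamp, String state) {
            states.put(timestamp, state);
            Long next = states.higherKey(timestamp);
            // Measurements from this state change up to the next known one belong to the new state.
            NavigableMap<Long, Double> affected = next == null
                    ? measurements.tailMap(timestamp, true)
                    : measurements.subMap(timestamp, true, next, false);
            List<Tagged> out = new ArrayList<>();
            for (Map.Entry<Long, Double> m : affected.entrySet()) {
                out.add(new Tagged(m.getKey(), m.getValue(), state));
            }
            return out;
        }
    }

    /** Downstream stand-in: sums per state over the latest version of each measurement. */
    static Map<String, Double> aggregate(Map<Long, Tagged> latestByTimestamp) {
        Map<String, Double> sums = new TreeMap<>();
        for (Tagged t : latestByTimestamp.values()) {
            if (t.state != null) {
                sums.merge(t.state, t.value, Double::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        SensorProcessor s1 = new SensorProcessor();
        List<Tagged> emitted = new ArrayList<>();
        // Same processing-time order as the table in the original question (t0..t3 as 0..3).
        emitted.addAll(s1.onNewState(0L, "A"));     // t0: nothing buffered yet
        emitted.addAll(s1.onMeasurement(1L, 10.0)); // t1: tagged A
        emitted.addAll(s1.onMeasurement(3L, 20.0)); // t3: tagged A for now
        emitted.addAll(s1.onNewState(2L, "B"));     // t2 (late): re-emits t3 tagged B

        // Assumption: a re-emitted measurement replaces its earlier version downstream.
        Map<Long, Tagged> latest = new TreeMap<>();
        for (Tagged t : emitted) {
            latest.put(t.timestamp, t);
        }
        System.out.println("s1 " + aggregate(latest));
    }
}
```

Running main prints `s1 {A=10.0, B=20.0}`, which matches the Expected column of the last table row. The late t2 event relabels only the t3 measurement, because t1 falls before it.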
Regards

On Tue, Oct 13, 2020 at 5:57 PM Luke Cwik wrote:

> Sorry for the confusion:
>
> "If you don't have any watermark based triggers then using the global
> window state and timers makes sense and you can rewindow into a different
> window after it."
> was meant to be a separate statement from
> "A hacky way to be able to rewindow into a different windowing strategy is
> to output the data to a message queue and ingest the data in the same
> pipeline. People have used this to create loops and work around some
> windowing issues like this."
>
> You only need the latter (the message queue approach) if your computations
> before the rewindowing rely on watermark based triggers.
>
> On Tue, Oct 13, 2020 at 7:49 AM Andrés Garagiola wrote:
>
>> Hi Luke,
>>
>> Thank you for your reply.
>>
>> I have a doubt about the approach you are suggesting. Is it not possible
>> to re-window without using a message queue?
>>
>> I mean:
>>
>> EventStream -> ParDo(GlobalWindow State and Timers) ->
>> apply(CalendarWindow) -> Additional processing
>>
>> If that were possible, I think there is a solution without timers,
>> because every time a new event is ingested into the global window, I
>> would be in one of these scenarios:
>>
>> * measurement event (get the state and write the event + state)
>> * new-state event (update the state based on the new sensor-state, and
>> resend all the affected events)
>>
>> So every time the sensor-state changes, I could send new records
>> downstream, forcing the calendar window's triggers to fire again.
>> I would require two states in the global window transformation, one for
>> the 'sensor-states' and another one for the events within the allowed
>> lateness.
>> I'm not sure that what I'm thinking is possible; surely there is a good
>> reason for the hacky approach that I'm not seeing here.
>>
>> Thanks a lot for your help
>> Regards
>>
>> On Fri, Oct 9, 2020 at 10:53 PM Luke Cwik wrote:
>>
>>> If you don't have any watermark based triggers then using the global
>>> window state and timers makes sense and you can rewindow into a different
>>> window after it. A hacky way to be able to rewindow into a different
>>> windowing strategy is to output the data to a message queue and ingest the
>>> data in the same pipeline. People have used this to create loops and work
>>> around some windowing issues like this.
>>>
>>> The graph would look like:
>>> EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output summary to Pubsub)
>>> Input summary from Pubsub -> Window.into(My favorite window strategy) -> Additional processing
>>>
>>> On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Thanks for your answer.
>>>>
>>>> I was studying the state/timer approach. What doesn't convince me is
>>>> the fact that I would have to use a global window in the main input;
>>>> otherwise I could lose some states when I don't receive state events for a
>>>> while. By using side inputs I keep two independent windowing strategies:
>>>> global for the states but fixed (or whatever) for the measurements. Do you
>>>> see another way to overcome this?
>>>>
>>>> Regards
>>>>
>>>> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik wrote:
>>>>
>>>>> Only data along the main input edge causes that DoFn to be executed
>>>>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>>>>> on the side input controls when the side input data becomes available and
>>>>> is updated.
>>>>>
>>>>> You could choose to have a generator that produces an event on the
>>>>> main input every hour and you could use it to look at the side input and
>>>>> compute all the outputs that you want.
>>>>>
>>>>> I do think that a solution that uses state and timers would likely be
>>>>> a more natural fit for this problem. This blog[1] is a good starting
>>>>> point.
>>>>>
>>>>> 1: https://beam.apache.org/blog/timely-processing/
>>>>>
>>>>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a question regarding triggers in side inputs. I will try to
>>>>>> explain my doubt with an example.
>>>>>>
>>>>>> Suppose we have a data stream of sensor events as follows:
>>>>>>
>>>>>> - id, timestamp, type, value
>>>>>>
>>>>>> Where:
>>>>>>
>>>>>> - id is the sensor id
>>>>>> - timestamp is the event timestamp
>>>>>> - type is one of these values (measurement, new-state)
>>>>>> - value is a conditional field: if the type is 'measurement', it is a
>>>>>>   real number with the measured value; if the type is 'new-state', it
>>>>>>   is a string with the new sensor state.
>>>>>>
>>>>>> We want to produce aggregated data in a time-based way (for example,
>>>>>> per hour) as follows:
>>>>>>
>>>>>> Report:
>>>>>>
>>>>>> {
>>>>>>   'timestamp': 'report timestamp',
>>>>>>   'sensors': {
>>>>>>     'sensorId1': {
>>>>>>       'stateA': 'sum of measures in stateA',
>>>>>>       'stateB': 'sum of measures in stateB',
>>>>>>       'stateC': 'sum of measures in stateC',
>>>>>>       ....
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The state at a given timestamp X is the last known state of that
>>>>>> sensor at that moment. We could have late events (suppose max 15' late).
>>>>>>
>>>>>> I thought of this solution:
>>>>>>
>>>>>> 1) Create a 'sideInput' with the states (in a global window)
>>>>>>
>>>>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>>         .accumulatingFiredPanes();
>>>>>>
>>>>>> PCollectionView<Map<String, Iterable<TreeMap<Long, EventState>>>> sensorStates =
>>>>>>     ....
>>>>>>
>>>>>> Where I have an entry in this map for every sensor, and the TreeMap
>>>>>> has an entry for every new state in the last 15', plus one more: we
>>>>>> need the previous state in case we didn't receive state changes in the
>>>>>> last 15'.
>>>>>>
>>>>>> 2) Then, the solution aggregates values based on the sideInput:
>>>>>>
>>>>>> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an
>>>>>> hour with allowed lateness 15' and accumulating fired panes.
>>>>>>
>>>>>> The solution is not working as I expected in this scenario (row order
>>>>>> is processing time order):
>>>>>>
>>>>>> Timestamp  Sensor  Type         Value  Output      Expected
>>>>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>>>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>>>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>>>>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>>>>>
>>>>>> I assumed that a new firing of the 'sideInput' would force a new firing
>>>>>> of the 'mainInput'. So when t2 arrives, it fires a trigger on the
>>>>>> sideInput, producing the new state, and then the mainInput would be
>>>>>> recomputed, producing the expected value. Is my assumption wrong?
>>>>>>
>>>>>> Thank you
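The Expected column of the table above can be reproduced with one TreeMap per sensor. floorEntry gives the last known state at a measurement's event timestamp. Pruning keeps the states newer than the lateness horizon plus the latest one at or before it, which mirrors the "last 15' + 1" rule described for the side input. The sketch below is plain Java, not Beam code. The names SensorReport, StateHistory, stateAt, prune and sumByState are hypothetical, and so is the watermark parameter, which stands for event-time progress. t0..t3 are encoded as 0..3.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of a per-sensor state history; all names are hypothetical. */
public class SensorReport {

    /** Event timestamp -> state name for one sensor. */
    static final class StateHistory {
        private final TreeMap<Long, String> states = new TreeMap<>();

        void put(long timestamp, String state) {
            states.put(timestamp, state);
        }

        /** Last known state at or before the timestamp, or null if none is known. */
        String stateAt(long timestamp) {
            Map.Entry<Long, String> e = states.floorEntry(timestamp);
            return e == null ? null : e.getValue();
        }

        /**
         * Keep the states newer than (watermark - lateness) plus the latest one at or
         * before that cutoff, which still applies if no state changed within the horizon.
         */
        void prune(long watermark, long lateness) {
            Long keep = states.floorKey(watermark - lateness);
            if (keep != null) {
                states.headMap(keep, false).clear();
            }
        }

        int size() {
            return states.size();
        }
    }

    /** Sums measurements per state, resolving each one by its event timestamp. */
    static Map<String, Double> sumByState(StateHistory history, Map<Long, Double> measurements) {
        Map<String, Double> sums = new TreeMap<>();
        for (Map.Entry<Long, Double> m : measurements.entrySet()) {
            String state = history.stateAt(m.getKey());
            if (state != null) {
                sums.merge(state, m.getValue(), Double::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        StateHistory s1 = new StateHistory();
        Map<Long, Double> measurements = new TreeMap<>();
        // Rows of the table in processing-time order; t0..t3 are encoded as 0..3.
        s1.put(0L, "A");            // t0 new-state A
        measurements.put(1L, 10.0); // t1 measurement 10
        measurements.put(3L, 20.0); // t3 measurement 20
        s1.put(2L, "B");            // t2 new-state B, arriving after t3
        System.out.println("s1 " + sumByState(s1, measurements));
    }
}
```

Running main prints `s1 {A=10.0, B=20.0}`. The t3 measurement resolves to B once the late t2 state is known. Getting this result inside the hourly window still requires something on the main input to re-run the aggregation when t2 arrives, since, as noted earlier in the thread, a side input update alone does not reprocess main input elements.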
