If you don't have any watermark-based triggers, then using the global window with state and timers makes sense, and you can rewindow into a different window afterwards. A hacky way to rewindow into a different windowing strategy is to output the data to a message queue and ingest it again in the same pipeline. People have used this to create loops and to work around windowing issues like this one.
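As a rough illustration of the state-and-timers idea, here is a plain-Java model with no Beam dependency of what one sensor's keyed state in the global window might hold. The class `SensorStateModel`, its `Event` type and the `onElement`/`onTimer` methods are hypothetical stand-ins. In a real DoFn they would roughly map to the `@ProcessElement` body and an `@OnTimer` callback, with the two fields kept in a `ValueState` and a `BagState`, and the timer set in event time to the end of the hour plus the 15-minute lateness bound. Because state in the global window is never garbage-collected, the last known state carries into the next hour even when no new-state event arrives.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of one sensor's keyed state (hypothetical names, not Beam API).
final class SensorStateModel {

  // Hypothetical event shape mirroring "id, timestamp, type, value" (id is the key).
  static final class Event {
    final long timestamp;
    final String type;  // "measurement" or "new-state"
    final String value; // a number for measurements, a state name for new-state

    Event(long timestamp, String type, String value) {
      this.timestamp = timestamp;
      this.type = type;
      this.value = value;
    }

    boolean isStateChange() {
      return type.equals("new-state");
    }
  }

  // Carried across hourly flushes (in Beam: state in the global window).
  private String lastKnownState;
  // Events received but not yet reported, in arrival (processing-time) order.
  private final List<Event> buffer = new ArrayList<>();

  void onElement(Event e) {
    buffer.add(e);
  }

  // Emits per-state sums for buffered events with timestamp < windowEnd.
  // Assumes it is called only after windowEnd plus the lateness bound has passed.
  Map<String, Double> onTimer(long windowEnd) {
    List<Event> ready = new ArrayList<>();
    List<Event> later = new ArrayList<>();
    for (Event e : buffer) {
      if (e.timestamp < windowEnd) {
        ready.add(e);
      } else {
        later.add(e);
      }
    }
    // Replay in event-time order; on equal timestamps, state changes go first (assumption).
    ready.sort(Comparator.comparingLong((Event e) -> e.timestamp)
        .thenComparing(e -> !e.isStateChange()));

    Map<String, Double> sums = new LinkedHashMap<>();
    if (lastKnownState != null) {
      sums.put(lastKnownState, 0.0); // state carried over from the previous hour
    }
    for (Event e : ready) {
      if (e.isStateChange()) {
        lastKnownState = e.value;
        sums.putIfAbsent(lastKnownState, 0.0);
      } else if (lastKnownState != null) {
        sums.merge(lastKnownState, Double.parseDouble(e.value), Double::sum);
      } // measurements seen before any known state are dropped (assumption)
    }
    buffer.clear();
    buffer.addAll(later);
    return sums;
  }
}
```

Buffering and replaying by event time, rather than reacting to each element as it arrives, is what lets a late state change (like the t2 row in the scenario quoted below) still be attributed to the right measurements. Emitting the returned map is where the summary would leave the global window and be rewindowed.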
The graph would look like:

EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output summary to Pubsub)
Input summary from Pubsub -> Window.into(My favorite window strategy) -> Additional processing

On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola wrote:

> Hi Luke,
>
> Thanks for your answer.
>
> I was studying the state/timer approach. What doesn't convince me is the
> fact that I would have to use a global window in the main input; otherwise
> I could lose some state when I don't receive state events for a while. By
> using side inputs I keep two independent windowing strategies: global for
> the states, but fixed (or whatever) for the measurements. Do you see another
> way to overcome this?
>
> Regards
>
> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik wrote:
>
>> Only data along the main input edge causes that DoFn to be executed
>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>> on the side input controls when the side input data becomes available and
>> is updated.
>>
>> You could choose to have a generator that produces an event on the main
>> input every hour, and you could use it to look at the side input and compute
>> all the outputs that you want.
>>
>> I do think that a solution that uses state and timers would likely fit
>> the problem more naturally. This blog post [1] is a good starting point.
>>
>> 1: https://beam.apache.org/blog/timely-processing/
>>
>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola wrote:
>>
>>> Hi all,
>>>
>>> I have a question regarding triggers in side inputs. I will try to
>>> explain my doubt with an example.
>>>
>>> Suppose we have a data stream of sensor events as follows:
>>>
>>> - id, timestamp, type, value
>>>
>>> Where:
>>>
>>> - id is the sensor id
>>> - timestamp is the event timestamp
>>> - type is one of these values: measurement, new-state
>>> - value is a conditional field, following this rule:
>>>   if the type is 'measurement', it is a real number with the measured
>>>   value; if the type is 'new-state', it is a string with the new sensor state.
>>>
>>> We want to produce aggregated data in a time-based way (for example,
>>> per hour) as follows:
>>>
>>> Report:
>>>
>>> {
>>>   'timestamp': 'report timestamp',
>>>   'sensors': {
>>>     'sensorId1': {
>>>       'stateA': 'sum of measures in stateA',
>>>       'stateB': 'sum of measures in stateB',
>>>       'stateC': 'sum of measures in stateC',
>>>       ....
>>>     }
>>>   }
>>> }
>>>
>>> The state at a given timestamp X is the last known state of that sensor
>>> at that moment. We could have late events (suppose at most 15 minutes late).
>>>
>>> I thought of this solution:
>>>
>>> 1) Create a side input with the states (in a global window):
>>>
>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .accumulatingFiredPanes();
>>>
>>> PCollectionView<Map<String, Iterable<TreeMap<Long, EventState>>>> sensorStates =
>>>     ....
>>>
>>> The map has an entry for every sensor, and each TreeMap has an entry for
>>> every new state received in the last 15 minutes, plus one more: we need the
>>> previous state in case no state changes arrived in the last 15 minutes.
>>>
>>> 2) Then, the solution aggregates the main input values based on the side
>>> input, with the main input windowed as:
>>>
>>> Window<KV<String, SensorEvent>> mainInputWindow = fixed windows of one hour
>>> with an allowed lateness of 15 minutes and accumulating fired panes.
>>>
>>> The solution is not working as I expected in this scenario (rows are in
>>> processing-time order):
>>>
>>> Timestamp  Sensor  Type         Value  Output       Expected
>>> t0         s1      new-state    A      s1 {A: 0}    s1 {A: 0}
>>> t1         s1      measurement  10     s1 {A: 10}   s1 {A: 10}
>>> t3         s1      measurement  20     s1 {A: 30}   s1 {A: 30}
>>> t2         s1      new-state    B      No output    s1 {A: 10, B: 20}
>>>
>>> I assumed that a new firing of the side input would force a new firing of
>>> the main input. So when t2 arrives, it fires the side input's trigger,
>>> producing the new state, and then the main input is recomputed, producing
>>> the expected value. Is my assumption wrong?
>>>
>>> Thank you
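Luke's first reply explains the missing output in the last row: the late new-state event at t2 only updates the side input, and nothing re-runs the main input. The lookup that any recomputation would need is sketched below in plain Java (no Beam dependency). It uses `TreeMap.floorEntry` on a map of state changes, in the spirit of the `TreeMap<Long, EventState>` held in the side input. `StateAttribution`, `stateAt` and `sumByState` are hypothetical names, and states are plain strings instead of `EventState`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: attributes each measurement to the sensor state that
// was in effect at the measurement's event timestamp.
final class StateAttribution {

  // Latest state change at or before t, or null if no state is known yet.
  static String stateAt(TreeMap<Long, String> stateChanges, long t) {
    Map.Entry<Long, String> entry = stateChanges.floorEntry(t);
    return entry == null ? null : entry.getValue();
  }

  // measurements: (event timestamp, measured value) pairs, in any order.
  static Map<String, Double> sumByState(
      TreeMap<Long, String> stateChanges, List<Map.Entry<Long, Double>> measurements) {
    Map<String, Double> sums = new LinkedHashMap<>();
    for (Map.Entry<Long, Double> m : measurements) {
      String state = stateAt(stateChanges, m.getKey());
      if (state != null) { // measurements before any known state are skipped
        sums.merge(state, m.getValue(), Double::sum);
      }
    }
    return sums;
  }
}
```

Taking t0..t3 as 0..3, the two measurements sum to {A=30.0} while only the t0 change is known, and to {A=10.0, B=20.0} once the t2 change is put into the map. The second computation still has to be started by something on the main input (a new element, an hourly generator tick, or a timer), since a side input update alone does not cause it.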
