If you don't have any watermark-based triggers, then using the global window
with state and timers makes sense, and you can rewindow into a different
window afterwards. A hacky way to rewindow into a different windowing
strategy is to output the data to a message queue and ingest it again in
the same pipeline. People have used this to create loops and to work around
windowing issues like this one.

The graph would look like:
EventStream -> ParDo(GlobalWindow State and Timers)
            -> ParDo(Output summary to Pubsub)
Input summary from Pubsub -> Window.into(My favorite window strategy)
                          -> Additional processing
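
As a rough illustration of what the first ParDo would compute per sensor,
here is a minimal plain-Java sketch. It is not Beam code: the class
SensorStateSummary, the Event type and the summarize method are hypothetical
names made up for this example. In a real pipeline the buffer would
presumably live in per-key state and the replay would run from a timer
callback, along the lines of the blog post linked further down. The idea is
to buffer events as they arrive, then replay them in event-time order and add
each measurement to the state that was current at its timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the per-sensor logic; none of these names are Beam APIs.
class SensorStateSummary {

    // Hypothetical event shape, following the fields described in the thread.
    static final class Event {
        final long timestamp; // event time
        final String type;    // "measurement" or "new-state"
        final String value;   // measured number as text, or the state name

        Event(long timestamp, String type, String value) {
            this.timestamp = timestamp;
            this.type = type;
            this.value = value;
        }
    }

    // What a timer callback could do with the buffered events of one sensor:
    // sort by event time, track the current state, and sum measurements per state.
    static Map<String, Double> summarize(List<Event> buffered) {
        List<Event> ordered = new ArrayList<>(buffered);
        // List.sort is stable, so events with equal timestamps keep arrival order.
        ordered.sort(Comparator.comparingLong((Event e) -> e.timestamp));

        Map<String, Double> sums = new LinkedHashMap<>();
        String current = null;
        for (Event e : ordered) {
            if ("new-state".equals(e.type)) {
                current = e.value;
                sums.putIfAbsent(current, 0.0);
            } else if (current != null) {
                sums.merge(current, Double.parseDouble(e.value), Double::sum);
            }
            // Assumption: measurements seen before any known state are skipped.
        }
        return sums;
    }

    public static void main(String[] args) {
        // Arrival (processing-time) order from the example below: t0, t1, t3, t2.
        List<Event> arrived = Arrays.asList(
            new Event(0, "new-state", "A"),
            new Event(1, "measurement", "10"),
            new Event(3, "measurement", "20"),
            new Event(2, "new-state", "B"));
        System.out.println(summarize(arrived)); // {A=10.0, B=20.0}
    }
}
```

Because the replay is done over everything buffered so far, the late
new-state event at t2 moves the t3 measurement from A to B, which is the
"Expected" column of the table in the original question.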

On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola <[email protected]>
wrote:

> Hi Luke,
>
> Thanks for your answer.
>
> I was studying the state/timer approach. What doesn't convince me is the
> fact that I would have to use a global window in the main input; otherwise
> I could lose some state when I don't receive state events for a while. By
> using side inputs I keep two independent windowing strategies: global for
> states but fixed (or whatever) for the measurements. Do you see another way
> to overcome this?
>
> Regards
>
>
>
> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:
>
>> Only data along the main input edge causes that DoFn to be executed
>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>> on the side input controls when the side input data becomes available and
>> is updated.
>>
>> You could choose to have a generator that produces an event on the main
>> input every hour and you could use it to look at the side input and compute
>> all the outputs that you want.
>>
>> I do think that a solution that uses state and timers would likely fit
>> more naturally to solve the problem. This blog[1] is a good starting point.
>>
>> 1: https://beam.apache.org/blog/timely-processing/
>>
>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>> [email protected]> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I have a question regarding triggers on side inputs. I will try to
>>> explain my doubt with an example.
>>>
>>> Suppose we have a data stream of sensor events as follows:
>>>
>>>    - id, timestamp, type, value
>>>
>>> Where:
>>>
>>>    - id is the sensor id
>>>    - timestamp is the event timestamp
>>>    - type is one of these values: measurement, new-state
>>>    - value is a conditional field following this rule: if the type is
>>>    'measurement', it is a real number with the measured value; if the
>>>    type is 'new-state', it is a string with the new sensor state.
>>>
>>> We want to produce aggregated data in a time-based way (for example,
>>> hourly) as follows:
>>>
>>> Report:
>>>
>>> {
>>>   'timestamp': 'report timestamp',
>>>   'sensors': {
>>>     'sensorId1': {
>>>       'stateA': 'sum of measures in stateA',
>>>       'stateB': 'sum of measures in stateB',
>>>       'stateC': 'sum of measures in stateC',
>>>       ....
>>>     }
>>>   }
>>> }
>>>
>>> The state at a given timestamp X is the last known state of that sensor
>>> at that moment. We could have late events (suppose max 15' late).
>>>
>>>
>>> I thought of this solution:
>>>
>>>
>>> 1) Create a 'sideInput' with the states (in a global window)
>>>
>>>
>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .accumulatingFiredPanes();
>>>
>>>
>>> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
>>> sensorStates =
>>>
>>> ....
>>>
>>>
>>> Here the map has an entry for every sensor, and the TreeMap has an
>>> entry for every new state in the last 15', plus one more: we need the
>>> previous state in case we didn't receive state changes in the last 15'.
>>>
>>>
>>> 2) Then, the solution aggregates values based on the sideInput
>>>
>>>
>>> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an hour
>>> with allowed lateness 15' and accumulating fired panes.
>>>
>>>
>>> The solution is not working as I expected in this scenario (row order is
>>> processing time order):
>>>
>>>
>>> Timestamp  Sensor  Type         Value  Output      Expected
>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>> t2         s1      new-state    B      No output   s1 {A:10 B:20}
>>>
>>>
>>> I assumed that a new fire in the 'sideInput' would force a new fire in
>>> the 'mainInput'. So when t2 arrives, it fires a trigger of the sideInput
>>> producing the new state, and then the mainInput will be recomputed
>>> producing the expected value. Is my assumption wrong?
>>>
>>>
>>> Thank you
>>>
>>>
>>>
>>>
>>
