Thank you for the clarification, Luke. I implemented this solution and it
works.

Regards

On Tue, Oct 13, 2020 at 5:57 PM Luke Cwik <[email protected]> wrote:

> Sorry for the confusion:
>
> "If you don't have any watermark based triggers then using the global
> window state and timers makes sense and you can rewindow into a different
> window after it."
> was meant to be a separate statement from
> "A hacky way to be able to rewindow into a different windowing strategy is
> to output the data to a message queue and ingest the data in the same
> pipeline. People have used this to create loops and work around some
> windowing issues like this."
>
> You only need the latter (the message queue approach) if your computations
> before the rewindowing rely on watermark-based triggers.
>
> On Tue, Oct 13, 2020 at 7:49 AM Andrés Garagiola <
> [email protected]> wrote:
>
>> Hi Luke,
>>
>> Thank you for your reply.
>>
>> I have a question about the approach you are suggesting. Is it not possible
>> to rewindow without using a message queue?
>>
>> I mean:
>>
>> EventStream -> ParDo(GlobalWindow State and Timers) ->
>> apply(CalendarWindow) -> Additional processing
>>
>> If that were possible, I think there is a solution without timers,
>> because every time a new event is ingested into the global window, I
>> would be in one of these scenarios:
>>
>> * measurement event (get the state and write the event + state)
>> * new-state event (update the state based on the new sensor-state, and
>> resend all the affected events)
>>
>> So every time the sensor state changes, I could send new records
>> downstream, forcing the calendar window's triggers to fire again.
>> I would need two pieces of state in the global window transform, one for
>> the 'sensor-states' and another one for the events within the allowed
>> lateness.
>> I'm not sure that what I'm thinking is possible; surely there is a good
>> reason for the hacky approach that I'm not seeing here.
>>
>> Thanks a lot for your help
>> Regards
>>
>> On Fri, Oct 9, 2020 at 10:53 PM Luke Cwik <[email protected]> wrote:
>>
>>> If you don't have any watermark based triggers then using the global
>>> window state and timers makes sense and you can rewindow into a different
>>> window after it. A hacky way to be able to rewindow into a different
>>> windowing strategy is to output the data to a message queue and ingest the
>>> data in the same pipeline. People have used this to create loops and work
>>> around some windowing issues like this.
>>>
>>> The graph would look like:
>>> EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output
>>> summary to Pubsub)
>>> Input summary from Pubsub -> Window.into(My favorite window strategy) ->
>>> Additional processing
>>>
>>> On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola <
>>> [email protected]> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Thanks for your answer.
>>>>
>>>> I was studying the state/timer approach. What doesn't convince me is
>>>> the fact that I would have to use a global window on the main input,
>>>> otherwise I could lose some state when I don't receive state events for a
>>>> while. By using side inputs I keep two independent windowing strategies:
>>>> global for states but fixed (or whatever) for the measurements. Do you see
>>>> another way to overcome this?
>>>>
>>>> Regards
>>>>
>>>>
>>>>
>>>> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> Only data along the main input edge causes that DoFn to be executed
>>>>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>>>>> on the side input controls when the side input data becomes available and
>>>>> is updated.
>>>>>
>>>>> You could choose to have a generator that produces an event on the
>>>>> main input every hour and you could use it to look at the side input and
>>>>> compute all the outputs that you want.
>>>>>
>>>>> I do think that a solution that uses state and timers would likely be
>>>>> a more natural fit for this problem. This blog post [1] is a good
>>>>> starting point.
>>>>>
>>>>> 1: https://beam.apache.org/blog/timely-processing/
>>>>>
>>>>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>>
>>>>>> I have a question regarding triggers on side inputs. I will try to
>>>>>> explain my question with an example.
>>>>>>
>>>>>> Suppose we have a data stream of sensor events as follows:
>>>>>>
>>>>>>    - id, timestamp, type, value
>>>>>>
>>>>>> Where:
>>>>>>
>>>>>>    - id is the sensor id
>>>>>>    - timestamp is the event timestamp
>>>>>>    - type is one of these values: measurement, new-state
>>>>>>    - value is a conditional field. If the type is 'measurement', it is
>>>>>>    a real number with the measured value. If the type is 'new-state',
>>>>>>    it is a string with the new sensor state.
>>>>>>
>>>>>> We want to produce data aggregated by time (for example, by hour)
>>>>>> as follows:
>>>>>>
>>>>>> Report:
>>>>>>
>>>>>> {
>>>>>>   'timestamp': 'report timestamp',
>>>>>>   'sensors': {
>>>>>>     'sensorId1': {
>>>>>>       'stateA': 'sum of measures in stateA',
>>>>>>       'stateB': 'sum of measures in stateB',
>>>>>>       'stateC': 'sum of measures in stateC',
>>>>>>       ....
>>>>>>     }
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The state at a given timestamp X is the last known state of that
>>>>>> sensor at that moment. We could have late events (suppose max 15' late).
>>>>>>
>>>>>>
>>>>>> I thought of this solution:
>>>>>>
>>>>>>
>>>>>> 1) Create a 'sideInput' with the states (in a global window)
>>>>>>
>>>>>>
>>>>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>>         .accumulatingFiredPanes();
>>>>>>
>>>>>>
>>>>>> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
>>>>>> sensorStates =
>>>>>>
>>>>>> ....
>>>>>>
>>>>>>
>>>>>> The map has an entry for every sensor, and the TreeMap has an entry
>>>>>> for every new state in the last 15', plus one more: we need the
>>>>>> previous state in case we didn't receive state changes in the last 15'.
>>>>>>
>>>>>>
>>>>>> 2) Then, the solution aggregates values based on the sideInput
>>>>>>
>>>>>>
>>>>>> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an
>>>>>> hour with allowed lateness 15' and accumulating fired panes.
>>>>>>
>>>>>>
>>>>>> The solution is not working as I expected in this scenario (row order
>>>>>> is processing time order):
>>>>>>
>>>>>>
>>>>>> Timestamp  Sensor  Type         Value  Output      Expected
>>>>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>>>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>>>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>>>>> t2         s1      new-state    B      No output   s1 {A:10 B:20}
>>>>>>
>>>>>>
>>>>>> I assumed that a new firing of the 'sideInput' would force a new
>>>>>> firing of the 'mainInput'. So when t2 arrives, it fires the trigger of
>>>>>> the sideInput, producing the new state, and then the mainInput would be
>>>>>> recomputed, producing the expected value. Is my assumption wrong?
>>>>>>
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
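For reference, the attribution rule from the original question (each
measurement counts toward the sensor's last known state at the measurement's
event timestamp, so a late 'new-state' event changes the totals) can be
sketched in plain Java without Beam. This is only an illustration under
assumptions: the class SensorStateSums and its methods are hypothetical names,
timestamps are plain longs, measurements that precede any known state are
skipped, and a measurement with the same timestamp as a state change counts
toward the new state. In a real pipeline the two TreeMaps would roughly
correspond to the two pieces of per-sensor state kept in the global window.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not part of Beam: sums measurements per last known sensor state. */
final class SensorStateSums {
    // State changes ordered by event time: timestamp -> state name.
    private final TreeMap<Long, String> states = new TreeMap<>();
    // Measurements ordered by event time: timestamp -> summed value at that timestamp.
    private final TreeMap<Long, Double> measurements = new TreeMap<>();

    /** Buffers a 'new-state' event; arrival order does not matter. */
    void onNewState(long timestamp, String state) {
        states.put(timestamp, state);
    }

    /** Buffers a 'measurement' event; arrival order does not matter. */
    void onMeasurement(long timestamp, double value) {
        measurements.merge(timestamp, value, Double::sum);
    }

    /** Recomputes the per-state sums from everything buffered so far. */
    Map<String, Double> sums() {
        Map<String, Double> result = new TreeMap<>();
        // Every known state appears in the report, even with no measurements yet.
        for (String state : states.values()) {
            result.putIfAbsent(state, 0.0);
        }
        for (Map.Entry<Long, Double> m : measurements.entrySet()) {
            // Last state change at or before the measurement's timestamp.
            Map.Entry<Long, String> state = states.floorEntry(m.getKey());
            if (state == null) {
                continue; // Assumption: no known state yet, so skip the measurement.
            }
            result.merge(state.getValue(), m.getValue(), Double::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same processing-time order as the scenario in the question: t0, t1, t3, t2.
        SensorStateSums s1 = new SensorStateSums();
        s1.onNewState(0L, "A");
        s1.onMeasurement(1L, 10.0);
        s1.onMeasurement(3L, 20.0);
        s1.onNewState(2L, "B"); // late state change
        System.out.println(s1.sums()); // prints {A=10.0, B=20.0}
    }
}
```

Recomputing from the buffered events on every call keeps the sketch short; a
stateful DoFn would more likely re-emit only the measurements affected by the
late state change.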
