Hi Luke,

Thank you for your reply.

I have a question about the approach you are suggesting: is it not possible to
re-window without using a message queue?

I mean:

EventStream -> ParDo(GlobalWindow State and Timers) ->
apply(CalendarWindow) -> Additional processing

If that were possible, I think there would be a solution without timers,
because every time a new event is ingested into the global window, I would be
in one of these scenarios:

* measurement event (get the state and write the event + state)
* new-state event (update the state based on the new sensor-state, and
resend all the affected events)

So every time the sensor-state changes, I could send new records downstream,
forcing the calendar window's triggers to fire again.
I would need two states in the global-window transform: one for the
'sensor-states' and another for the events within the allowed lateness.
I'm not sure that what I'm thinking is possible; surely there is a good
reason for the hacky approach that I'm not seeing here.
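To make the second scenario concrete, here is a plain-Java sketch of the
bookkeeping I imagine those two state cells holding per sensor key (this is
outside Beam; the class and method names are illustrative, not Beam API): a
state timeline plus the buffered measurements still within the allowed
lateness. A late new-state event re-emits the affected measurements:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch (not Beam API) of the per-key bookkeeping for the
// two state cells described above.
class SensorBookkeeping {
    // 'sensor-states' cell: event-time timestamp -> state name.
    private final TreeMap<Long, String> stateTimeline = new TreeMap<>();
    // 'events' cell: measurements kept while they can still be revised.
    private final TreeMap<Long, Double> bufferedMeasurements = new TreeMap<>();

    // Measurement event: look up the state in effect at its timestamp,
    // buffer it, and emit it with that state attached.
    public List<String> onMeasurement(long ts, double value) {
        bufferedMeasurements.put(ts, value);
        Map.Entry<Long, String> e = stateTimeline.floorEntry(ts);
        String state = (e == null) ? "unknown" : e.getValue();
        List<String> out = new ArrayList<>();
        out.add(ts + ":" + state + "=" + value);
        return out;
    }

    // New-state event: record it, then re-emit every buffered measurement
    // whose effective state changes (those at or after this state's
    // timestamp, up to the next known state change).
    public List<String> onNewState(long ts, String state) {
        stateTimeline.put(ts, state);
        Long next = stateTimeline.higherKey(ts);
        long end = (next == null) ? Long.MAX_VALUE : next;
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Double> m :
                bufferedMeasurements.subMap(ts, true, end, false).entrySet()) {
            out.add(m.getKey() + ":" + state + "=" + m.getValue());
        }
        return out;
    }
}
```

In the late-event scenario from my earlier mail, a new-state B arriving with
timestamp t2 (after measurements at t1 and t3 were already processed under
state A) would re-emit only the t3 measurement, now attributed to B.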

Thanks a lot for your help
Regards

On Fri, Oct 9, 2020 at 10:53 PM Luke Cwik <[email protected]> wrote:

> If you don't have any watermark-based triggers, then using global window
> state and timers makes sense, and you can rewindow into a different window
> after it. A hacky way to rewindow into a different windowing strategy is to
> output the data to a message queue and ingest the data in the same pipeline.
> People have used this to create loops and work around some windowing issues
> like this.
>
> The graph would look like:
> EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output
> summary to Pubsub)
> Input summary from Pubsub -> Window.into(My favorite window strategy) ->
> Additional processing
>
> On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola <[email protected]>
> wrote:
>
>> Hi Luke,
>>
>> Thanks for your answer.
>>
>> I was studying the state/timer approach. What doesn't convince me is that
>> I would have to use a global window on the main input; otherwise I could
>> lose some states when I don't receive state events for a while. By using
>> side inputs I keep two independent window strategies: global for states but
>> fixed (or whatever) for the measurements. Do you see another way to
>> overcome this?
>>
>> Regards
>>
>>
>>
>> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik <[email protected]> wrote:
>>
>>> Only data along the main input edge causes that DoFn to be executed
>>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>>> on the side input controls when the side input data becomes available and
>>> is updated.
>>>
>>> You could choose to have a generator that produces an event on the main
>>> input every hour and you could use it to look at the side input and compute
>>> all the outputs that you want.
>>>
>>> I do think that a solution that uses state and timers would likely fit
>>> the problem more naturally. This blog[1] is a good starting point.
>>>
>>> 1: https://beam.apache.org/blog/timely-processing/
>>>
>>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>>> [email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>>
>>>> I have a question regarding triggers on side inputs. I will try to
>>>> explain my doubt with an example.
>>>>
>>>> Suppose we have a data stream of sensor events as follows:
>>>>
>>>>    - id, timestamp, type, value
>>>>
>>>> Where:
>>>>
>>>>    - id is the sensor id
>>>>    - timestamp is the event timestamp
>>>>    - type can be one of these values (measurement, new-state)
>>>>    - value is a conditional field: if the type is 'measurement', it is
>>>>    a real number with the measured value; if the type is 'new-state',
>>>>    it is a string with the new sensor state.
>>>>
>>>> We want to produce aggregated data in a time-based way (for example,
>>>> hourly) as follows:
>>>>
>>>> Report:
>>>>
>>>> {
>>>>   'timestamp': 'report timestamp',
>>>>   'sensors': {
>>>>     'sensorId1': {
>>>>       'stateA': 'sum of measures in stateA',
>>>>       'stateB': 'sum of measures in stateB',
>>>>       'stateC': 'sum of measures in stateC',
>>>>       ...
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> The state at a given timestamp X is the last known state of that sensor
>>>> at that moment. We could have late events (suppose at most 15 minutes
>>>> late).
>>>>
>>>>
>>>> I came up with this solution:
>>>>
>>>>
>>>> 1) Create a 'sideInput' with the states (in a global window)
>>>>
>>>>
>>>> Window<KV<String, SensorEvent>> sideInputWindow =
>>>>     Window.<KV<String, SensorEvent>>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>         .accumulatingFiredPanes();
>>>>
>>>>
>>>> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
>>>> sensorStates =
>>>>
>>>> ....
>>>>
>>>>
>>>> Where the map has an entry for every sensor, and the TreeMap has an
>>>> entry for every new state in the last 15 minutes, plus one more: we need
>>>> the previous state in case we didn't receive any state change in the
>>>> last 15 minutes.
>>>>
>>>>
>>>> 2) Then, the solution aggregates values based on the sideInput
>>>>
>>>>
>>>> Window<KV<String, SensorEvent>> mainInputWindow = a FixedWindow of one
>>>> hour, with an allowed lateness of 15 minutes and accumulating fired
>>>> panes.
>>>>
>>>>
>>>> The solution is not working as I expected in this scenario (row order
>>>> is processing time order):
>>>>
>>>>
>>>> Timestamp  Sensor  Type         Value  Output      Expected
>>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>>>
>>>>
>>>> I assumed that a new firing of the 'sideInput' would force a new firing
>>>> of the 'mainInput'. So when t2 arrives, it fires the sideInput's
>>>> trigger, producing the new state, and then the mainInput would be
>>>> recomputed, producing the expected value. Is my assumption wrong?
>>>>
>>>>
>>>> Thank you
>>>>
>>>>
>>>>
>>>>
>>>