Only data along the main input edge causes that DoFn to be executed again.
Side inputs don't cause main inputs to be reprocessed. The trigger on the
side input controls when the side input data becomes available and is
updated.

You could choose to have a generator that produces an event on the main
input every hour and you could use it to look at the side input and compute
all the outputs that you want.

I do think that a solution that uses state and timers would likely fit more
naturally to solve the problem. This blog[1] is a good starting point.

1: https://beam.apache.org/blog/timely-processing/

On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <[email protected]>
wrote:

> Hi all,
>
>
> I have a question regarding triggers in sideinput. I would try to explain
> my doubt with an example.
>
> Suppose we have a data stream of sensor events as follow:
>
>    - id, timestamp, type, value
>
> Where:
>
>    - id is the sensor id
>    - timestamp is the event timestamp
>    - type could be some of this value (measurement, new-state)
>    - value is a conditional field following this rule. If the type is
>    'measurement' it is a real number with the measured value. If the type
>    field is 'new-state' value it is a string with a new sensor state.
>
> We want to produce aggregated data in a time based way (for example by an
> hour) as follow:
>
> Report:
>
> {
>
> 'timestamp: 'report timestamp',
>
> 'sensors': {
>
> 'sensorId1': {
>
> 'stateA': 'sum of measures in stateA',
>
> 'stateB': 'sum of measures in stateB',
>
> 'stateC': 'sum of measures in stateC',
>
> ....
>
> }
>
> }
>
> }
>
> The state at a given timestamp X is the last known state of that sensor at
> that moment. We could have late events (suppose max 15' late).
>
>
> I thought this solution:
>
>
> 1) Create a 'sideInput' with the states (in a global window)
>
>
> Window<KV<String, SensorEvent>> sideInputWindow =
>
>     Window.<KV<String, SensorEvent>>*into*(new GlobalWindows()) //
>
>         .triggering(Repeatedly.*forever*(AfterPane.*elementCountAtLeast*(1)))
> //
>
>         .accumulatingFiredPanes();
>
>
> PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
> sensorStates =
>
> ....
>
>
> Where I have an entry to this map for every sensor, and the TreeMap has an
> entry for every new state (in the last 15') + 1 we need the previous one in
> case we didn't receive state changes in the last 15'.
>
>
> 2) Then, the solution aggregates values based on the sideInput
>
>
> Window<KV<String, SensorEvent>> mainInputWindow = FixedWindow of an hour
> with allowed lateness 15' and accumulating fired panes.
>
>
> The solution is not working as I expected in this scenario (row order is
> processing time order):
>
>
> Timestamp Sensor Type Value Output Expected
>
> t0 s1 new-state A s1 {A: 0} s1 {A: 0}
>
> t1 s1 measurement 10 s1 {A: 10} s1 {A: 10}
>
> t3 s1 measurement 20 s1 {A: 30} s1 {A: 30}
>
> t2 s1 new-state B No output s1 {A:10 B:20}
>
>
> I assumed that a new fire in the 'sideInput' would force a new fire in the
> 'mainInput'. So when t2 arrives, it fires a trigger of the sideInput
> producing the new state, and then the mainInput will be recomputed
> producing the expected value. Is my assumption wrong?
>
>
> Thank you
>
>
>
>

Reply via email to