Hi all,

I have a question about triggers on side inputs. I'll try to explain it
with an example.

Suppose we have a data stream of sensor events with these fields:

   - id, timestamp, type, value

Where:

   - id is the sensor id
   - timestamp is the event timestamp
   - type is one of two values: 'measurement' or 'new-state'
   - value depends on type. If the type is 'measurement', it is a real
   number holding the measured value. If the type is 'new-state', it is a
   string holding the new sensor state.

We want to produce time-based aggregates (for example, one per hour) as
follows:

Report:

{
  'timestamp': 'report timestamp',
  'sensors': {
    'sensorId1': {
      'stateA': 'sum of measures in stateA',
      'stateB': 'sum of measures in stateB',
      'stateC': 'sum of measures in stateC',
      ....
    }
  }
}

The state at a given timestamp X is the last known state of that sensor at
that moment. Events can arrive late (assume at most 15 minutes late).
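To make "last known state at timestamp X" concrete, here is a minimal sketch in plain Java (not Beam code). It assumes the state changes of one sensor are kept in a TreeMap keyed by event timestamp, as in the side input I describe further down. The class and method names are made up for illustration, and a String stands in for my EventState type.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: resolves the state of one sensor at a given timestamp.
final class SensorStateLookup {

    // Returns the state set by the latest change at or before the timestamp,
    // or null if no state change is known that early.
    static String stateAt(TreeMap<Long, String> stateChanges, long timestamp) {
        Map.Entry<Long, String> entry = stateChanges.floorEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

With state changes at t0 (A) and t2 (B), a measurement at t1 resolves to A and a measurement at t3 resolves to B.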


I came up with this solution:


1) Create a 'sideInput' with the states (in a global window)


Window<KV<String, SensorEvent>> sideInputWindow =
    Window.<KV<String, SensorEvent>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes();


PCollectionView<Map<String, Iterable<TreeMap<Long,EventState>>>>
sensorStates =

....


The map has one entry per sensor. Each TreeMap has one entry per state
change in the last 15 minutes, plus the one just before those, which we need
in case we didn't receive any state change in the last 15 minutes.
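The retention rule above could look roughly like this in plain Java (a sketch, not my actual Beam code). The names are hypothetical, timestamps are assumed to be epoch milliseconds, and a String stands in for EventState.

```java
import java.util.TreeMap;

// Hypothetical helper: trims one sensor's state history to what late events can still need.
final class SensorStateHistory {

    // Keeps every state change newer than (now - lateness), plus the latest
    // change at or before that cutoff, since it is the state in effect at the cutoff.
    static TreeMap<Long, String> prune(
            TreeMap<Long, String> stateChanges, long nowMillis, long latenessMillis) {
        long cutoff = nowMillis - latenessMillis;
        Long keepFrom = stateChanges.floorKey(cutoff);
        if (keepFrom == null) {
            // Nothing is old enough to drop.
            return new TreeMap<>(stateChanges);
        }
        return new TreeMap<>(stateChanges.tailMap(keepFrom, true));
    }
}
```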


2) Then, the solution aggregates values based on the sideInput


The main input (Window<KV<String, SensorEvent>> mainInputWindow) uses fixed
windows of one hour, with an allowed lateness of 15 minutes and accumulating
fired panes.


The solution is not working as I expected in this scenario (row order is
processing time order):


Timestamp  Sensor  Type         Value  Output      Expected
t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
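For reference, this is how I derive the Expected column, sketched in plain Java outside of Beam. The names are hypothetical, and t0..t3 are treated as the timestamps 0..3. Each measurement is added to the state that was in effect at its event timestamp, and every known state starts at 0.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: sums one sensor's measurements per state.
final class StateSums {

    // stateChanges maps event timestamp -> new state; measurements are
    // (event timestamp, value) pairs in any arrival order.
    static Map<String, Double> sumByState(
            TreeMap<Long, String> stateChanges, List<Map.Entry<Long, Double>> measurements) {
        Map<String, Double> sums = new TreeMap<>();
        for (String state : stateChanges.values()) {
            sums.put(state, 0.0); // every known state appears, even with no measurements
        }
        for (Map.Entry<Long, Double> m : measurements) {
            // State in effect at the measurement's event time, if any.
            Map.Entry<Long, String> active = stateChanges.floorEntry(m.getKey());
            if (active != null) {
                sums.merge(active.getValue(), m.getValue(), Double::sum);
            }
        }
        return sums;
    }
}
```

Before the late t2 event is known, the measurements at t1 and t3 both fall under A. Once the state change at t2 is added, the measurement at t3 moves to B, which is the recomputation I expected to see.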


I assumed that a new firing of the side input would force a new firing of
the main input. So when t2 arrives, it fires the side input's trigger,
producing the new state, and then the main input would be recomputed,
producing the expected value. Is my assumption wrong?


Thank you
