Hi all,
I have a question about triggers on side inputs. I will try to explain
my doubt with an example.
Suppose we have a data stream of sensor events as follows:
- id, timestamp, type, value
Where:
- id is the sensor id
- timestamp is the event timestamp
- type is one of these values: measurement, new-state
- value is a conditional field: if type is 'measurement', it is a real
number holding the measured value; if type is 'new-state', it is a string
holding the sensor's new state.
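The conditional value field can be modeled in Java as one class with two typed accessors that check the event type before returning. A minimal sketch; the class layout, the `Type` enum and the factory/accessor names are hypothetical, and timestamps are assumed to be epoch milliseconds:

```java
import java.util.Objects;

// Hypothetical model of one sensor event (id, timestamp, type, value).
// The meaning of `value` depends on `type`, so it is stored in two typed
// fields and read through accessors that check the type first.
final class SensorEvent {
  enum Type { MEASUREMENT, NEW_STATE }

  final String id;        // sensor id
  final long timestamp;   // event timestamp, assumed to be epoch millis
  final Type type;
  private final double measured;    // meaningful only for MEASUREMENT
  private final String stateValue;  // meaningful only for NEW_STATE

  private SensorEvent(String id, long timestamp, Type type, double measured, String stateValue) {
    this.id = Objects.requireNonNull(id);
    this.timestamp = timestamp;
    this.type = type;
    this.measured = measured;
    this.stateValue = stateValue;
  }

  static SensorEvent measurement(String id, long timestamp, double value) {
    return new SensorEvent(id, timestamp, Type.MEASUREMENT, value, null);
  }

  static SensorEvent newState(String id, long timestamp, String state) {
    return new SensorEvent(id, timestamp, Type.NEW_STATE, 0.0, Objects.requireNonNull(state));
  }

  double measuredValue() {
    if (type != Type.MEASUREMENT) {
      throw new IllegalStateException("event from " + id + " is not a measurement");
    }
    return measured;
  }

  String newStateValue() {
    if (type != Type.NEW_STATE) {
      throw new IllegalStateException("event from " + id + " is not a new-state event");
    }
    return stateValue;
  }
}
```

Reading the wrong variant fails loudly instead of silently returning 0.0 or null.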
We want to produce time-based aggregates (for example, hourly) in the
following format:
Report:
{
  'timestamp': 'report timestamp',
  'sensors': {
    'sensorId1': {
      'stateA': 'sum of measurements in stateA',
      'stateB': 'sum of measurements in stateB',
      'stateC': 'sum of measurements in stateC',
      ....
    }
  }
}
The state at a given timestamp X is the last known state of that sensor at
that moment. Events may arrive late (assume at most 15 minutes late).
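If each sensor's state changes are kept in a `TreeMap` keyed by event timestamp, "last known state at X" is a `floorEntry(X)` lookup. A minimal plain-Java sketch with a hypothetical `StateHistory` class; because entries are keyed by event time rather than arrival time, a late 'new-state' event still lands in the right position:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-sensor state history, keyed by the event timestamp of
// each 'new-state' event (not by arrival time).
final class StateHistory {
  private final TreeMap<Long, String> changes = new TreeMap<>();

  void recordNewState(long timestamp, String state) {
    changes.put(timestamp, state);
  }

  // Last known state at time x: the change with the greatest timestamp <= x,
  // or null if no state is known at or before x.
  String stateAt(long x) {
    Map.Entry<Long, String> entry = changes.floorEntry(x);
    return entry == null ? null : entry.getValue();
  }
}
```

For example, after recording A at timestamp 0 and B at timestamp 2, `stateAt(1)` returns "A" and `stateAt(3)` returns "B".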
I came up with this solution:
1) Create a side input holding the sensor states (in the global window):
Window<KV<String, SensorEvent>> sideInputWindow =
    Window.<KV<String, SensorEvent>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes();
PCollectionView<Map<String, Iterable<TreeMap<Long, EventState>>>> sensorStates =
    ....
Here the map has one entry per sensor, and each TreeMap has one entry for
every state change in the last 15 minutes, plus the state just before them,
which is needed in case the sensor had no state change in the last 15 minutes.
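2) Then, the solution aggregates the measurements of the main input, using
the side input to look up each sensor's state. The main input is windowed
into one-hour fixed windows with an allowed lateness of 15 minutes and
accumulating fired panes:
Window<KV<String, SensorEvent>> mainInputWindow =
    Window.<KV<String, SensorEvent>>into(FixedWindows.of(Duration.standardHours(1)))
        .withAllowedLateness(Duration.standardMinutes(15))
        .accumulatingFiredPanes();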
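The two steps above can be sketched in plain Java, outside Beam, to pin down the intended semantics: pruning a sensor's state history to the last 15 minutes plus the state in effect at the cutoff, then summing measurements per state. The class and method names, the `referenceTime` parameter (for example the latest event time seen) and skipping measurements with no known state are all assumptions:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the two steps (no Beam). All names are hypothetical.
final class SensorAggregation {
  static final long MAX_LATENESS_MS = 15 * 60 * 1000L; // 15 minutes

  // Hypothetical measurement: event timestamp (millis) and measured value.
  static final class Measurement {
    final long timestamp;
    final double value;

    Measurement(long timestamp, double value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Step 1 (side input): keep every change newer than referenceTime minus
  // 15 minutes, plus the latest change at or before that cutoff (the state
  // still in effect at the cutoff). Older changes are dropped.
  static void prune(TreeMap<Long, String> changes, long referenceTime) {
    Map.Entry<Long, String> inEffectAtCutoff = changes.floorEntry(referenceTime - MAX_LATENESS_MS);
    if (inEffectAtCutoff != null) {
      // headMap(key, false) is a live view of strictly smaller keys; clearing it removes them.
      changes.headMap(inEffectAtCutoff.getKey(), false).clear();
    }
  }

  // Step 2 (main input): sum one sensor's measurements per state, assigning
  // each measurement to the last state change at or before its event time.
  // Measurements with no known state are skipped (an assumption).
  static Map<String, Double> sumByState(List<Measurement> measurements, TreeMap<Long, String> changes) {
    Map<String, Double> sums = new TreeMap<>();
    for (Measurement m : measurements) {
      Map.Entry<Long, String> state = changes.floorEntry(m.timestamp);
      if (state != null) {
        sums.merge(state.getValue(), m.value, Double::sum);
      }
    }
    return sums;
  }
}
```

With state changes A at 0 and B at 2 and measurements 10 at 1 and 20 at 3, `sumByState` returns {A=10.0, B=20.0}.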
The solution does not behave as I expected in the following scenario (rows
are listed in processing-time order, i.e. arrival order):
Timestamp  Sensor  Type         Value  Output      Expected
t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
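The final values of the two columns can be reproduced with a small plain-Java replay of the events in arrival order. This is not Beam code and does not model how Beam schedules side-input updates: `incremental` assigns each measurement a state once, using only the state changes that arrived before it, and never revisits a sum; `recomputed` re-evaluates every measurement against the complete event-time history. The `ScenarioReplay` name and its event shape are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of one sensor's events in arrival (processing-time) order.
final class ScenarioReplay {
  // Hypothetical event shape: a state change (state != null) or a measurement.
  static final class Event {
    final long timestamp;
    final String state;
    final double value;

    private Event(long timestamp, String state, double value) {
      this.timestamp = timestamp;
      this.state = state;
      this.value = value;
    }

    static Event newState(long timestamp, String s) { return new Event(timestamp, s, 0.0); }
    static Event measurement(long timestamp, double v) { return new Event(timestamp, null, v); }
    boolean isState() { return state != null; }
  }

  // Each measurement gets a state once, from the changes that arrived before it;
  // sums are never revisited afterwards.
  static Map<String, Double> incremental(List<Event> arrivalOrder) {
    TreeMap<Long, String> knownSoFar = new TreeMap<>();
    Map<String, Double> sums = new TreeMap<>();
    for (Event e : arrivalOrder) {
      if (e.isState()) {
        knownSoFar.put(e.timestamp, e.state);
      } else {
        addToState(sums, knownSoFar, e);
      }
    }
    return sums;
  }

  // Every measurement is re-evaluated against the complete event-time history.
  static Map<String, Double> recomputed(List<Event> arrivalOrder) {
    TreeMap<Long, String> allChanges = new TreeMap<>();
    for (Event e : arrivalOrder) {
      if (e.isState()) allChanges.put(e.timestamp, e.state);
    }
    Map<String, Double> sums = new TreeMap<>();
    for (Event e : arrivalOrder) {
      if (!e.isState()) addToState(sums, allChanges, e);
    }
    return sums;
  }

  // Adds a measurement to the state in effect at its event timestamp, if any.
  private static void addToState(Map<String, Double> sums, TreeMap<Long, String> changes, Event m) {
    Map.Entry<Long, String> s = changes.floorEntry(m.timestamp);
    if (s != null) sums.merge(s.getValue(), m.value, Double::sum);
  }
}
```

Replaying t0 (A), t1 (10), t3 (20), t2 (B) with timestamps 0, 1, 3, 2 gives {A=30.0} for `incremental`, matching the last Output value, and {A=10.0, B=20.0} for `recomputed`, matching the last Expected value.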
I assumed that a new firing of the side input would force a new firing of
the main input. That is, when t2 arrives, the side-input trigger fires and
produces the new state, and the main input is then recomputed, producing the
expected value. Is my assumption wrong?
Thank you