An alternative would be to use a FlatMapFunction with a ListState instead
of a window with custom trigger.
When a new element arrives (i.e., the flatMap() method is called), you
check if the value changed.
If the value did not changed, you append the element to the state.
If the value changed, you emit the current list state as a session, clear
the list, and insert the new element as the first to the list state.
However, you should keep in mind that this assumes that the order of
elements is preserved.
Flink ensures within a partition, i.e, as long as elements are not shuffled
and all operators run with the same parallelism.
2017-06-18 15:10 GMT+02:00 Jonas <jo...@huntun.de>:
> Hey Milad,
> since you cannot look into the future which element comes next, you have to
> "lag" one behind. This requires building an operator that creates 2-tuples
> from incoming elements containing (current-1, current), so basically a
> single value state that emits the last and the current element in a tuple.
> In a trigger, the element is then of the 2-tuple type and you can see
> changes "beforehand". The last element of 1's is then (1, 2).
> Hope this helps.
> View this message in context: http://apache-flink-user-
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.