I have been playing around with Flink for a few weeks to try to
ascertain whether or not it meets our use cases, and also what best
practices we should be following. I have a few questions I would
appreciate answers to.
Our scenario is that we want to process a lot of event data into
cases. A case is an in-order sequence of events; this event data could
be quite old. We never know when a case is complete, so we just want
to have the most up-to-date picture of what a case looks like.
The in-order sequence of events of a case is called the trace. Many
cases could have an identical trace. We would like to construct these
traces, and do some aggregations on those (case count, average/min/max
We then have further downstream processing we will do on a case, some
of which would require additional inputs, either from side-inputs or
from somehow joining data sources.
We don’t really care about event time at the moment, because we just
want to build cases and traces with all the data we have received.
The end results should be available to our web front end via a REST API.
Based on the above I have the following idea for a first implementation:
Kafka source -> key by case id -> session window with rocks db state
backend holding case for that key -> postgres sink
The reason for a session window is that, as I mentioned above, we just
want to build a group with all the data we have received into kafka up
until that point in time. We would experiment with what this gap time
should be, and in future it might be specific to the type of group,
but for a start a naive approach is acceptable. I think this could
be better than just doing it, say, every 10 minutes because we really
don’t know yet the frequency of the data received. Also, some inputs
to kafka come directly from a CSV upload, so we will get “firehose”
periods, and periods of nothing.
In short: I think what we have closely matches session behaviour.
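
To make the session idea concrete, here is a minimal plain-Python sketch (not Flink code) of gap-based grouping per case id. The function name `sessionize`, the tuple layout and the integer arrival times are assumptions made for illustration only; in the real job the session window assigner would do this work.

```python
from collections import defaultdict

def sessionize(events, gap):
    """Group (case_id, arrival_time, payload) tuples into per-case sessions.

    A new session starts for a case whenever the time since that case's
    previous event is larger than `gap`.
    """
    sessions = defaultdict(list)  # case_id -> list of sessions (lists of payloads)
    last_seen = {}                # case_id -> arrival time of its latest event
    for case_id, ts, payload in sorted(events, key=lambda e: e[1]):
        if case_id not in last_seen or ts - last_seen[case_id] > gap:
            sessions[case_id].append([])  # gap exceeded: open a new session
        sessions[case_id][-1].append(payload)
        last_seen[case_id] = ts
    return dict(sessions)

# Hypothetical events: a burst for case-1, then a late event after a long pause.
events = [
    ("case-1", 0, "created"),
    ("case-2", 1, "created"),
    ("case-1", 2, "approved"),
    ("case-1", 50, "closed"),
]
result = sessionize(events, gap=10)
```

With a gap of 10, the late "closed" event lands in a second session for case-1. So the state held per key still has to carry the whole case if each firing is meant to emit the full picture.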
We also have to implement a postgres sink that is capable of doing
upserts. The reason for postgres is to service the REST front end.
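
As a rough illustration of the upsert itself, the sketch below uses `INSERT ... ON CONFLICT ... DO UPDATE`, which Postgres supports from 9.5. It runs against an in-memory SQLite database purely as a stand-in so the example is self-contained (SQLite accepts the same clause from 3.24). The table `cases`, its columns and the helper `upsert_case` are hypothetical, and a Postgres driver would use its own placeholder style rather than `?`.

```python
import sqlite3

# Hypothetical schema: one row per case, keyed by case_id.
CREATE_SQL = """
CREATE TABLE cases (
    case_id     TEXT PRIMARY KEY,
    event_count INTEGER NOT NULL,
    last_event  TEXT NOT NULL
)
"""

# Insert a new case, or overwrite the existing row for the same case_id.
UPSERT_SQL = """
INSERT INTO cases (case_id, event_count, last_event)
VALUES (?, ?, ?)
ON CONFLICT (case_id) DO UPDATE SET
    event_count = excluded.event_count,
    last_event  = excluded.last_event
"""

def upsert_case(conn, case_id, event_count, last_event):
    conn.execute(UPSERT_SQL, (case_id, event_count, last_event))
    conn.commit()

conn = sqlite3.connect(":memory:")  # stand-in for the Postgres connection
conn.execute(CREATE_SQL)
upsert_case(conn, "case-1", 2, "approved")
upsert_case(conn, "case-1", 3, "closed")  # same key: updates, does not duplicate
rows = conn.execute(
    "SELECT case_id, event_count, last_event FROM cases"
).fetchall()
```

Because writing the same case twice leaves a single row with the latest values, replaying records after a failure should not create duplicates.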
We then have to build our traces and can see three options for it:
1) The most obvious solution would be to use a kafka sink for the
keyed case stream, and to do the trace aggregations in a downstream
flink job with this kafka topic as a source. However, I have some
concerns over losing any data (i.e. how do we know whether or not an
event has been successfully pushed into the kafka stream).
2) Another approach might be to use some other type of sink (perhaps
postgres), and to use this as a source for the traces job. This would
help us guarantee data consistency.
3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:
Keyed cases stream -> broadcast -> key by tracehash with rocks db
state backend holding trace for that tracehash -> perform
aggregations -> postgres sink
Is broadcast an option here? How costly is it?
Which of these approaches (or any other) would you recommend?
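
Whichever transport is chosen, the trace aggregation has one property worth spelling out: a case's trace changes as more events arrive, so a per-trace case count has to retract the case from its old trace before counting it under the new one. Below is a minimal plain-Python sketch of that logic, not Flink code. The names `trace_hash` and `TraceCounts`, and the choice of SHA-256 over the joined activity names, are assumptions for illustration.

```python
import hashlib
from collections import Counter

def trace_hash(activities):
    """Hash the ordered activity names so identical traces share one key."""
    joined = "\x1f".join(activities)  # unit separator avoids ambiguous joins
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

class TraceCounts:
    """Keeps a case count per trace hash, moving cases between traces."""

    def __init__(self):
        self.case_to_trace = {}  # case_id -> trace hash currently counted
        self.counts = Counter()  # trace hash -> number of cases

    def update(self, case_id, activities):
        new_hash = trace_hash(activities)
        old_hash = self.case_to_trace.get(case_id)
        if old_hash == new_hash:
            return  # trace unchanged, nothing to do
        if old_hash is not None:
            # Retract the case from the trace it used to belong to.
            self.counts[old_hash] -= 1
            if self.counts[old_hash] == 0:
                del self.counts[old_hash]
        self.counts[new_hash] += 1
        self.case_to_trace[case_id] = new_hash

agg = TraceCounts()
agg.update("case-1", ["created"])
agg.update("case-2", ["created", "approved"])
agg.update("case-1", ["created", "approved"])  # case-1 moves to the longer trace
```

In this sketch the retraction needs the case's previous trace hash, so either the upstream case operator would have to emit it along with the new one, or the trace job would have to keep a mapping per case id.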
Another question regarding the state:
As we never know when a case is complete this means that the rocksdb
backend could grow infinitely (!). Obviously we would need to get a
bit smarter here.
Is using state in this way a somewhat standard practice, or is state
intended more for recovery?
Managing growing state: I found some discussion regarding how to clear
state, which references https://issues.apache.org/jira/browse/FLINK-3946
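
One naive way to bound the growth described above would be to drop cases that have not been updated for some time. The plain-Python sketch below shows only that idea. The class `CaseStore`, its `ttl` parameter and the explicit `now` arguments are hypothetical, and it does not show how Flink itself clears state.

```python
class CaseStore:
    """Holds events per case and evicts cases that have been idle too long."""

    def __init__(self, ttl):
        self.ttl = ttl         # maximum idle time before a case is dropped
        self.events = {}       # case_id -> list of events
        self.last_update = {}  # case_id -> time of the latest event

    def add_event(self, case_id, event, now):
        self.events.setdefault(case_id, []).append(event)
        self.last_update[case_id] = now

    def evict_idle(self, now):
        """Remove and return the ids of cases idle for longer than ttl."""
        idle = [c for c, t in self.last_update.items() if now - t > self.ttl]
        for case_id in idle:
            del self.events[case_id]
            del self.last_update[case_id]
        return idle

store = CaseStore(ttl=100)
store.add_event("case-1", "created", now=0)
store.add_event("case-2", "created", now=90)
evicted = store.evict_idle(now=150)  # case-1 idle for 150, case-2 for 60
```

The trade-off is that a late event for an evicted case would start a fresh, incomplete case unless the earlier events can be reloaded from somewhere else, such as the postgres table.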