Hi,

I have a two-part question related to processing and storing large amounts
of time-series data. The first part is related to the preferred way to keep
state on the time-series data in an efficient way, and the second part is
about how to further enrich the processed data and feed it back into the
state.

For the sake of discussion, let's say that I am tracking tens to hundreds
of millions of IoT devices. This could grow but that's what I'm looking at
right now. I will receive an initial event from each device, as well as an
unknown number of subsequent events. I will need to aggregate together all
the events related to one device for some period of time after the initial
event, say 1 hour, at which point I can discard the state. After that, I
will never hear from that device again. (I'm not actually working with IoT
devices, but that is the gist. At any given point in time, I will have
millions of active keys, and as some keys expire new keys are added).

The output of my application should contain the full state for a given
device, and a new output should be generated every time a new event comes
in. This application must be fault tolerant. I am currently checkpointing
my state using the RocksDB state backend.

Part 1 of my question is how best to manage this state. This sounds like an
excellent use case for State TTL (https://cwiki.apache.org/
confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively). Since
this is still a pending feature under active discussion, I did some reading
about how others have dealt with similar use-cases. What I gleaned boils
down to this: naively storing everything in one large MapState keyed by
device ID, and using Triggers to clear the state 1 hour after the initial
event will lead to far to many Triggers to be efficient.

An alternate approach is to bucket my devices into a far smaller amount of
keys (not in the millions, maybe thousands), and maintain a MapState for
each bucket. I can fire a Trigger every minute for every bucket, and
iterate over the MapState to clear any state that has past its TTL.

A similar, alternative approach is to use tumbling Windows to achieve the
same effect. Every incoming event has a copy of the timestamp of the
initial event for that device (think of it as when the device came online),
so I can use that for event time, and let the watermarks lag by 1 hour. The
devices are bucketed into some fixed amount of keys like above, so I will
have a Window for each bucket, for each time slice. The Window has a
Trigger that eagerly fires and purges each element, and a
ProcessWindowFunction updates a MapState using per-window state, so that
when a Window expires I can clear the state. I am currently using this
approach, since it uses Flink's own Windowing and per-window state to clear
old data, rather than manually doing it with Triggers.

Other than waiting for the State TTL feature, is there a more efficient
approach to maintain the aggregate state of all events related to one
device, and output this every time a new event arrives?

Part 2 of my question relates to how I can enrich the state I have
accumulated before generating outputs. I have some set of enrichments I'd
like to do using AsyncFunctions to call out to external services. The issue
is some enrichments require data that may never be present on any one
event; I need to work with the stream of aggregated data described above to
be able to make some of those calls. Furthermore, some enrichments might
need the data added by other enrichments. I would like to feed the enriched
data back into the state.

This initially sounded like a perfect use case for an IterativeStream,
until I tried to use it and realized the only way to enable checkpointing
was to force it using a feature that is deprecated in Flink 1.4.2. Is that
approach a dead end? If checkpoints will never be supported for
IterativeStream, I don't want to explore this route, but it would be nice
if checkpointed IterativeStreams are on the roadmap, or at least a
possibility.

Now I'm kind of stumped. The only way I can think of aggregating together
all the state *before* applying enrichments, and feeding the enriched data
back into that state *after* the enrichments is to sink the enriched data
to Kafka or something, and then create a source that reads it back and
feeds into the operator that keeps the state. That works, but I'd prefer to
keep all the data flowing within the Flink application if possible. Are
there other approaches to creating feedback loops that play well with fault
tolerance and checkpoints?

I appreciate any suggestions related to the two points above.

Thanks,

Mike Urbach


-- 
Mike Urbach

Reply via email to