Hi Mike,
if I'm not misunderstanding, you are doing an aggregation for every device on the 
stream. You mentioned that you want to use MapState to store the state for 
each device ID? This is a bit confusing to me; I think what you need may be a 
ValueState. In Flink, every keyed state (ValueState, MapState, ... and so on) is 
already scoped to the key you keyed by. For example,
source.keyBy(deviceId).process(processFunction);
if you keyBy the source by deviceId, then in processFunction every keyed state 
is scoped to the deviceId internally; you don't need to use a 
MapState<DeviceID, ?> to maintain the device state yourself.
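To illustrate what I mean, here is a minimal sketch (DeviceEvent, DeviceAggregate and the state name are hypothetical, just for illustration):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event and aggregate types, for illustration only.
public class DeviceAggregateFunction
        extends ProcessFunction<DeviceEvent, DeviceAggregate> {

    // A single ValueState field, but Flink scopes it to the current key
    // (the deviceId you used in keyBy) automatically -- no
    // MapState<deviceId, ?> needed.
    private transient ValueState<DeviceAggregate> aggState;

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("device-agg", DeviceAggregate.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx,
                               Collector<DeviceAggregate> out) throws Exception {
        DeviceAggregate agg = aggState.value();   // state for *this* deviceId
        if (agg == null) {
            agg = new DeviceAggregate();
        }
        agg.add(event);          // merge the new event into the aggregate
        aggState.update(agg);    // written back under the current deviceId
        out.collect(agg);        // emit the full state on every event
    }
}
```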


Concerning the TTL question: I think tumbling windows and per-window 
state are enough for you, and that is the better way to go currently.


Best,
Sihua
On 05/24/2018 13:46, Mike Urbach <mikeurb...@gmail.com> wrote:
Hi,


I have a two-part question related to processing and storing large amounts of 
time-series data. The first part is related to the preferred way to keep state 
on the time-series data in an efficient way, and the second part is about how 
to further enrich the processed data and feed it back into the state.


For the sake of discussion, let's say that I am tracking tens to hundreds of 
millions of IoT devices. This could grow but that's what I'm looking at right 
now. I will receive an initial event from each device, as well as an unknown 
number of subsequent events. I will need to aggregate together all the events 
related to one device for some period of time after the initial event, say 1 
hour, at which point I can discard the state. After that, I will never hear 
from that device again. (I'm not actually working with IoT devices, but that is 
the gist. At any given point in time, I will have millions of active keys, and 
as some keys expire new keys are added).


The output of my application should contain the full state for a given device, 
and a new output should be generated every time a new event comes in. This 
application must be fault tolerant. I am currently checkpointing my state using 
the RocksDB state backend.


Part 1 of my question is how best to manage this state. This sounds like an 
excellent use case for State TTL 
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively).
 Since this is still a pending feature under active discussion, I did some 
reading about how others have dealt with similar use-cases. What I gleaned 
boils down to this: naively storing everything in one large MapState keyed by 
device ID, and using Triggers to clear the state 1 hour after the initial event 
will lead to far too many Triggers to be efficient. 


An alternate approach is to bucket my devices into a far smaller amount of keys 
(not in the millions, maybe thousands), and maintain a MapState for each 
bucket. I can fire a Trigger every minute for every bucket, and iterate over 
the MapState to clear any state that has passed its TTL.
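If it helps make the bucketed approach concrete, here is roughly what I have in mind; a sketch only, with hypothetical DeviceEvent/DeviceAggregate types and a processing-time sweep (timers are deduplicated per key and timestamp, so rounding to the sweep interval keeps it to one timer per bucket per minute):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;

// Keyed by bucket (e.g. deviceId.hashCode() % numBuckets), not by deviceId,
// so the MapState below holds many devices per key.
public class BucketedTtlFunction
        extends ProcessFunction<DeviceEvent, DeviceAggregate> {

    private static final long TTL_MS = 60 * 60 * 1000;        // 1 hour
    private static final long SWEEP_INTERVAL_MS = 60 * 1000;  // 1 minute

    private transient MapState<String, DeviceAggregate> perDevice;

    @Override
    public void open(Configuration parameters) {
        perDevice = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("per-device",
                        String.class, DeviceAggregate.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx,
                               Collector<DeviceAggregate> out) throws Exception {
        DeviceAggregate agg = perDevice.get(event.deviceId);
        if (agg == null) {
            agg = new DeviceAggregate(event.initialTimestamp);
        }
        agg.add(event);
        perDevice.put(event.deviceId, agg);
        out.collect(agg);

        // Rounded timestamp => at most one sweep timer per bucket per minute,
        // since timers for the same key and timestamp are deduplicated.
        long now = ctx.timerService().currentProcessingTime();
        long nextSweep = (now / SWEEP_INTERVAL_MS + 1) * SWEEP_INTERVAL_MS;
        ctx.timerService().registerProcessingTimeTimer(nextSweep);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<DeviceAggregate> out) throws Exception {
        long cutoff = timestamp - TTL_MS;
        Iterator<Map.Entry<String, DeviceAggregate>> it = perDevice.iterator();
        while (it.hasNext()) {
            if (it.next().getValue().initialTimestamp < cutoff) {
                it.remove();   // drop devices whose hour is up
            }
        }
    }
}
```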


A similar, alternative approach is to use tumbling Windows to achieve the same 
effect. Every incoming event has a copy of the timestamp of the initial event 
for that device (think of it as when the device came online), so I can use that 
for event time, and let the watermarks lag by 1 hour. The devices are bucketed 
into some fixed amount of keys like above, so I will have a Window for each 
bucket, for each time slice. The Window has a Trigger that eagerly fires and 
purges each element, and a ProcessWindowFunction updates a MapState using 
per-window state, so that when a Window expires I can clear the state. I am 
currently using this approach, since it uses Flink's own Windowing and 
per-window state to clear old data, rather than manually doing it with Triggers.
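The windowed approach boils down to something like the following sketch (again with hypothetical types, the eager fire-and-purge Trigger omitted; clear() is where Flink lets me drop the per-window state when the window expires):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Keyed by bucket (Integer); one MapState per (bucket, window) pair
// via the per-window state in ctx.windowState().
public class DeviceWindowFunction
        extends ProcessWindowFunction<DeviceEvent, DeviceAggregate, Integer, TimeWindow> {

    private final MapStateDescriptor<String, DeviceAggregate> desc =
            new MapStateDescriptor<>("per-device",
                    String.class, DeviceAggregate.class);

    @Override
    public void process(Integer bucket, Context ctx, Iterable<DeviceEvent> events,
                        Collector<DeviceAggregate> out) throws Exception {
        // windowState() is scoped to this bucket *and* this window.
        MapState<String, DeviceAggregate> perDevice =
                ctx.windowState().getMapState(desc);
        // With an eagerly firing-and-purging Trigger, each firing sees
        // only the newly arrived event(s).
        for (DeviceEvent event : events) {
            DeviceAggregate agg = perDevice.get(event.deviceId);
            if (agg == null) {
                agg = new DeviceAggregate(event.initialTimestamp);
            }
            agg.add(event);
            perDevice.put(event.deviceId, agg);
            out.collect(agg);   // emit the full aggregate on every event
        }
    }

    @Override
    public void clear(Context ctx) throws Exception {
        // Called when the window expires: drop this window's per-window state.
        ctx.windowState().getMapState(desc).clear();
    }
}
```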


Other than waiting for the State TTL feature, is there a more efficient 
approach to maintain the aggregate state of all events related to one device, 
and output this every time a new event arrives?


Part 2 of my question relates to how I can enrich the state I have accumulated 
before generating outputs. I have some set of enrichments I'd like to do using 
AsyncFunctions to call out to external services. The issue is some enrichments 
require data that may never be present on any one event; I need to work with 
the stream of aggregated data described above to be able to make some of those 
calls. Furthermore, some enrichments might need the data added by other 
enrichments. I would like to feed the enriched data back into the state.


This initially sounded like a perfect use case for an IterativeStream, until I 
tried to use it and realized the only way to enable checkpointing was to force 
it using a feature that is deprecated in Flink 1.4.2. Is that approach a dead 
end? If checkpoints will never be supported for IterativeStream, I don't want 
to explore this route, but it would be nice if checkpointed IterativeStreams 
are on the roadmap, or at least a possibility.


Now I'm kind of stumped. The only way I can think of aggregating together all 
the state *before* applying enrichments, and feeding the enriched data back 
into that state *after* the enrichments is to sink the enriched data to Kafka 
or something, and then create a source that reads it back and feeds into the 
operator that keeps the state. That works, but I'd prefer to keep all the data 
flowing within the Flink application if possible. Are there other approaches to 
creating feedback loops that play well with fault tolerance and checkpoints?


I appreciate any suggestions related to the two points above.


Thanks,


Mike Urbach




--

Mike Urbach
