Thanks very kindly for your response, Stephan!

We will definitely use a custom sink for persistence of idempotent mutations 
whenever possible. Exposing state as read-only to external systems is a 
complication we will try to avoid. Also, we will definitely only write to the 
DB upon checkpoint, and the write will be synchronous and transactional (no 
possibility of partial success/failure).

However, we do want Flink state to be durable, we want it to be in memory when 
possible, and we want to avoid running out of memory due to the size of the 
state. For example, if you have a wide window that hasn't received an event for 
a long time, we want to evict that window state from memory. We're now thinking 
of using Redis (via AWS ElastiCache), which also conveniently has TTL, instead 
of DynamoDB.
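A minimal sketch of the eviction idea in plain Python (not Flink's API): 
recently touched keys stay in a dict, keys idle for longer than a TTL are 
spilled to a backing store and reloaded on the next event for that key. 
`EvictingStateCache`, `idle_ttl_s` and `evict_idle` are hypothetical names, and 
the backing store is a plain dict standing in for something like Redis.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class EvictingStateCache:
    """Hypothetical two-tier keyed state: recently used entries stay in memory,
    quiet ones are spilled to a backing store and reloaded on next access.

    `backing_store` stands in for an external store such as Redis; here it is
    a plain dict so the sketch runs on its own.
    """

    def __init__(self, idle_ttl_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.idle_ttl_s = idle_ttl_s
        self.clock = clock
        # key -> (value, time of last access)
        self.memory: Dict[str, Tuple[int, float]] = {}
        self.backing_store: Dict[str, int] = {}

    def put(self, key: str, value: int) -> None:
        self.memory[key] = (value, self.clock())

    def get(self, key: str) -> Optional[int]:
        if key in self.memory:
            value, _ = self.memory[key]
        elif key in self.backing_store:
            # Reload evicted state when an event for that key arrives.
            value = self.backing_store[key]
        else:
            return None
        self.memory[key] = (value, self.clock())  # refresh last-access time
        return value

    def evict_idle(self) -> int:
        """Spill every entry not accessed within idle_ttl_s; return how many."""
        now = self.clock()
        idle = [k for k, (_, last) in self.memory.items()
                if now - last >= self.idle_ttl_s]
        for k in idle:
            value, _ = self.memory.pop(k)
            self.backing_store[k] = value
        return len(idle)


# Usage with a fake clock so the timing is deterministic.
now = [0.0]
cache = EvictingStateCache(idle_ttl_s=60.0, clock=lambda: now[0])
cache.put("window-a", 5)
cache.put("window-b", 7)
now[0] = 30.0
cache.get("window-b")           # touching b keeps it in memory
now[0] = 70.0
evicted = cache.evict_idle()    # only window-a has been idle for >= 60 s
spilled = "window-a" not in cache.memory
reloaded = cache.get("window-a")
```

The spill target has to be durable (or covered by checkpoints) for this to 
survive failures; the sketch deliberately ignores that part.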

I just wanted to check whether eviction of inactive (quiet) state from memory 
is something I should consider implementing myself, or whether Flink already 
has some built-in way of doing it.

Along the same lines, I am also wondering whether Flink already has a means of 
compacting the state of a window by applying an aggregation function to the 
elements so far (e.g. every time the window is triggered). For example, if you 
are only computing a sum over the contents of the window, the window state 
doesn't need to store all the individual items in the window; it only needs to 
store the sum. Aggregations other than "sum" might have that characteristic 
too. I don't know whether Flink is already that intelligent, or whether I 
should figure out how to aggregate window contents myself when possible, with 
something like a window fold. Another poster (Aljoscha) was talking about 
adding incremental snapshots, but it sounds like that would only improve the 
write throughput, not the memory usage.
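The compaction described above can be sketched in plain Python (not Flink's 
API; the class names are hypothetical): a buffering window keeps every element, 
while an incremental window folds each element into an accumulator as it 
arrives, so its memory use does not grow with the number of events.

```python
from dataclasses import dataclass, field
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")  # element type
A = TypeVar("A")  # accumulator type


@dataclass
class BufferingWindow(Generic[T]):
    """Keeps every element; memory grows with the number of events."""
    elements: List[T] = field(default_factory=list)

    def add(self, x: T) -> None:
        self.elements.append(x)


@dataclass
class IncrementalWindow(Generic[T, A]):
    """Keeps only a running accumulator; memory stays constant."""
    acc: A
    step: Callable[[A, T], A]

    def add(self, x: T) -> None:
        self.acc = self.step(self.acc, x)


events = [3, 1, 4, 1, 5]

buffered = BufferingWindow()
total = IncrementalWindow(acc=0, step=lambda a, x: a + x)
# A mean cannot be folded directly, but a (sum, count) pair can.
sum_count = IncrementalWindow(acc=(0, 0), step=lambda a, x: (a[0] + x, a[1] + 1))

for e in events:
    buffered.add(e)
    total.add(e)
    sum_count.add(e)

mean = sum_count.acc[0] / sum_count.acc[1]
```

This works for aggregations that can be folded one element at a time (sum, 
count, min, max, or a mean via a (sum, count) pair); an exact median, for 
instance, still needs the individual elements.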

Thanks again!
Shannon Carey


From: Stephan Ewen <se...@apache.org>
Date: Wednesday, April 6, 2016 at 10:37 PM
To: <user@flink.apache.org>
Subject: Re: State in external db (dynamodb)

Hi Shannon!

Welcome to the Flink community!

You are right: in general, sinks need to be idempotent if you want 
"exactly-once" semantics, because elements that were already written can be 
replayed.

However, what you describe later, overwriting a key with a new value (or with 
the same value again), is pretty much sufficient. That means that when a 
duplicate write happens during replay, the value for the key is simply 
overwritten with the same value again.
As long as all computation happens purely in Flink and you only write to the 
key/value store (rather than read from k/v, modify in Flink, write to k/v), you 
get the guarantee that, for example, counts/aggregates never contain 
duplicates.
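The difference between overwriting and read-modify-write under replay can be 
shown with two toy sinks over a dict (hypothetical names, plain Python, not a 
Flink API). Both see a failure after two writes, followed by a replay of all 
three:

```python
from typing import Dict, List, Tuple

Write = Tuple[str, int]


def upsert_sink(store: Dict[str, int], writes: List[Write]) -> None:
    """Idempotent: re-applying (key, latest value) leaves the store unchanged."""
    for key, value in writes:
        store[key] = value


def increment_sink(store: Dict[str, int], writes: List[Write]) -> None:
    """Read-modify-write: every replayed delta is added again."""
    for key, delta in writes:
        store[key] = store.get(key, 0) + delta


# Flink counts three events for "user-1". The job fails after two writes
# reached the store and then replays all three from the last checkpoint.
upsert_store: Dict[str, int] = {}
upsert_sink(upsert_store, [("user-1", 1), ("user-1", 2)])                 # before failure
upsert_sink(upsert_store, [("user-1", 1), ("user-1", 2), ("user-1", 3)])  # replay

increment_store: Dict[str, int] = {}
increment_sink(increment_store, [("user-1", 1), ("user-1", 1)])  # before failure
increment_sink(increment_store, [("user-1", 1)] * 3)             # replay
```

The upsert store ends at the correct count of 3, while the increment store 
double-counts the replayed events and ends at 5.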

If Flink needs to look up state from the database (because it is no longer held 
in Flink), it is a bit trickier. I assume that is where you are going with 
"Subsequently, when an event is processed, we must be able to quickly load up 
any evicted state". In that case, there are two things you can do:

(1) Only write to the DB upon a checkpoint, at which point it is known that no 
replay of that data will occur any more. Values from partially successful 
writes will be overwritten with the correct value. I assume that is what you 
had in mind when referring to the State Backend, because in some sense, that is 
what such a state backend would do.

I think it is simpler to implement that in a custom sink than to develop a new 
state backend. Another Flink committer (Chesnay) has developed some nice 
tooling for that, to be merged into Flink soon.
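Option (1) might look roughly like the following toy sink: writes are buffered, 
staged per checkpoint, and only flushed once that checkpoint is reported 
complete. The method names loosely echo Flink's sink and checkpoint-listener 
hooks but are hypothetical here, and the "store" is a dict standing in for the 
database.

```python
from typing import Dict, List, Tuple

Write = Tuple[str, int]


class CheckpointBufferingSink:
    """Hypothetical sink that touches the external store only after a
    checkpoint completes, so replayed elements never reach the store twice."""

    def __init__(self, store: Dict[str, int]) -> None:
        self.store = store
        self.pending: List[Write] = []            # writes since the last barrier
        self.staged: Dict[int, List[Write]] = {}  # checkpoint id -> its writes

    def invoke(self, key: str, value: int) -> None:
        self.pending.append((key, value))

    def snapshot_state(self, checkpoint_id: int) -> None:
        # In a real job, the staged batch would itself be checkpointed state.
        self.staged[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Flush every staged batch up to this checkpoint, oldest first.
        for cid in sorted(c for c in self.staged if c <= checkpoint_id):
            batch = dict(self.staged.pop(cid))  # last write per key wins
            # A real store would apply `batch` in a single transaction.
            self.store.update(batch)


store: Dict[str, int] = {}
sink = CheckpointBufferingSink(store)
sink.invoke("a", 1)
sink.invoke("b", 2)
sink.invoke("a", 3)
sink.snapshot_state(1)
before_complete = dict(store)  # nothing written yet
sink.invoke("c", 4)            # belongs to the next checkpoint
sink.notify_checkpoint_complete(1)
after_complete = dict(store)
```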

(2) You could attach version numbers to every write and increment the version 
upon each checkpoint. That allows you to always refer to a consistent previous 
value if some writes were made but a failure occurred before the checkpoint 
completed.
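A sketch of option (2), under the assumption that each checkpoint attempt gets 
a fresh, never-reused version number: every write is tagged with the current 
version, and readers take the newest value whose version belongs to a completed 
checkpoint. Names are hypothetical; this is plain Python rather than any Flink 
or database API.

```python
from typing import Dict, List, Optional, Set, Tuple


class VersionedStore:
    """Hypothetical k/v store that keeps every (version, value) written per key.

    Assumption: version numbers are never reused after a failure, so writes
    from a failed checkpoint attempt can never become "completed" later.
    """

    def __init__(self) -> None:
        self.history: Dict[str, List[Tuple[int, int]]] = {}
        self.completed: Set[int] = set()

    def write(self, key: str, value: int, version: int) -> None:
        self.history.setdefault(key, []).append((version, value))

    def complete_checkpoint(self, version: int) -> None:
        self.completed.add(version)

    def read_consistent(self, key: str) -> Optional[int]:
        # Newest write whose checkpoint completed; writes from a failed
        # attempt are skipped even if they are more recent.
        for version, value in reversed(self.history.get(key, [])):
            if version in self.completed:
                return value
        return None


store = VersionedStore()
store.write("k", 10, version=1)
store.complete_checkpoint(1)
store.write("k", 11, version=2)   # failure before checkpoint 2 completes
after_failure = store.read_consistent("k")
store.write("k", 12, version=3)   # recovery continues with a fresh version
store.complete_checkpoint(3)
after_recovery = store.read_consistent("k")
```

Old versions accumulate in this sketch; a real store would prune everything 
older than the latest completed version.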

I hope these answers apply to your case. Let us know if some things are still 
unclear, or if I misunderstood your question!


Greetings,
Stephan



On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever 
<sanne.de.roe...@gmail.com> wrote:
FYI, Cassandra has a TTL on data: 
https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey 
<sca...@expedia.com> wrote:
Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using DynamoDB as a sink. 
However, as noted, sinks have an at-least-once guarantee, so your operations 
must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write 
the state to the external store via a custom State Backend. Since the state 
participates in checkpointing, you don't have to worry about idempotency: every 
time state is checkpointed, overwrite the value of that key.

We are starting a project with Flink, and we are interested in evicting state 
from memory once no events have come in for that state within some TTL. 
Subsequently, when an event is processed, we must be able to quickly load up 
any evicted state. Does this sound reasonable? We are considering using 
DynamoDB for our state backend because it seems like all we will need is a 
key-value store. The only weakness is that if state gets older than, say, 2 
years, we would like to get rid of it, which might not be easy in DynamoDB. I 
don't suppose Flink has any behind-the-scenes features that deal with getting 
rid of old state (either evicting it from memory or TTL/aging it out 
entirely)?
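For the "aging out entirely" part, a per-key expiry that is reset on every 
write behaves roughly like the following sketch (plain Python, hypothetical 
names; two years is approximated as 730 days). Expired entries are dropped 
lazily on read and by an explicit purge.

```python
from typing import Callable, Dict, Optional, Tuple

TWO_YEARS_S = 2 * 365 * 24 * 3600  # approximation: ignores leap days


class ExpiringStore:
    """Hypothetical store where each write (re)sets an absolute expiry time."""

    def __init__(self, ttl_s: float, clock: Callable[[], float]) -> None:
        self.ttl_s = ttl_s
        self.clock = clock
        self.data: Dict[str, Tuple[int, float]] = {}  # key -> (value, expires_at)

    def put(self, key: str, value: int) -> None:
        self.data[key] = (value, self.clock() + self.ttl_s)

    def get(self, key: str) -> Optional[int]:
        entry = self.data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.data[key]  # lazy expiry on read
            return None
        return value

    def purge_expired(self) -> int:
        """Delete every expired entry; return how many were removed."""
        now = self.clock()
        expired = [k for k, (_, exp) in self.data.items() if now >= exp]
        for k in expired:
            del self.data[k]
        return len(expired)


now = [0.0]
store = ExpiringStore(TWO_YEARS_S, clock=lambda: now[0])
store.put("old", 1)
now[0] = TWO_YEARS_S / 2
store.put("fresh", 2)
now[0] = TWO_YEARS_S + 1
purged = store.purge_expired()   # only "old" is past its expiry
fresh_value = store.get("fresh")
old_value = store.get("old")
```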

Thanks for your time!
Shannon Carey

