Essentially what happens is the following:
in between checkpoints all incoming data is stored within the operator
state.
when a checkpoint-complete operation arrives, the data is read from the
operator state and written into kafka (or any system)
if the job fails while storing records in the state, the current state
is discarded and we go back to the previous one. since no data was
written yet, we fulfill exactly-once here.
if the job fails while data is being written into cassandra (it can't be
written as one atomic action), some data will persist in cassandra and
will be sent again upon restart. in this case exactly-once is not fulfilled.
But we minimize the time-frame in which a failure causes exactly-once to
fail, which is pretty much as close as you can get without support from
kafka or others.
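
A minimal sketch of the behaviour described above, as a plain Python simulation rather than Flink code. The names FakeTarget, BufferingSink, invoke, snapshot_state, notify_checkpoint_complete and fail_after are hypothetical and chosen only for illustration; the real prototype may look quite different.

```python
class FakeTarget:
    """Stand-in for Kafka/Cassandra: an append-only list that can fail mid-write."""

    def __init__(self):
        self.rows = []
        self.fail_after = None  # hypothetical knob: fail after this many writes

    def write(self, record):
        if self.fail_after is not None:
            if self.fail_after == 0:
                raise RuntimeError("simulated failure during write")
            self.fail_after -= 1
        self.rows.append(record)


class BufferingSink:
    """Buffers records in (simulated) operator state and writes them out
    only once a checkpoint is reported as complete."""

    def __init__(self, target):
        self.target = target
        self.pending = []    # records received since the last checkpoint
        self.snapshots = {}  # checkpoint id -> records captured by that checkpoint

    def invoke(self, record):
        self.pending.append(record)

    def snapshot_state(self, checkpoint_id):
        self.snapshots[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Not atomic: a failure in this loop leaves a partial write behind.
        for record in self.snapshots[checkpoint_id]:
            self.target.write(record)
        del self.snapshots[checkpoint_id]


target = FakeTarget()
sink = BufferingSink(target)

# Case 1: failure while buffering. The uncheckpointed state is discarded,
# and nothing has reached the target yet.
sink.invoke("a")
sink.invoke("b")
sink.pending = []  # simulated restart from the previous (empty) state
rows_after_case1 = list(target.rows)

# Case 2: failure while writing. The checkpointed records are written again
# after the restart, so the first two show up twice.
for record in ["a", "b", "c"]:
    sink.invoke(record)
sink.snapshot_state(1)
target.fail_after = 2
try:
    sink.notify_checkpoint_complete(1)
except RuntimeError:
    pass
target.fail_after = None
sink.notify_checkpoint_complete(1)  # retry after the simulated restart
rows_after_case2 = list(target.rows)
```

In this sketch the only window in which duplicates can appear is the write loop inside notify_checkpoint_complete, which is the minimized time-frame mentioned above.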
@Niels we discussed having a counter that tells us how much data was
written in between checkpoints. But this is currently not possible: an
operator can't update its state on the fly, so we would need something
new here.
And there would still be cases where even this would fail, for example
if the job fails after the message was sent, but before the ID was saved.
On 05.02.2016 13:55, Paris Carbone wrote:
This is not a bad take. It still makes a few assumptions
1) the output checkpoints the last *known* ID that was
*persisted* in kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out
On 05 Feb 2016, at 13:41, Niels Basjes wrote:
Hi,
Buffering the data (in all cases) would hurt the latency so much that
Flink is effectively reverting to microbatching (where batch size is
checkpoint period) with regard to the output.
My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka
in the checkpoint.
2) Upon recovery the sink would
2a) Record the offset Kafka has reached at that point in time
2b) For all 'new' messages validate if it must write this message by
reading from Kafka (starting at the offset in the checkpoint) and if
the message is already present it would skip it.
3) If a message arrives that has not yet been written, the message is
written. Under the assumption that the messages arrive in the same
order as before, the sink can now simply run as normal.
This way the performance is only impacted in the (short) period after
the recovery of a disturbance.
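
The recovery steps above can be sketched as a small Python simulation. This is only an illustration under stated assumptions: a list stands in for the output Kafka topic, the checkpoint stores the output position instead of a message ID, and replayed messages arrive in the same order as before. DedupSink and its methods are hypothetical names, not an existing API.

```python
class DedupSink:
    """Skips replayed messages that are already present in the output log."""

    def __init__(self, log):
        self.log = log             # list standing in for the output topic
        self.already_written = []  # messages written after the checkpoint

    def snapshot_state(self):
        # Step 1: remember how far the output had been written.
        return len(self.log)

    def restore(self, checkpointed_offset):
        # Steps 2a/2b: collect everything written after the checkpointed
        # position, up to the end of the log at recovery time.
        self.already_written = self.log[checkpointed_offset:]

    def invoke(self, message):
        if self.already_written:
            expected = self.already_written.pop(0)
            if message != expected:
                # The deterministic-order assumption does not hold.
                raise RuntimeError("replay order differs from original order")
            return  # already in the log, skip it
        # Step 3: nothing left to compare against, write as normal.
        self.log.append(message)


log = []
sink = DedupSink(log)
sink.invoke("m1")
sink.invoke("m2")
offset = sink.snapshot_state()
sink.invoke("m3")
sink.invoke("m4")  # written, then the job fails before the next checkpoint

recovered = DedupSink(log)
recovered.restore(offset)
for message in ["m3", "m4", "m5"]:  # replay from the checkpoint
    recovered.invoke(message)
```

After the replay the log holds m1 to m5 once each. The sketch compares whole messages; with non-deterministic ordering the comparison raises instead of silently writing duplicates.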
What do you think?
Niels Basjes
On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen wrote:
Hi Niels!
In general, exactly once output requires transactional
cooperation from the target system. Kafka has that on the
roadmap, we should be able to integrate that once it is out.
That means output is "committed" upon completed checkpoints,
which guarantees nothing is written multiple times.
Chesnay is working on an interesting prototype as a generic
solution (also for Kafka, while they don't have that feature):
It buffers the data in the sink persistently (using the fault
tolerance state backends) and pushes the results out on
notification of a completed checkpoint.
That gives you exactly once semantics, but involves an extra
materialization of the data.
I think that there is actually a fundamental latency issue with
"exactly once sinks", no matter how you implement them in any
systems:
You can only commit once you are sure that everything went well,
to a specific point where you are sure no replay will ever be needed.
So the latency in Flink for an exactly-once output would be at
least the checkpoint interval.
I'm eager to hear your thoughts on this.
Greetings,
Stephan
On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes wrote:
Hi,
It is my understanding that the exactly-once semantics
regarding the input from Kafka is based on the checkpointing
in the source component retaining the offset where it was at
the checkpoint moment.
My question is how does that work for a sink? How can I make
sure that (in light of failures) each message that is read
from Kafka (my input) is written to Kafka (my output) exactly
once?
--
Best regards / Met vriendelijke groeten,
Niels Basjes
On 05.02.2016 13:49, Paris Carbone wrote:
From what I understood, state on sinks is included in the operator state of the
sinks and pushed to kafka when the 3-phase commit is complete,
i.e. when the checkpoint completion notification arrives at the sinks.
There are several pitfalls I am really curious to check and see how they are
(going to be) handled, this is of course not as simple as it sounds. It really
depends on the guarantees and operations the outside storage gives you. For
example, how can we know that the pushed records are actually persisted in
kafka in a single transaction? Not as simple as it sounds.
@Chesnay can you tell us more?
On 05 Feb 2016, at 13:33, Paris Carbone wrote:
That would be good indeed. I just learned about it from what Stephan mentioned. It
sounds correct to me along those lines but it would be nice to see the details.
On 05 Feb 2016, at 13:32, Ufuk Celebi wrote:
On 05 Feb 2016, at 13:28, Paris Carbone wrote:
Hi Gabor,
The sinks should be aware that the global checkpoint is indeed persisted before
emitting, so they will have to wait until they are notified of its completion
before pushing to Kafka. The current view of the local state is not the actual
persisted view, so checking against it is like relying on dirty state. Imagine the
following scenario:
1) sink pushes to kafka record k and updates local buffer for k
2) sink snapshots k and the rest of its state on checkpoint barrier
3) global checkpoint fails due to some reason (e.g. another sink subtask
failed) and the job gets restarted
4) sink pushes record k to kafka again since the last global snapshot did not
complete before and k is not in the local buffer
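
The scenario above can be replayed in a few lines of Python. This is a simulation under assumptions, not Flink code: EagerSink, its local buffer (a set) and the list used as the topic are hypothetical stand-ins.

```python
class EagerSink:
    """Pushes records immediately and deduplicates against local state only."""

    def __init__(self, topic):
        self.topic = topic  # list standing in for the Kafka topic
        self.seen = set()   # local buffer of keys already pushed

    def invoke(self, key):
        if key not in self.seen:
            self.topic.append(key)
            self.seen.add(key)

    def snapshot_state(self):
        return set(self.seen)

    def restore(self, snapshot):
        self.seen = set(snapshot)


topic = []
sink = EagerSink(topic)
last_completed = sink.snapshot_state()  # last globally completed checkpoint

sink.invoke("k")                        # 1) push k, update local buffer
local_snapshot = sink.snapshot_state()  # 2) snapshot on checkpoint barrier
# 3) the global checkpoint fails, so local_snapshot is never committed
#    and the job restarts from the last completed checkpoint
sink.restore(last_completed)
sink.invoke("k")                        # 4) k is pushed a second time
```

The topic ends up containing k twice, because the local snapshot that knew about k never became part of a completed global checkpoint.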
Chesnay’s approach seems to be valid and best effort for the time being.
Chesnay’s approach is not part of this thread. Can you or Chesnay
elaborate/provide a link?
– Ufuk