Essentially what happens is the following:

In between checkpoints, all incoming data is stored in the operator state.

When a checkpoint-complete notification arrives, the data is read from the operator state and written into Kafka (or any other system).

If the job fails while records are being stored in the state, the current state is discarded and we go back to the previous one. Since no data was written yet, exactly-once holds here. If the job fails while data is being written into Cassandra (it can't be written as one atomic action), some data will persist in Cassandra and will be sent again upon restart. In this case exactly-once is not fulfilled. But we minimize the time frame in which a failure breaks exactly-once, which is pretty much as close as you can get without support from Kafka or others.
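
To make the two failure cases above concrete, here is a minimal Python sketch. It is not Flink code and not the actual prototype: the class BufferingSink, its method names and the fail_after parameter are made up for illustration, and the external system is just a list.

```python
class BufferingSink:
    """Toy sink that keeps records in operator state until a checkpoint completes."""

    def __init__(self, external):
        self.external = external   # stand-in for Kafka/Cassandra (hypothetical)
        self.pending = []          # records received since the last barrier
        self.snapshots = {}        # checkpoint id -> records held in that checkpoint

    def invoke(self, record):
        # Between checkpoints nothing leaves the job; records only go into state.
        self.pending.append(record)

    def snapshot_state(self, checkpoint_id):
        # On a barrier the buffered records become part of the checkpoint.
        self.snapshots[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id, fail_after=None):
        # Only now is data written out. The write is not atomic:
        # fail_after simulates a crash after that many records were written.
        records = self.snapshots[checkpoint_id]
        for i, record in enumerate(records):
            if fail_after is not None and i == fail_after:
                raise RuntimeError("job failed mid-write")
            self.external.append(record)
        del self.snapshots[checkpoint_id]  # reached only after a full write


def fail_while_buffering():
    external = []
    sink = BufferingSink(external)
    sink.invoke("a")                 # crash happens here, before any checkpoint
    sink = BufferingSink(external)   # restart from the previous (empty) state
    return external                  # nothing was written, so nothing is duplicated


def fail_while_writing():
    external = []
    sink = BufferingSink(external)
    for record in ["a", "b", "c"]:
        sink.invoke(record)
    sink.snapshot_state(1)
    try:
        sink.notify_checkpoint_complete(1, fail_after=2)
    except RuntimeError:
        pass
    # After the restart the state of checkpoint 1 is restored (modelled here by
    # reusing the same object) and the whole write is retried.
    sink.notify_checkpoint_complete(1)
    return external
```

In this sketch fail_while_buffering() leaves the external list empty, while fail_while_writing() ends up with "a" and "b" written twice, which is exactly the window described above.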

@Niels we discussed having a counter that tells us how much data was written in between checkpoints. But this is currently not possible: an operator can't update its state on the fly, so we would need something new here. And there would still be cases where even this would fail, for example if the job fails after the message was sent but before the ID was saved.

On 05.02.2016 13:55, Paris Carbone wrote:
This is not a bad take. It still makes a few assumptions

1) the output checkpoints the last *known* ID that was *persisted* in Kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <[email protected]> wrote:

Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink is effectively reverting to microbatching (where the batch size is the checkpoint period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka in the checkpoint.
2) Upon recovery the sink would
2a) Record the offset Kafka is at at that point in time
2b) For all 'new' messages, validate whether it must write this message by reading from Kafka (starting at the offset in the checkpoint); if the message is already present it would skip it.
3) If a message arrives that has not yet been written, the message is written. Under the assumption that the messages arrive in the same order as before, the sink can now simply run as normal.

This way the performance is only impacted in the (short) period after the recovery of a disturbance.
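
A rough Python sketch of the steps above, to show where the extra work happens. Everything here is an assumption for illustration: the DedupingSink class and its methods are invented names, the Kafka topic is modelled as a plain list of (id, payload) pairs, and every message is assumed to carry a unique ID that is already in the log when the checkpoint records it.

```python
class DedupingSink:
    """Toy sink that skips messages already present in the output after a recovery."""

    def __init__(self, log):
        self.log = log               # stand-in for the Kafka topic (hypothetical)
        self.last_id = None          # step 1: ID of the last message written
        self.maybe_written = set()   # IDs found in the log after the checkpointed ID

    def snapshot_state(self):
        # Step 1: the checkpoint only carries the ID of the last written message.
        return self.last_id

    def restore_state(self, checkpointed_id):
        # Steps 2a/2b: read the log from the checkpointed position up to its
        # current end and remember which IDs are already there.
        ids = [msg_id for msg_id, _ in self.log]
        start = ids.index(checkpointed_id) + 1 if checkpointed_id is not None else 0
        self.maybe_written = set(ids[start:])
        self.last_id = checkpointed_id

    def invoke(self, msg_id, payload):
        if msg_id in self.maybe_written:
            # Already in the output from before the failure: skip it.
            self.maybe_written.discard(msg_id)
            self.last_id = msg_id
            return False
        # Step 3: not seen before, so write it and carry on as normal.
        self.log.append((msg_id, payload))
        self.last_id = msg_id
        return True


def recovery_demo():
    log = []
    sink = DedupingSink(log)
    sink.invoke(1, "a")
    sink.invoke(2, "b")
    checkpoint = sink.snapshot_state()
    sink.invoke(3, "c")
    sink.invoke(4, "d")
    # The job fails here; the log keeps messages 3 and 4.
    sink = DedupingSink(log)
    sink.restore_state(checkpoint)
    written = [sink.invoke(i, p) for i, p in [(3, "c"), (4, "d"), (5, "e")]]
    return [msg_id for msg_id, _ in log], written
```

In recovery_demo() the replayed messages 3 and 4 are skipped and only 5 is appended. The extra reads happen once, in restore_state, which matches the claim that only the period right after recovery is affected.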

What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <[email protected]> wrote:

    Hi Niels!

    In general, exactly once output requires transactional
    cooperation from the target system. Kafka has that on the
    roadmap, we should be able to integrate that once it is out.
    That means output is "committed" upon completed checkpoints,
    which guarantees nothing is written multiple times.

    Chesnay is working on an interesting prototype as a generic
    solution (also for Kafka, while they don't have that feature):
    It buffers the data in the sink persistently (using the fault
    tolerance state backends) and pushes the results out on
    notification of a completed checkpoint.
    That gives you exactly once semantics, but involves an extra
    materialization of the data.


    I think that there is actually a fundamental latency issue with
    "exactly once sinks", no matter how you implement them in any
    systems:
    You can only commit once you are sure that everything went well,
    to a specific point where you are sure no replay will ever be needed.

    So the latency in Flink for an exactly-once output would be at
    least the checkpoint interval.

    I'm eager to hear your thoughts on this.

    Greetings,
    Stephan


    On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <[email protected]> wrote:

        Hi,

        It is my understanding that the exactly-once semantics
        regarding the input from Kafka is based on the checkpointing
        in the source component retaining the offset where it was at
        the checkpoint moment.

        My question is how does that work for a sink? How can I make
        sure that (in light of failures) each message that is read
        from Kafka (my input) is written to Kafka (my output) exactly
        once?


        --
        Best regards / Met vriendelijke groeten,

        Niels Basjes





--
Best regards / Met vriendelijke groeten,

Niels Basjes




On 05.02.2016 13:49, Paris Carbone wrote:
From what I understood, state on sinks is included in the operator state of the
sinks and pushed to Kafka when the 3-phase commit is complete,
i.e. when the checkpoint completion notification arrives at the sinks.

There are several pitfalls, and I am really curious to see how they are
(going to be) handled. It really depends on the guarantees and operations the
outside storage gives you. For example, how can we know that the pushed
records are actually persisted in Kafka in a single transaction? Not as simple
as it sounds.

@Chesnay can you tell us more?

On 05 Feb 2016, at 13:33, Paris Carbone <[email protected]> wrote:

That would be good indeed. I just learned about it from what Stephan mentioned.
It sounds broadly correct to me, but it would be nice to see the details.

On 05 Feb 2016, at 13:32, Ufuk Celebi <[email protected]> wrote:


On 05 Feb 2016, at 13:28, Paris Carbone <[email protected]> wrote:

Hi Gabor,

The sinks should be aware that the global checkpoint is indeed persisted before
emitting, so they will have to wait until they are notified of its completion
before pushing to Kafka. The current view of the local state is not the actual
persisted view, so checking against it is like relying on dirty state. Imagine
the following scenario:

1) sink pushes record k to Kafka and updates its local buffer for k
2) sink snapshots k and the rest of its state on the checkpoint barrier
3) global checkpoint fails for some reason (e.g. another sink subtask
failed) and the job gets restarted
4) sink pushes record k to Kafka again, since the last global snapshot did not
complete and k is not in the local buffer
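
The four steps can be replayed in a few lines of Python. This is only an illustration of the scenario with made-up names (replay_scenario, kafka, local_buffer); the topic is a list that survives the restart and the sink's local state is a set that does not.

```python
def replay_scenario():
    kafka = []            # external topic: survives job restarts
    local_buffer = set()  # sink-local record of what was already pushed

    # 1) sink pushes k and notes it in its local buffer
    kafka.append("k")
    local_buffer.add("k")

    # 2) sink snapshots its state on the checkpoint barrier
    snapshot = set(local_buffer)

    # 3) the global checkpoint fails, so this snapshot is never committed and
    #    the restart restores the last completed checkpoint (empty here)
    global_checkpoint_completed = False
    local_buffer = snapshot if global_checkpoint_completed else set()

    # 4) the source replays k; it is not in the restored buffer, so it is pushed again
    if "k" not in local_buffer:
        kafka.append("k")
        local_buffer.add("k")

    return kafka
```

The returned list contains k twice: the local snapshot looked fine from the sink's point of view, but it was never part of a completed global checkpoint.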

Chesnay’s approach seems to be valid and best effort for the time being.

Chesnay’s approach is not part of this thread. Can you or Chesnay
elaborate/provide a link?

– Ufuk

