Oops, seems that Stephan's email covers my answer plus the plans to provide transactional sinks :-)
On Thu, Aug 27, 2015 at 1:25 PM, Kostas Tzoumas <[email protected]> wrote:

> Note that the definition of "exactly-once" means that records are
> guaranteed to be processed exactly once by Flink operators, and thus
> updates to operator state happen exactly once (e.g., if C had a counter
> that x1, x2, and x3 incremented, the counter would have a value of 3 and
> not a value of 6). This is not specific to Flink; it is the most widely
> accepted definition and applies to all stream processing systems. The
> reason is that the stream processor cannot by itself guarantee what happens
> in the outside world (in this case, the outside world is the data sink).
>
> See the docs (
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
> ):
>
> "Apache Flink offers a fault tolerance mechanism to consistently recover
> the state of data streaming applications. The mechanism ensures that even
> in the presence of failures, the program’s state will eventually reflect
> every record from the data stream exactly once."
>
> Guaranteeing exactly-once delivery to the sink is possible, as Marton
> suggests above, but the sink implementation needs to be aware of and take
> part in the checkpointing mechanism.
>
>
> On Thu, Aug 27, 2015 at 1:14 PM, Márton Balassi <[email protected]>
> wrote:
>
>> Dear Zhangrucong,
>>
>> From your explanation it seems that you have a good general understanding
>> of Flink's checkpointing algorithm. Your concern is valid: by default, a
>> sink C emits tuples to the "outside world" potentially multiple times.
>> A neat trick to solve this issue for your user-defined sinks is to use the
>> CheckpointNotifier interface to output records only after the corresponding
>> checkpoint has been completely processed by the system, so sinks can also
>> provide exactly-once guarantees in Flink.
>>
>> This would mean that your SinkFunction has to implement both the
>> Checkpointed and the CheckpointNotifier interfaces. The idea is to mark the
>> output tuples with the corresponding checkpoint ID, so that they can be
>> emitted in a "consistent" manner once the checkpoint is globally
>> acknowledged by the system. You buffer your output records in a collection
>> of your choice, and whenever snapshotState of the Checkpointed interface
>> is invoked you mark your fresh output records with the current
>> checkpoint ID. Whenever notifyCheckpointComplete is invoked you emit the
>> records with the corresponding ID.
>>
>> Note that this adds latency to your processing, and since you potentially
>> need to checkpoint a lot of data in the sinks I would recommend using
>> HDFS as a state backend instead of the default solution.
>>
>> Best,
>>
>> Marton
>>
>> On Thu, Aug 27, 2015 at 12:32 PM, Zhangrucong <[email protected]>
>> wrote:
>>
>>> Hi:
>>>
>>> The documentation says Flink can guarantee processing each tuple
>>> exactly once, but I cannot understand how it works.
>>>
>>> For example, in Fig 1, C is running between snapshot n-1 and snapshot
>>> n (snapshot n hasn’t been generated yet). After snapshot n-1, C has
>>> processed tuples x1, x2, x3 and already output them to the user; then C
>>> fails and recovers from snapshot n-1. In my opinion, x1, x2, x3 will be
>>> processed and output to the user again. My question is: how does Flink
>>> guarantee that x1, x2, x3 are processed and output to the user only once?
>>>
>>> Fig 1.
>>>
>>> Thanks for answering.
>>>
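
For readers of the archive, the buffering trick Marton describes in the quoted message can be sketched as a small stand-alone simulation. This is only an illustration under stated assumptions: it is plain Python and does not use Flink's API. The class BufferingSink, its method names, and the function simulate_failure_and_recovery are hypothetical names that merely mirror the roles of invoke, snapshotState, and notifyCheckpointComplete from the thread.

```python
from dataclasses import dataclass, field


@dataclass
class BufferingSink:
    """Toy sink (hypothetical, not Flink code) that holds records back
    until the checkpoint they belong to has been acknowledged."""

    external_output: list                        # stands in for the "outside world"
    fresh: list = field(default_factory=list)    # records not yet tied to a checkpoint
    pending: dict = field(default_factory=dict)  # checkpoint id -> buffered records

    def invoke(self, record):
        # Buffer the record instead of emitting it right away.
        self.fresh.append(record)

    def snapshot_state(self, checkpoint_id):
        # Mark the fresh records with the current checkpoint id and
        # return a copy of the buffered state as the snapshot.
        self.pending[checkpoint_id] = self.fresh
        self.fresh = []
        return {cid: list(recs) for cid, recs in self.pending.items()}

    def restore_state(self, snapshot):
        # After a failure, continue from the last snapshot. Records that
        # were buffered but never checkpointed are lost here and must be
        # replayed by the source.
        self.pending = {cid: list(recs) for cid, recs in snapshot.items()}
        self.fresh = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Emit everything that belongs to this or an earlier checkpoint.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.external_output.extend(self.pending.pop(cid))


def simulate_failure_and_recovery():
    """Replays the scenario from the question: C fails after x1, x2, x3."""
    outside_world = []

    sink = BufferingSink(outside_world)
    snapshot_n_minus_1 = sink.snapshot_state(1)
    sink.notify_checkpoint_complete(1)
    for record in ("x1", "x2", "x3"):
        sink.invoke(record)
    # The sink fails here, before snapshot n: nothing has been emitted yet.
    emitted_before_recovery = list(outside_world)

    recovered = BufferingSink(outside_world)
    recovered.restore_state(snapshot_n_minus_1)
    for record in ("x1", "x2", "x3"):  # the source replays the records
        recovered.invoke(record)
    recovered.snapshot_state(2)
    recovered.notify_checkpoint_complete(2)
    return emitted_before_recovery, list(outside_world)


if __name__ == "__main__":
    print(simulate_failure_and_recovery())
```

In this sketch the replayed records reach the outside world only once, because nothing was emitted before the failure. It does not cover a failure in the middle of the emit step itself: records could then still be written twice, which is presumably where transactional sinks come in.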
