You raised a good point. Fortunately, there should be a simple way to fix this.
The Kafka Sink Function should implement the "Checkpointed" interface. It will get a call to the "snapshotState()" method whenever a checkpoint happens. Inside that call, it should then sync on the callbacks, and only return once all have completed. It may return null (no need to store anything in the checkpoint). While the "snapshotState()" method has not returned, the checkpoint will not complete. That way, there will be a "synchronization" point per checkpoint.

We can even improve this further in the future: the checkpoint method can return an async state handle. While the async state handle completes its "wait for callbacks" in the background (and only acks the checkpoint after that has completed), the sink function can continue processing.

What do you think?

Stephan

On Sat, Jun 4, 2016 at 4:05 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> Am I correct in assuming that the Kafka producer sink can lose messages?
>
> I don't expect exactly-once semantics using Kafka as a sink given Kafka
> publishing guarantees, but I do expect at least once.
>
> I gather from reading the source that the producer is publishing messages
> asynchronously, as expected. And that a callback can record publishing
> errors, which will be raised when detected. But as far as I can tell,
> there is no barrier to wait for async errors from the sink when
> checkpointing. Did I miss it?
>
> To do so you'd have to call the flush() method in KafkaProducer, which
> would flush and block until all pending requests succeeded or failed. Given
> that FlinkKafkaProducer09 is just a SinkFunction, with a simple invoke()
> method, there doesn't appear to be a way to ensure the sink has published
> all pending writes successfully.
>
> So it seems like if a checkpoint occurs while there are pending publish
> requests, and the requests return a failure after the checkpoint occurred,
> those messages will be lost as the checkpoint will consider them processed
> by the sink.
>
> Seems as if there is an expectation that SinkFunction is synchronous.
> Maybe there is a need for an AsyncSinkFunction interface with a method to
> block until messages are flushed, or the Sink should keep track of which
> messages have been successfully published so that the information can be
> used by the checkpointing system.
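For illustration only, here is a minimal, dependency-free sketch of the synchronization idea from the reply. It assumes plain `CompletableFuture` objects stand in for the asynchronous Kafka publish results. The class `AtLeastOnceSinkSketch` and its methods `send`, `snapshotState`, `snapshotStateAsync` and `pendingCount` are hypothetical and are not Flink or Kafka API. In a real sink, the error recording would live in the producer's `Callback.onCompletion`, and the blocking `snapshotState()` would play the role of the checkpoint hook. `snapshotStateAsync()` loosely models the "async state handle" improvement: it waits only for the records that were in flight when the checkpoint started, so the sink can keep processing in the meantime.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, dependency-free model of an at-least-once sink. Each "ack" future
 * stands in for the broker's asynchronous publish result; no Flink or Kafka types are used.
 */
class AtLeastOnceSinkSketch {

    // Records handed to the async producer that are not yet acknowledged.
    private final Set<CompletableFuture<Void>> pending = ConcurrentHashMap.newKeySet();

    // First asynchronous publish failure, kept so that later checkpoints also fail.
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    /** Stand-in for invoke(): registers a record whose publish result is {@code ack}. */
    void send(CompletableFuture<Void> ack) {
        checkErroneous(); // fail fast if an earlier publish already failed
        pending.add(ack);
        // Stand-in for the producer callback: record any error *before* dropping the
        // record from the pending set, so a later snapshot cannot miss the failure.
        ack.whenComplete((ignored, err) -> {
            if (err != null) {
                asyncError.compareAndSet(null, err);
            }
            pending.remove(ack);
        });
    }

    /** Blocking variant: returns only once every in-flight record has been acknowledged. */
    void snapshotState() {
        snapshotStateAsync().join(); // throws CompletionException if a publish failed
    }

    /**
     * Non-blocking variant: the returned future completes once the records in flight
     * at this moment are acknowledged; records sent afterwards are not waited for.
     */
    CompletableFuture<Void> snapshotStateAsync() {
        // Copy the pending set so that later sends do not delay this checkpoint.
        CompletableFuture<?>[] inFlight = pending.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(inFlight).thenRun(this::checkErroneous);
    }

    int pendingCount() {
        return pending.size();
    }

    private void checkErroneous() {
        Throwable t = asyncError.get();
        if (t != null) {
            throw new IllegalStateException("Failed to publish a record", t);
        }
    }
}
```

The key design choice in this sketch is copying the pending set when the snapshot starts. Simply waiting for "no pending records" would let a steady stream of new writes postpone the checkpoint indefinitely.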