Hi Elias! The concern you raised about the sink being synchronous is exactly what my last suggestion should address:
The internal state backends can return a handle that can do the sync in a background thread. The sink would continue processing messages, and the checkpoint would only be acknowledged after the background sync has completed. We should allow user code to return such a handle as well. We have to think about the implications for message order, though...

Greetings,
Stephan

On Mon, Jun 6, 2016 at 11:58 PM, Elias Levy <fearsome.lucid...@gmail.com> wrote:

> On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> You raised a good point. Fortunately, there should be a simple way to fix
>> this.
>>
>> The Kafka Sink Function should implement the "Checkpointed" interface. It
>> will get a call to the "snapshotState()" method whenever a checkpoint
>> happens. Inside that call, it should then sync on the callbacks, and only
>> return once all have completed. It may return null (no need to store
>> anything in the checkpoint).
>>
>> While the "Checkpointed" method has not returned, the checkpoint will not
>> complete. That way, there will be a "synchronization" point per checkpoint.
>>
>> We can even improve this further in the future: The checkpoint method can
>> return an async state handle. While the async state handle completes its
>> "wait for callbacks" in the background (and only acks the checkpoint after
>> that has completed), the sink function can continue processing.
>>
>> What do you think?
>>
>
> I opened FLINK-4027 <https://issues.apache.org/jira/browse/FLINK-4027> to
> track the issue.
>
> That seems like an acceptable solution. Presumably an exception can be
> raised in snapshotState() if there is a Kafka publishing error when calling
> flush() on the Kafka producer, which will cause the checkpoint to fail.
>
> I do wonder what sort of performance penalty using flush() will incur, as
> it is a synchronous call. I assume no other messages can be processed by
> the sink while inside snapshotState(). In theory a sink could continue
> processing messages, so long as it kept track of pending messages that
> occurred before the barrier and responded to the snapshotState() call when
> there were no longer any pending messages from before the barrier.
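Below is a rough sketch of the bookkeeping Elias describes: track the sends that were issued before the barrier and acknowledge the checkpoint only once they are done, while the sink keeps going. The same structure could back the async handle idea. It is plain Java and assumes nothing about the real Flink or Kafka APIs. The class `BarrierAwareSendTracker` and its methods are hypothetical, and the producer's completion callbacks are simulated with an executor.

The approach works as follows:
- Each send gets a sequence number.
- A barrier records the last sequence number issued before it and returns a `CompletableFuture`. That future completes once every covered send is acknowledged.
- A failed send fails every checkpoint that covers it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the Flink or Kafka APIs.
class BarrierAwareSendTracker {
    // A checkpoint barrier waiting for every send with seq <= lastSeq to be acknowledged.
    private static final class Barrier {
        final long lastSeq;
        final CompletableFuture<Void> done = new CompletableFuture<>();
        Barrier(long lastSeq) { this.lastSeq = lastSeq; }
    }

    private long nextSeq = 0;                                   // sequence number of the next send
    private final TreeSet<Long> outstanding = new TreeSet<>();  // sends not yet acknowledged
    private final Deque<Barrier> waiting = new ArrayDeque<>();  // barriers in arrival order
    private Exception failure;                                  // first send error, if any

    /** Call before handing a record to the asynchronous producer; returns its sequence number. */
    public synchronized long beforeSend() {
        long seq = nextSeq++;
        outstanding.add(seq);
        return seq;
    }

    /** Producer completion callback; error == null means the record was acknowledged. */
    public synchronized void onCompletion(long seq, Exception error) {
        outstanding.remove(seq);
        if (error != null) {
            if (failure == null) failure = error;
            // Any waiting barrier that arrived after this record was sent covers it, so it fails.
            for (Barrier b : waiting) {
                if (b.lastSeq >= seq) b.done.completeExceptionally(error);
            }
        }
        releaseCompletedBarriers();
    }

    /**
     * Call when the checkpoint barrier arrives. Does not block: the sink keeps sending, and
     * the returned future completes once every send issued before the barrier is acknowledged.
     */
    public synchronized CompletableFuture<Void> onBarrier() {
        Barrier b = new Barrier(nextSeq - 1);
        if (failure != null) {
            b.done.completeExceptionally(failure);  // an earlier send already failed
            return b.done;
        }
        waiting.addLast(b);
        releaseCompletedBarriers();
        return b.done;
    }

    // Completes, in order, every barrier whose covered sends have all been acknowledged.
    private void releaseCompletedBarriers() {
        long lowestOutstanding = outstanding.isEmpty() ? Long.MAX_VALUE : outstanding.first();
        while (!waiting.isEmpty() && waiting.peekFirst().lastSeq < lowestOutstanding) {
            waiting.pollFirst().done.complete(null);  // no-op if it already failed
        }
    }

    public static void main(String[] args) throws Exception {
        BarrierAwareSendTracker tracker = new BarrierAwareSendTracker();
        // Stands in for the producer's I/O thread that invokes completion callbacks.
        ExecutorService producerIo = Executors.newSingleThreadExecutor();

        long first = tracker.beforeSend();                    // sent before the barrier
        CompletableFuture<Void> checkpoint1 = tracker.onBarrier();
        long second = tracker.beforeSend();                   // sink keeps sending after it
        System.out.println("checkpoint 1 done before any ack? " + checkpoint1.isDone());

        producerIo.submit(() -> tracker.onCompletion(first, null));
        checkpoint1.get(5, TimeUnit.SECONDS);                 // does not wait for 'second'
        System.out.println("checkpoint 1 acknowledged while 'second' is still pending");

        CompletableFuture<Void> checkpoint2 = tracker.onBarrier();
        producerIo.submit(() -> tracker.onCompletion(second, new RuntimeException("broker unavailable")));
        try {
            checkpoint2.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            System.out.println("checkpoint 2 failed: " + e.getCause().getMessage());
        }
        producerIo.shutdown();
    }
}
```

The TreeSet gives the lowest unacknowledged sequence number cheaply. Because barriers are queued in arrival order, they are also released in order. The simpler alternative is to call flush() inside snapshotState() and block. That trades this bookkeeping for the throughput penalty Elias is worried about.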