On Sun, Jun 5, 2016 at 3:16 PM, Stephan Ewen <se...@apache.org> wrote:

> You raised a good point. Fortunately, there should be a simple way to fix
> this.
>
> The Kafka Sink Function should implement the "Checkpointed" interface. It
> will get a call to the "snapshotState()" method whenever a checkpoint
> happens. Inside that call, it should then sync on the callbacks, and only
> return once all have completed. It may return null (no need to store
> anything in the checkpoint).
>
> Until the "snapshotState()" method has returned, the checkpoint will not
> complete. That way, there will be a "synchronization" point per checkpoint.
>
> We can even improve this further in the future: The checkpoint method can
> return an async state handle. While the async state handle completes its
> "wait for callbacks" in the background (and only acks the checkpoint after
> that has completed), the sink function can continue processing.
>
> What do you think?
>

I opened FLINK-4027 <https://issues.apache.org/jira/browse/FLINK-4027> to
track the issue.

That seems like an acceptable solution.  Presumably an exception can be
raised in snapshotState() if there is a Kafka publishing error when calling
flush() on the Kafka producer, which will cause the checkpoint to fail.
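
A minimal sketch of that blocking approach, using only the JDK. The class
and method names are hypothetical stand-ins, not Flink's or Kafka's actual
API: a counter of unacknowledged sends plays the role of flush(), and
onCompletion() plays the role of the producer callback. snapshotState()
waits until nothing is pending and throws if any send reported an error,
which is what would make the checkpoint fail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a sink that sends records asynchronously.
// Illustrative only; this is not the Flink or Kafka API.
class FlushOnSnapshotSink {
    private final Object lock = new Object();
    private long pending = 0; // sends that have not been acknowledged yet
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    /** Called once per record: registers one outstanding send. */
    void invoke() {
        synchronized (lock) {
            pending++;
        }
    }

    /** Called by the (simulated) producer callback when a send finishes. */
    void onCompletion(Exception error) {
        if (error != null) {
            asyncError.compareAndSet(null, error); // remember the first failure
        }
        synchronized (lock) {
            pending--;
            lock.notifyAll(); // wake up a snapshotState() call that is waiting
        }
    }

    /** Blocks until all outstanding sends are acknowledged; throws if any failed. */
    Object snapshotState() {
        synchronized (lock) {
            while (pending > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for sends", ie);
                }
            }
        }
        Exception e = asyncError.get();
        if (e != null) {
            throw new IllegalStateException("a send failed before the checkpoint", e);
        }
        return null; // nothing needs to be stored in the checkpoint
    }
}
```

In this sketch the error check happens after the wait, so a failure
reported by any send issued before the checkpoint is seen by the
snapshotState() call for that checkpoint.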

I do wonder what sort of performance penalty using flush() will incur, as
it is a synchronous call.  I assume no other messages can be processed by
the sink while inside snapshotState().  In theory a sink could continue
processing messages, so long as it kept track of pending messages that
occurred before the barrier and responded to the snapshotState() call when
there no longer were any pending messages from before the barrier.
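
A rough sketch of that non-blocking bookkeeping, again with hypothetical
names and no Flink or Kafka dependency. Each record is tagged with the
"epoch" (number of barriers seen so far) in which it was sent, and the
callback hands that tag back on acknowledgement. A checkpoint is reported
as complete only once no record from its epoch or an earlier one is still
pending; records sent after the barrier do not hold it up. How the
completion would actually be reported to the checkpoint coordinator is
left out here and is represented by the "completed" list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: tracks pending sends per barrier epoch so the sink
// can keep accepting records while an earlier checkpoint is still open.
class BarrierTrackingSink {
    private long epoch = 0; // incremented at every barrier
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // epoch -> unacked sends
    private final ArrayDeque<long[]> waiting = new ArrayDeque<>(); // {checkpointId, epoch}
    final List<Long> completed = new ArrayList<>(); // checkpoint ids ready to be acked

    /** Registers a send and returns the epoch tag the callback must pass to onAck(). */
    synchronized long invoke() {
        pending.merge(epoch, 1L, Long::sum);
        return epoch;
    }

    /** Called by the (simulated) producer callback for a record sent in recordEpoch. */
    synchronized void onAck(long recordEpoch) {
        // Decrement the epoch's count; drop the entry once it reaches zero.
        pending.computeIfPresent(recordEpoch, (k, n) -> n == 1 ? null : n - 1);
        drain();
    }

    /** Barrier arrived: remember the checkpoint, start a new epoch, do not block. */
    synchronized void snapshotState(long checkpointId) {
        waiting.add(new long[] {checkpointId, epoch});
        epoch++;
        drain();
    }

    // Completes waiting checkpoints, oldest first, while nothing from their
    // epoch or an earlier epoch is still pending.
    private void drain() {
        while (!waiting.isEmpty()
                && (pending.isEmpty() || pending.firstKey() > waiting.peek()[1])) {
            completed.add(waiting.poll()[0]);
        }
    }
}
```

Error handling is omitted from this sketch; a failed send from before the
barrier would presumably have to fail the corresponding checkpoint rather
than complete it.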
