Hi Martin!

Let me try to follow up on some of your questions :)

> a. When the acknowledgeIDs method is called, is it certain that all the rest of 
> the operators, including the Sinks finished successfully? E.g: If I have a sink 
> that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the 
> writes to both of these systems have been completed successfully before 
> acknowledgeIDs is called?

That is correct. `MessageAcknowledgingSourceBase#acknowledgeIDs` is called 
from within `notifyCheckpointComplete()`. A checkpoint is only reported as 
complete once all operators, including all sinks, have finished their snapshot 
for that checkpoint.
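
To make the ordering concrete, here is a minimal, Flink-free sketch of the 
mechanism described above. It is a simplified model, not the actual 
`MessageAcknowledgingSourceBase` code: the class `AckingSourceModel`, its 
`emit` method and the use of String IDs are assumptions made for illustration. 
IDs are grouped per checkpoint and only acknowledged once that checkpoint is 
reported complete.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of an acknowledging source (not Flink code). */
class AckingSourceModel {
    /** IDs emitted since the last snapshot. */
    private List<String> idsForCurrentCheckpoint = new ArrayList<>();
    /** Snapshotted but not yet confirmed checkpoints, oldest first. */
    private final Deque<PendingCheckpoint> pending = new ArrayDeque<>();
    /** Everything acknowledged to the external queue so far. */
    final List<String> acknowledged = new ArrayList<>();

    private static final class PendingCheckpoint {
        final long checkpointId;
        final List<String> ids;

        PendingCheckpoint(long checkpointId, List<String> ids) {
            this.checkpointId = checkpointId;
            this.ids = ids;
        }
    }

    /** Records the ID of a message that was emitted downstream. */
    void emit(String messageId) {
        idsForCurrentCheckpoint.add(messageId);
    }

    /** Called when a checkpoint is taken: park the IDs under that checkpoint. */
    void snapshotState(long checkpointId) {
        pending.addLast(new PendingCheckpoint(checkpointId, idsForCurrentCheckpoint));
        idsForCurrentCheckpoint = new ArrayList<>();
    }

    /** Called once every operator, sinks included, has finished its snapshot. */
    void notifyCheckpointComplete(long checkpointId) {
        while (!pending.isEmpty() && pending.peekFirst().checkpointId <= checkpointId) {
            acknowledgeIDs(pending.removeFirst().ids);
        }
    }

    /** Stand-in for the call that acknowledges messages to the external queue. */
    void acknowledgeIDs(List<String> ids) {
        acknowledged.addAll(ids);
    }
}

class AckOnCheckpointDemo {
    public static void main(String[] args) {
        AckingSourceModel source = new AckingSourceModel();
        source.emit("m1");
        source.emit("m2");
        source.snapshotState(1L);
        source.emit("m3"); // belongs to the next checkpoint
        System.out.println(source.acknowledged); // prints []
        source.notifyCheckpointComplete(1L);
        System.out.println(source.acknowledged); // prints [m1, m2]
    }
}
```

Note that "m3" stays unacknowledged until a later checkpoint completes, so a 
failure before that point leaves it available for redelivery by the queue.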

For sinks like Cassandra and Kafka, the snapshot method currently flushes all 
in-flight pending write requests to the external system. So yes, you can be 
sure that the writes for the acknowledged IDs have been completed by all 
sinks.
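
The flush-on-snapshot behaviour can be pictured with a small, Flink-free 
model. This is only a sketch under assumptions: `FlushOnSnapshotSinkModel` and 
its fields are hypothetical names, and real connectors issue asynchronous 
requests rather than appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a sink that flushes pending writes on snapshot. */
class FlushOnSnapshotSinkModel {
    /** Writes that were requested but not yet confirmed by the external system. */
    private final List<String> inFlight = new ArrayList<>();
    /** Stand-in for the external system (e.g. a table or a topic). */
    final List<String> externalSystem = new ArrayList<>();

    /** Called per record: the write is only queued here. */
    void invoke(String record) {
        inFlight.add(record);
    }

    /** Called as part of a checkpoint: returns only after everything is written. */
    void snapshotState() {
        externalSystem.addAll(inFlight);
        inFlight.clear();
    }

    int inFlightCount() {
        return inFlight.size();
    }
}

class FlushOnSnapshotDemo {
    public static void main(String[] args) {
        FlushOnSnapshotSinkModel sink = new FlushOnSnapshotSinkModel();
        sink.invoke("r1");
        sink.invoke("r2");
        System.out.println(sink.inFlightCount()); // prints 2
        sink.snapshotState();
        System.out.println(sink.externalSystem);  // prints [r1, r2]
    }
}
```

Because the snapshot does not finish until the pending list is empty, a 
completed checkpoint implies that the records emitted before it were written.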

> b. Messages can be duplicated in case the processing takes longer than the 
> queue timeout or if there are failures and Flink needs to recover. This is a 
> problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What 
> are the recommended approaches to avoid this duplication? Are there any example 
> repos?

Duplication is a general problem for non-idempotent pipeline setups, and is not 
easy to avoid when the external sink does not support transactions. I’m not 
aware of any cookbook solutions for this.
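
One generic mitigation, offered here as an assumption rather than something 
from this thread or a Flink API, is to carry a stable message ID on every 
output record and let whatever consumes the output discard IDs it has already 
applied. The sketch below models only that idea; `DedupByIdModel` and its 
`apply` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical consumer-side de-duplication keyed by message ID. */
class DedupByIdModel {
    /** IDs whose effects have already been applied. */
    private final Set<String> seenIds = new HashSet<>();
    /** Stand-in for the effects applied exactly once per ID. */
    final List<String> applied = new ArrayList<>();

    /** Applies the payload unless this ID was seen before; returns true if applied. */
    boolean apply(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate delivery, ignore
        }
        applied.add(payload);
        return true;
    }
}

class DedupByIdDemo {
    public static void main(String[] args) {
        DedupByIdModel consumer = new DedupByIdModel();
        System.out.println(consumer.apply("id-1", "order created")); // prints true
        System.out.println(consumer.apply("id-1", "order created")); // prints false
        System.out.println(consumer.applied); // prints [order created]
    }
}
```

This only works if the set of seen IDs is stored durably together with the 
applied effects and is bounded in some way (for example by expiring old IDs); 
the in-memory set above does neither.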

> a. Is there some better api for fine grained killing of various services, 
> tasks, resources in a Flink cluster or job?
> b. Can you point me to a repo with reliability tests for Flink i.e. where 
> things are killed to see whether the system recovers etc?

The Flink Kafka consumer actually has quite a few tests for exactly-once 
guarantees. You can take a look at them here [1]. Specifically, take a look at 
the `testOneToOneSources`, `testOneSourceMultiplePartitions`, etc. tests. I 
think they are quite good examples of how to test exactly-once behaviour.



- Gordon

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java



On 8 May 2017 at 1:08:44 PM, Martin Eden (martineden...@gmail.com) wrote:

Hi Kostas,

Thanks for pointing me in the right direction.

I have gone and extended MessageAcknowledgingSourceBase. It was quite easy to 
do.

I have however some follow-up questions about the guarantees it gives and 
testing my solution.

1. Guarantees:

Questions:
a. When the acknowledgeIDs method is called, is it certain that all the rest of 
the operators, including the Sinks finished successfully? E.g: If I have a sink 
that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the 
writes to both of these systems have been completed successfully before 
acknowledgeIDs is called?
b. Messages can be duplicated in case the processing takes longer than the 
queue timeout or if there are failures and Flink needs to recover. This is a 
problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What 
are the recommended approaches to avoid this duplication? Are there any example 
repos?

2. Testing:

Work done so far:
In order to convince myself that indeed this source is reliable, I wrote some 
integration tests. I used LocalFlinkMiniCluster which is quite nice. However 
when I tried to test what happens when I kill the TaskManager running the task 
that is executing my MessageAcknowledgingSourceBase I found it not so 
straightforward. I managed to get around it by starting the cluster and the job, 
getting all the task manager actor references, adding a new task manager to the 
cluster, killing the initial task managers by sending a poison pill actor msg. 
I had to kill all the initial task managers as I did not find a way to get a 
mapping between task running the source and the task manager actor to which it 
got assigned.

Questions:
a. Is there some better api for fine grained killing of various services, 
tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where 
things are killed to see whether the system recovers etc?

Thanks,
M


On Tue, Apr 25, 2017 at 9:23 AM, Kostas Kloudas <k.klou...@data-artisans.com> 
wrote:
Hi Martin!

For an example of a source that acknowledges received messages, you could check 
the MessageAcknowledgingSourceBase
and the MultipleIdsMessageAcknowledgingSourceBase that ship with Flink. I hope 
this will give you some ideas.

As for the Flink version on top of which to implement your source, I would 
suggest Flink 1.3. The reason is that it will
come out soon (~1 month) and it will include a lot of new features and 
bug fixes. Until then, it may change a bit, but the APIs
that you will be using will not change.

So why not go straight for the more future-proof way?

Thanks,
Kostas

> On Apr 24, 2017, at 11:20 PM, Martin Eden <martineden...@gmail.com> wrote:
>
> Hi everyone,
>
> Are there any examples of how to implement a reliable (zero data loss) Flink 
> source reading from a system that is not replay-able but supports 
> acknowledging messages?
>
> Or any pointers on how one can achieve this and how Flink can help?
>
> I imagine it should involve a write-ahead log, but it is not yet clear to me 
> how to implement it and how to integrate it with the Flink fault tolerance 
> mechanism. Can Flink maintain the write-ahead log for me?
>
> Also, does it make sense to start implementing this in the current stable 
> Flink release 1.2 or is there any advantage in implementing it directly in 
> Flink 1.3 since it is coming up soon anyway?
>
> Thanks,
> M

