Flink exactly-once semantics implementation internals

2023-10-09 Thread Enric Ott
Hi, Team:
Does Flink really discard replayed duplicate records via record timestamps, as
the paper "Lightweight Asynchronous Snapshots for Distributed Dataflows"
states: "To achieve this we can follow a similar scheme to SDGs [5] and mark
records with sequence numbers from the sources, thus, every downstream node can
discard records with sequence numbers less than what they have processed
already"? If so, which part of the Flink source confirms this?

Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hey Yun,

Thanks for your quick response. Much appreciated. I have replied to your
answer on SO and I will continue with my doubts over there.

Thanks,
Sid

On Fri, Jan 7, 2022 at 9:05 PM Yun Gao  wrote:

> Hi Siddhesh,
>
> I answered on the stackoverflow and I also copied the answers here for
> reference:
>
> For the consumer side, the Flink Kafka consumer bookkeeps the current offset
> in the distributed checkpoint, and if the consumer task fails, it will be
> restarted from the latest checkpoint and re-emit records from the offset
> recorded in that checkpoint. For example, suppose the latest checkpoint
> records offset 3, and after that Flink continues to emit 4, 5 and then fails
> over; Flink would then continue to emit records from 4. Note that this does
> not cause duplication, since the state of all the operators also falls back
> to the state after record 3 was processed.
>
> For the producer side, Flink uses two-phase commit [1] to achieve
> exactly-once. Roughly, the Flink producer relies on Kafka transactions to
> write data, and the data only becomes formally committed once the
> transaction is committed. Users can use Semantic.EXACTLY_ONCE to enable this
> functionality [2].
>
> You are warmly welcome to reach out to the community for help, and many
> thanks to everyone for participating in the community :) I think David and
> Martijn are also trying to make us work together more efficiently. Many
> thanks for the understanding~
>
>
> Best,
>
> Yun
>
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
>
>
>
> ----------
> From:Siddhesh Kalgaonkar 
> Send Time:2022 Jan. 7 (Fri.) 23:25
> To:Martijn Visser 
> Cc:"David Morávek" ; user 
> Subject:Re: Exactly Once Semantics
>
> Hi Martijn,
>
> Understood. If possible please help me out with the problem.
>
> Thanks,
> Sid
>
> On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser 
> wrote:
> Hi Siddesh,
>
> The purpose of both Stack Overflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. It
> amounts to crossposting, which we'd rather avoid. As David mentioned, time
> is limited and we all try to spend it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
> Hi David,
>
> It's actually better in my opinion. Because people who are not aware of
> the ML thread can Google and check the SO posts when they come across any
> similar problems. The reason behind posting on ML is to get attention.
> Because few questions are unanswered for multiple days and since we are
> beginners, the only things which we have are SO and ML.  I won't say
> "Duplication" but more kind of "Availability of similar problems".
>
> It's okay if you don't want to help.
>
> Cheers!
>
> Sid
>
> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
> I am trying to achieve exactly-once semantics using Flink and Kafka. I have
> explained my scenario thoroughly in this post
>
> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>
> Any help is much appreciated!
>
> Thanks,
> Sid
> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> <https://www.ververica.com/>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>
>


Re: Exactly Once Semantics

2022-01-07 Thread Yun Gao
Hi Siddhesh,

I answered on the stackoverflow and I also copied the answers here for 
reference:

For the consumer side, the Flink Kafka consumer bookkeeps the current offset
in the distributed checkpoint, and if the consumer task fails, it will be
restarted from the latest checkpoint and re-emit records from the offset
recorded in that checkpoint. For example, suppose the latest checkpoint records
offset 3, and after that Flink continues to emit 4, 5 and then fails over;
Flink would then continue to emit records from 4. Note that this does not cause
duplication, since the state of all the operators also falls back to the state
after record 3 was processed.

For the producer side, Flink uses two-phase commit [1] to achieve exactly-once.
Roughly, the Flink producer relies on Kafka transactions to write data, and the
data only becomes formally committed once the transaction is committed. Users
can use Semantic.EXACTLY_ONCE to enable this functionality [2].

You are warmly welcome to reach out to the community for help, and many thanks
to everyone for participating in the community :) I think David and Martijn are
also trying to make us work together more efficiently. Many thanks for the
understanding~

Best,
Yun

[1] 
https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
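
For reference, here is a minimal sketch of the setup described above, using the
Flink 1.14-era KafkaSource/KafkaSink API (the broker address, topic names and
group id are placeholders; with the legacy FlinkKafkaProducer the equivalent of
DeliveryGuarantee.EXACTLY_ONCE is FlinkKafkaProducer.Semantic.EXACTLY_ONCE):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source offsets and operator state are snapshotted together in each checkpoint.
env.enableCheckpointing(20_000, CheckpointingMode.EXACTLY_ONCE);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")                   // placeholder
        .setTopics("input-topic")                             // placeholder
        .setGroupId("exactly-once-demo")                      // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// The sink writes inside Kafka transactions and commits them when the checkpoint completes.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                   // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                     // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("exactly-once-demo")
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        .sinkTo(sink);
env.execute("exactly-once pipeline");

Note that downstream consumers only see the committed output if they read with
isolation.level=read_committed, and Kafka's transaction timeout has to be
larger than the checkpoint interval (plus expected recovery time).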




--
From:Siddhesh Kalgaonkar 
Send Time:2022 Jan. 7 (Fri.) 23:25
To:Martijn Visser 
Cc:"David Morávek" ; user 
Subject:Re: Exactly Once Semantics

Hi Martijn,

Understood. If possible please help me out with the problem.

Thanks,
Sid
On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser  wrote:

Hi Siddesh,

The purpose of both Stack Overflow and the mailing list is to solve a question
or a problem; the mailing list is not for getting attention. It amounts to
crossposting, which we'd rather avoid. As David mentioned, time is limited and we
all try to spend it the best we can.

Best regards,

Martijn

Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar 

Hi David,

It's actually better in my opinion. Because people who are not aware of the ML 
thread can Google and check the SO posts when they come across any similar 
problems. The reason behind posting on ML is to get attention. Because few 
questions are unanswered for multiple days and since we are beginners, the only 
things which we have are SO and ML.  I won't say "Duplication" but more kind of 
"Availability of similar problems".

It's okay if you don't want to help.  

Cheers!

Sid
On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
Hi Siddhesh,

can you please focus your questions on one channel only? (either SO or the ML)

this could lead to unnecessary work duplication (which would be shame, because 
the community has limited resources) as people answering on SO might not be 
aware of the ML thread

D.

On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar 
 wrote:
I am trying to achieve exactly-once semantics using Flink and Kafka. I have
explained my scenario thoroughly in this post
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

Any help is much appreciated!

Thanks,
Sid
--

Martijn Visser | Product Manager
mart...@ververica.com


Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time




Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi Martijn,

Understood. If possible please help me out with the problem.

Thanks,
Sid

On Fri, Jan 7, 2022 at 8:45 PM Martijn Visser  wrote:

> Hi Siddesh,
>
> The purpose of both Stack Overflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. It
> amounts to crossposting, which we'd rather avoid. As David mentioned, time
> is limited and we all try to spend it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
>
>> Hi David,
>>
>> It's actually better in my opinion. Because people who are not aware of
>> the ML thread can Google and check the SO posts when they come across any
>> similar problems. The reason behind posting on ML is to get attention.
>> Because few questions are unanswered for multiple days and since we are
>> beginners, the only things which we have are SO and ML.  I won't say
>> "Duplication" but more kind of "Availability of similar problems".
>>
>> It's okay if you don't want to help.
>>
>> Cheers!
>>
>> Sid
>>
>> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> can you please focus your questions on one channel only? (either SO or
>>> the ML)
>>>
>>> this could lead to unnecessary work duplication (which would be shame,
>>> because the community has limited resources) as people answering on SO
>>> might not be aware of the ML thread
>>>
>>> D.
>>>
>>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
 I am trying to achieve exactly-once semantics using Flink and Kafka. I
 have explained my scenario thoroughly in this post

 https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

 Any help is much appreciated!

 Thanks,
 Sid

>>> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>


Re: Exactly Once Semantics

2022-01-07 Thread David Morávek
Also please note that the Apache mailing lists are also indexed by search
engines and publicly archived [1].

[1] https://lists.apache.org/list.html?user@flink.apache.org

On Fri, Jan 7, 2022 at 4:15 PM Martijn Visser  wrote:

> Hi Siddesh,
>
> The purpose of both Stack Overflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. It
> amounts to crossposting, which we'd rather avoid. As David mentioned, time
> is limited and we all try to spend it the best we can.
>
> Best regards,
>
> Martijn
>
> Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com>
>
>> Hi David,
>>
>> It's actually better in my opinion. Because people who are not aware of
>> the ML thread can Google and check the SO posts when they come across any
>> similar problems. The reason behind posting on ML is to get attention.
>> Because few questions are unanswered for multiple days and since we are
>> beginners, the only things which we have are SO and ML.  I won't say
>> "Duplication" but more kind of "Availability of similar problems".
>>
>> It's okay if you don't want to help.
>>
>> Cheers!
>>
>> Sid
>>
>> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>>
>>> Hi Siddhesh,
>>>
>>> can you please focus your questions on one channel only? (either SO or
>>> the ML)
>>>
>>> this could lead to unnecessary work duplication (which would be shame,
>>> because the community has limited resources) as people answering on SO
>>> might not be aware of the ML thread
>>>
>>> D.
>>>
>>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
 I am trying to achieve exactly-once semantics using Flink and Kafka. I
 have explained my scenario thoroughly in this post

 https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

 Any help is much appreciated!

 Thanks,
 Sid

>>> --
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
> 
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>


Re: Exactly Once Semantics

2022-01-07 Thread Martijn Visser
Hi Siddesh,

The purpose of both Stack Overflow and the mailing list is to solve a
question or a problem; the mailing list is not for getting attention. It
amounts to crossposting, which we'd rather avoid. As David mentioned, time
is limited and we all try to spend it the best we can.

Best regards,

Martijn

Op vr 7 jan. 2022 om 16:04 schreef Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com>

> Hi David,
>
> It's actually better in my opinion. Because people who are not aware of
> the ML thread can Google and check the SO posts when they come across any
> similar problems. The reason behind posting on ML is to get attention.
> Because few questions are unanswered for multiple days and since we are
> beginners, the only things which we have are SO and ML.  I won't say
> "Duplication" but more kind of "Availability of similar problems".
>
> It's okay if you don't want to help.
>
> Cheers!
>
> Sid
>
> On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:
>
>> Hi Siddhesh,
>>
>> can you please focus your questions on one channel only? (either SO or
>> the ML)
>>
>> this could lead to unnecessary work duplication (which would be shame,
>> because the community has limited resources) as people answering on SO
>> might not be aware of the ML thread
>>
>> D.
>>
>> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> I am trying to achieve exactly-once semantics using Flink and Kafka. I
>>> have explained my scenario thoroughly in this post
>>>
>>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>>
>>> Any help is much appreciated!
>>>
>>> Thanks,
>>> Sid
>>>
>> --

Martijn Visser | Product Manager

mart...@ververica.com




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
Hi David,

It's actually better in my opinion, because people who are not aware of the
ML thread can Google and find the SO posts when they come across any
similar problems. The reason behind posting on the ML is to get attention,
because some questions remain unanswered for multiple days, and since we are
beginners, the only things we have are SO and the ML. I won't say
"duplication" but rather "availability of similar problems".

It's okay if you don't want to help.

Cheers!

Sid

On Fri, Jan 7, 2022 at 8:18 PM David Morávek  wrote:

> Hi Siddhesh,
>
> can you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
> On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> I am trying to achieve exactly-once semantics using Flink and Kafka. I
>> have explained my scenario thoroughly in this post
>>
>> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>>
>> Any help is much appreciated!
>>
>> Thanks,
>> Sid
>>
>


Re: Exactly Once Semantics

2022-01-07 Thread David Morávek
Hi Siddhesh,

Can you please focus your questions on one channel only (either SO or the
ML)?

This could lead to unnecessary work duplication (which would be a shame,
because the community has limited resources), as people answering on SO
might not be aware of the ML thread.

D.

On Fri, Jan 7, 2022 at 3:02 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:

> I am trying to achieve exactly-once semantics using Flink and Kafka. I have
> explained my scenario thoroughly in this post
>
> https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
>
> Any help is much appreciated!
>
> Thanks,
> Sid
>


Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
I am trying to achieve exactly-once semantics using Flink and Kafka. I have
explained my scenario thoroughly in this post
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer

Any help is much appreciated!

Thanks,
Sid


Re: Exactly once semantics for hdfs sink

2020-02-12 Thread Khachatryan Roman
Hi Vishwas,

Please let me know if you have any specific questions about the
StreamingFile sink.

Regards,
Roman


On Wed, Feb 12, 2020 at 4:45 AM Zhijiang  wrote:

> Hi Vishwas,
>
> I guess this link [1] can help understand how it works and how to use in
> practice for StreamingFileSink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
>
> Best,
> Zhijiang
>
> --
> From:Vishwas Siravara 
> Send Time:2020 Feb. 12 (Wed.) 05:19
> To:Khachatryan Roman 
> Cc:user 
> Subject:Re: Exactly once semantics for hdfs sink
>
> Hi Khachatryan,
> Thanks for your reply. Can you help me understand how it works with hdfs
> specifically , even a link to a document will help.
>
>
> Best,
> Vishwas
>
> On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
> Hi Vishwas,
>
> Yes, Streaming File Sink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara 
> wrote:
> Hi all,
> I want to use the StreamingFile sink for writing data to hdfs. Can I
> achieve exactly once semantics with this sink ?
>
>
> Best,
> HW.
>
>
>


Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Vishwas Siravara
Hi Khachatryan,
Thanks for your reply. Can you help me understand how it works with HDFS
specifically; even a link to a document would help.


Best,
Vishwas

On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Vishwas,
>
> Yes, Streaming File Sink does support exactly-once semantics and can be
> used with HDFS.
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara 
> wrote:
>
>> Hi all,
>> I want to use the StreamingFile sink for writing data to hdfs. Can I
>> achieve exactly once semantics with this sink ?
>>
>>
>> Best,
>> HW.
>>
>


Re: Exactly once semantics for hdfs sink

2020-02-10 Thread Khachatryan Roman
Hi Vishwas,

Yes, Streaming File Sink does support exactly-once semantics and can be
used with HDFS.

Regards,
Roman
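
For what it's worth, here is a minimal sketch of wiring the StreamingFileSink to
HDFS (Flink 1.10-era API; the HDFS path and the placeholder source are
assumptions). Checkpointing must be enabled, because in-progress part files are
only finalized when a checkpoint completes, which is what gives the exactly-once
guarantee for the file output:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // required: part files are committed on checkpoint completion

DataStream<String> events = env.fromElements("a", "b", "c");   // placeholder source

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///data/output"),          // placeholder HDFS path
                new SimpleStringEncoder<String>("UTF-8"))
        .build();

events.addSink(sink);
env.execute("streaming file sink to HDFS");

In later Flink releases the same role is played by the unified FileSink, but the
row-format builder above matches the 1.10 documentation linked in this thread.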


On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara  wrote:

> Hi all,
> I want to use the StreamingFile sink for writing data to hdfs. Can I
> achieve exactly once semantics with this sink ?
>
>
> Best,
> HW.
>


Exactly once semantics for hdfs sink

2020-02-10 Thread Vishwas Siravara
Hi all,
I want to use the StreamingFileSink for writing data to HDFS. Can I
achieve exactly-once semantics with this sink?


Best,
HW.


Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Mohammad Hosseinian
Hi Vasily,

I haven't tested the state recovery under a YARN setup. But in the case of a
stand-alone Flink cluster setup, I needed to run the application with the proper
checkpoint recovery directory (whose name starts with 'chk-') passed as the -s
parameter value. This was the only way I could recover my application state
from an ungraceful shutdown.

Hope that helps, and I would be glad if someone could suggest a better
solution.

BR, Moe
On 02/08/2019 12:41, Vasily Melnik wrote:
Hi, Eduardo.
Maybe i should describe experiment design  precisely :
1) I run Flink on YARN (YARN Session method).
2) I do not stop/cancell application, i just kill TaskManager process
3) After that YARN creates another TaskManager Process and auto checkpoint 
restore from HDFS happens.

That's why i expect to see correct restore.


Best regards,
Vasily Melnik

GlowByte Consulting <http://www.gbconsulting.ru/>

===

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com<mailto:vasily.mel...@glowbyteconsulting.com>


On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor 
mailto:eduardo.winpe...@gmail.com>> wrote:
Hi Vasily,

You're probably executing this from your IDE or from a local Flink cluster 
without starting your job from a checkpoint.

When you start your Flink job for the second time you need to specify the path 
to the latest checkpoint as an argument, otherwise Flink will start from 
scratch.

You're probably thinking that's not great, ideally Flink should be able to 
automatically continue from the last produced checkpoint, and actually that's 
what the docs say! Well, that's only when you're running in a proper cluster 
environment. Flink is able to recover using checkpoints when only part of the 
cluster fails, not when the whole job is stopped. For full stops you need to 
specify the checkpoint manually.

Hope that helps!


On Fri, 2 Aug 2019, 10:05 Vasily Melnik, 
mailto:vasily.mel...@glowbyteconsulting.com>>
 wrote:

I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source and 
Sink:

  1.  Run flink app, simply transferring messages from one topic to another 
with parallelism=1, checkpoint interval 20 seconds
  2.  Generate messages with incrementing integer numbers using Python script 
each 2 seconds.
  3.  Read output topic with console consumer in read_committed isolation level.
  4.  Manually kill TaskManager

I expect to see monotonically increasing integers in output topic regardless 
TaskManager killing and recovery.

But actually a see something unexpected in console-consumer output:

32
33
34
35
36
37
38
39
40
-- TaskManager Killed
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45


Looks like all messages between checkpoints where replayed in output topic. 
Also i expected to see results in output topic only after checkpointing i.e. 
each 20 seconds, but messages appeared in output immediately as they where send 
to input.
Is it supposed to be correct behaviour or i do something wrong?

Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional 
producer and read-committed console comsumer with custom code and it worked 
perfectly well reading messages only after commitTransaction on producer.

My Flink code:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new 
RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);

producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");

producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id<http://group.id>", "test2");

FlinkKafkaConsumer consumer1 = new 
FlinkKafkaConsumer("stringTopic1", new ComplexStringSchema(), 
consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

FlinkKafkaProducer producer1 = new 
FlinkKafkaProducer("test",  new KeyedSerializationSchemaWrapper(new 
SimpleStringSchema()), producerProperty, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource s1 = env.addSource(consumer1);
s1.addSink(p

Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
Hi, Maxim.
My console-consumer command:
kafka-console-consumer --zookeeper ... --topic test --from-beginning
--isolation-level read_committed
It works perfectly well with a manually written Kafka producer - it reads
data only after commitTransaction.

On Fri, 2 Aug 2019 at 14:19, Maxim Parkachov  wrote:

> Hi Vasily,
>
> as far as I know, by default console-consumer reads uncommited.
> Try setting  isolation.level to read_committed in console-consumer
> properties.
>
> Hope this helps,
> Maxim.
>
> On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> Hi, Eduardo.
>> Maybe i should describe experiment design  precisely :
>> 1) I run Flink on YARN (YARN Session method).
>> 2) I do not stop/cancell application, i just kill TaskManager process
>> 3) After that YARN creates another TaskManager Process and auto
>> checkpoint restore from HDFS happens.
>>
>> That's why i expect to see correct restore.
>>
>> Best regards,
>> Vasily Melnik
>>
>> GlowByte Consulting <http://www.gbconsulting.ru/>
>>
>> ===
>>
>> Mobile: +7 (903) 101-43-71
>> vasily.mel...@glowbyteconsulting.com
>>
>>
>> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
>> eduardo.winpe...@gmail.com> wrote:
>>
>>> Hi Vasily,
>>>
>>> You're probably executing this from your IDE or from a local Flink
>>> cluster without starting your job from a checkpoint.
>>>
>>> When you start your Flink job for the second time you need to specify
>>> the path to the latest checkpoint as an argument, otherwise Flink will
>>> start from scratch.
>>>
>>> You're probably thinking that's not great, ideally Flink should be able
>>> to automatically continue from the last produced checkpoint, and actually
>>> that's what the docs say! Well, that's only when you're running in a proper
>>> cluster environment. Flink is able to recover using checkpoints when only
>>> part of the cluster fails, not when the whole job is stopped. For full
>>> stops you need to specify the checkpoint manually.
>>>
>>> Hope that helps!
>>>
>>>
>>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
>>> vasily.mel...@glowbyteconsulting.com> wrote:
>>>
>>>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>>>> and Sink:
>>>>
>>>>1. Run flink app, simply transferring messages from one topic to
>>>>another with parallelism=1, checkpoint interval 20 seconds
>>>>2. Generate messages with incrementing integer numbers using Python
>>>>script each 2 seconds.
>>>>3. Read output topic with console consumer in read_committed
>>>>isolation level.
>>>>4. Manually kill TaskManager
>>>>
>>>> I expect to see monotonically increasing integers in output topic
>>>> regardless TaskManager killing and recovery.
>>>>
>>>> But actually a see something unexpected in console-consumer output:
>>>>
>>>> 32
>>>> 33
>>>> 34
>>>> 35
>>>> 36
>>>> 37
>>>> 38
>>>> 39
>>>> 40
>>>> -- TaskManager Killed
>>>> 32
>>>> 34
>>>> 35
>>>> 36
>>>> 40
>>>> 41
>>>> 46
>>>> 31
>>>> 33
>>>> 37
>>>> 38
>>>> 39
>>>> 42
>>>> 43
>>>> 44
>>>> 45
>>>>
>>>> Looks like all messages between checkpoints where replayed in output
>>>> topic. Also i expected to see results in output topic only after
>>>> checkpointing i.e. each 20 seconds, but messages appeared in output
>>>> immediately as they where send to input.
>>>> Is it supposed to be correct behaviour or i do something wrong?
>>>>
>>>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>>>> producer and read-committed console comsumer with custom code and it worked
>>>> perfectly well reading messages only after commitTransaction on producer.
>>>>
>>>> My Flink code:
>>>>
>>>> StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> env.getConfig(

Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Maxim Parkachov
Hi Vasily,

as far as I know, by default the console-consumer reads uncommitted.
Try setting isolation.level to read_committed in the console-consumer
properties.

Hope this helps,
Maxim.
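
For completeness, this is roughly what the read_committed setting looks like on
a plain Kafka consumer in Java (a sketch only; the broker address, group id and
topic are placeholders), mirroring the isolation.level=read_committed property
suggested for the console consumer:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-check");    // placeholder
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only return records from committed transactions; open or aborted transactions stay hidden.
props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test"));                    // placeholder topic
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
}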

On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi, Eduardo.
> Maybe i should describe experiment design  precisely :
> 1) I run Flink on YARN (YARN Session method).
> 2) I do not stop/cancell application, i just kill TaskManager process
> 3) After that YARN creates another TaskManager Process and auto checkpoint
> restore from HDFS happens.
>
> That's why i expect to see correct restore.
>
> Best regards,
> Vasily Melnik
>
> GlowByte Consulting <http://www.gbconsulting.ru/>
>
> ===
>
> Mobile: +7 (903) 101-43-71
> vasily.mel...@glowbyteconsulting.com
>
>
> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> You're probably executing this from your IDE or from a local Flink
>> cluster without starting your job from a checkpoint.
>>
>> When you start your Flink job for the second time you need to specify the
>> path to the latest checkpoint as an argument, otherwise Flink will start
>> from scratch.
>>
>> You're probably thinking that's not great, ideally Flink should be able
>> to automatically continue from the last produced checkpoint, and actually
>> that's what the docs say! Well, that's only when you're running in a proper
>> cluster environment. Flink is able to recover using checkpoints when only
>> part of the cluster fails, not when the whole job is stopped. For full
>> stops you need to specify the checkpoint manually.
>>
>> Hope that helps!
>>
>>
>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
>> vasily.mel...@glowbyteconsulting.com> wrote:
>>
>>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>>> and Sink:
>>>
>>>1. Run flink app, simply transferring messages from one topic to
>>>another with parallelism=1, checkpoint interval 20 seconds
>>>2. Generate messages with incrementing integer numbers using Python
>>>script each 2 seconds.
>>>3. Read output topic with console consumer in read_committed
>>>isolation level.
>>>4. Manually kill TaskManager
>>>
>>> I expect to see monotonically increasing integers in output topic
>>> regardless TaskManager killing and recovery.
>>>
>>> But actually a see something unexpected in console-consumer output:
>>>
>>> 32
>>> 33
>>> 34
>>> 35
>>> 36
>>> 37
>>> 38
>>> 39
>>> 40
>>> -- TaskManager Killed
>>> 32
>>> 34
>>> 35
>>> 36
>>> 40
>>> 41
>>> 46
>>> 31
>>> 33
>>> 37
>>> 38
>>> 39
>>> 42
>>> 43
>>> 44
>>> 45
>>>
>>> Looks like all messages between checkpoints where replayed in output
>>> topic. Also i expected to see results in output topic only after
>>> checkpointing i.e. each 20 seconds, but messages appeared in output
>>> immediately as they where send to input.
>>> Is it supposed to be correct behaviour or i do something wrong?
>>>
>>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>>> producer and read-committed console comsumer with custom code and it worked
>>> perfectly well reading messages only after commitTransaction on producer.
>>>
>>> My Flink code:
>>>
>>> StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.getConfig().setAutoWatermarkInterval(1000);
>>> env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
>>> env.setStateBackend(new 
>>> RocksDBStateBackend("hdfs:///checkpoints-data"));
>>>
>>> Properties producerProperty = new Properties();
>>> producerProperty.setProperty("bootstrap.servers", ...);
>>> producerProperty.setProperty("zookeeper.connect", ...);
>>> 
>>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
>>> 
>>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
>>> 
>>>

Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
Hi, Eduardo.
Maybe I should describe the experiment design precisely:
1) I run Flink on YARN (YARN session method).
2) I do not stop/cancel the application, I just kill the TaskManager process.
3) After that, YARN creates another TaskManager process and an automatic
checkpoint restore from HDFS happens.

That's why I expect to see a correct restore.

Best regards,
Vasily Melnik

GlowByte Consulting <http://www.gbconsulting.ru/>

===

Mobile: +7 (903) 101-43-71
vasily.mel...@glowbyteconsulting.com


On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
eduardo.winpe...@gmail.com> wrote:

> Hi Vasily,
>
> You're probably executing this from your IDE or from a local Flink cluster
> without starting your job from a checkpoint.
>
> When you start your Flink job for the second time you need to specify the
> path to the latest checkpoint as an argument, otherwise Flink will start
> from scratch.
>
> You're probably thinking that's not great, ideally Flink should be able to
> automatically continue from the last produced checkpoint, and actually
> that's what the docs say! Well, that's only when you're running in a proper
> cluster environment. Flink is able to recover using checkpoints when only
> part of the cluster fails, not when the whole job is stopped. For full
> stops you need to specify the checkpoint manually.
>
> Hope that helps!
>
>
> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
> vasily.mel...@glowbyteconsulting.com> wrote:
>
>> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
>> and Sink:
>>
>>1. Run flink app, simply transferring messages from one topic to
>>another with parallelism=1, checkpoint interval 20 seconds
>>2. Generate messages with incrementing integer numbers using Python
>>script each 2 seconds.
>>3. Read output topic with console consumer in read_committed
>>isolation level.
>>4. Manually kill TaskManager
>>
>> I expect to see monotonically increasing integers in output topic
>> regardless TaskManager killing and recovery.
>>
>> But actually a see something unexpected in console-consumer output:
>>
>> 32
>> 33
>> 34
>> 35
>> 36
>> 37
>> 38
>> 39
>> 40
>> -- TaskManager Killed
>> 32
>> 34
>> 35
>> 36
>> 40
>> 41
>> 46
>> 31
>> 33
>> 37
>> 38
>> 39
>> 42
>> 43
>> 44
>> 45
>>
>> Looks like all messages between checkpoints where replayed in output
>> topic. Also i expected to see results in output topic only after
>> checkpointing i.e. each 20 seconds, but messages appeared in output
>> immediately as they where send to input.
>> Is it supposed to be correct behaviour or i do something wrong?
>>
>> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
>> producer and read-committed console comsumer with custom code and it worked
>> perfectly well reading messages only after commitTransaction on producer.
>>
>> My Flink code:
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.getConfig().setAutoWatermarkInterval(1000);
>> env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
>> env.setStateBackend(new 
>> RocksDBStateBackend("hdfs:///checkpoints-data"));
>>
>> Properties producerProperty = new Properties();
>> producerProperty.setProperty("bootstrap.servers", ...);
>> producerProperty.setProperty("zookeeper.connect", ...);
>> 
>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
>> 
>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
>> 
>> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
>> "true");
>>
>> Properties consumerProperty = new Properties();
>> consumerProperty.setProperty("bootstrap.servers", ...);
>> consumerProperty.setProperty("zookeeper.connect", ...);
>> consumerProperty.setProperty("group.id", "test2");
>>
>> FlinkKafkaConsumer consumer1 = new 
>> FlinkKafkaConsumer("stringTopic1", new ComplexStringSchema(), 
>> consumerProperty);
>> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>
>> FlinkKafkaProducer producer1 = new 
>> FlinkKafkaProducer("test",  new KeyedSerializationSchemaWrapper(new 
>> SimpleStringSchema()), producerProperty, 
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>> producer1.ignoreFailuresAfterTransactionTimeout();
>> DataStreamSource s1 = env.addSource(consumer1);
>> s1.addSink(producer1);
>> env.execute("Test");
>>
>>
>>


Re: Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Eduardo Winpenny Tejedor
Hi Vasily,

You're probably executing this from your IDE or from a local Flink cluster
without starting your job from a checkpoint.

When you start your Flink job for the second time you need to specify the
path to the latest checkpoint as an argument, otherwise Flink will start
from scratch.

You're probably thinking that's not great, ideally Flink should be able to
automatically continue from the last produced checkpoint, and actually
that's what the docs say! Well, that's only when you're running in a proper
cluster environment. Flink is able to recover using checkpoints when only
part of the cluster fails, not when the whole job is stopped. For full
stops you need to specify the checkpoint manually.

Hope that helps!
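
One hedged addition to the point above: for checkpoints to survive a full stop
at all, they have to be retained explicitly (Flink 1.8-era API shown below); the
restart is then pointed at the retained checkpoint directory, for example via
the CLI's -s option mentioned elsewhere in this thread.

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20_000);
// Keep completed checkpoints on cancellation/failure instead of deleting them,
// so a full restart can be resumed from the last checkpoint directory.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);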


On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
vasily.mel...@glowbyteconsulting.com> wrote:

> I,m trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
> and Sink:
>
>1. Run flink app, simply transferring messages from one topic to
>another with parallelism=1, checkpoint interval 20 seconds
>2. Generate messages with incrementing integer numbers using Python
>script each 2 seconds.
>3. Read output topic with console consumer in read_committed isolation
>level.
>4. Manually kill TaskManager
>
> I expect to see monotonically increasing integers in output topic
> regardless TaskManager killing and recovery.
>
> But actually a see something unexpected in console-consumer output:
>
> 32
> 33
> 34
> 35
> 36
> 37
> 38
> 39
> 40
> -- TaskManager Killed
> 32
> 34
> 35
> 36
> 40
> 41
> 46
> 31
> 33
> 37
> 38
> 39
> 42
> 43
> 44
> 45
>
> Looks like all messages between checkpoints where replayed in output
> topic. Also i expected to see results in output topic only after
> checkpointing i.e. each 20 seconds, but messages appeared in output
> immediately as they where send to input.
> Is it supposed to be correct behaviour or i do something wrong?
>
> Kafka version 1.0.1 from Cloudera parcels. I tested Kafka transactional
> producer and read-committed console comsumer with custom code and it worked
> perfectly well reading messages only after commitTransaction on producer.
>
> My Flink code:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
> env.enableCheckpointing(2, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new 
> RocksDBStateBackend("hdfs:///checkpoints-data"));
>
> Properties producerProperty = new Properties();
> producerProperty.setProperty("bootstrap.servers", ...);
> producerProperty.setProperty("zookeeper.connect", ...);
> 
> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
> 
> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
> 
> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
>
> Properties consumerProperty = new Properties();
> consumerProperty.setProperty("bootstrap.servers", ...);
> consumerProperty.setProperty("zookeeper.connect", ...);
> consumerProperty.setProperty("group.id", "test2");
>
> FlinkKafkaConsumer consumer1 = new 
> FlinkKafkaConsumer("stringTopic1", new ComplexStringSchema(), 
> consumerProperty);
> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>
> FlinkKafkaProducer producer1 = new 
> FlinkKafkaProducer("test",  new KeyedSerializationSchemaWrapper(new 
> SimpleStringSchema()), producerProperty, 
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> producer1.ignoreFailuresAfterTransactionTimeout();
> DataStreamSource s1 = env.addSource(consumer1);
> s1.addSink(producer1);
> env.execute("Test");
>
>
>


Exactly-once semantics in Flink Kafka Producer

2019-08-02 Thread Vasily Melnik
I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source and
sink:

   1. Run flink app, simply transferring messages from one topic to another
   with parallelism=1, checkpoint interval 20 seconds
   2. Generate messages with incrementing integer numbers using Python
   script each 2 seconds.
   3. Read output topic with console consumer in read_committed isolation
   level.
   4. Manually kill TaskManager

I expect to see monotonically increasing integers in the output topic
regardless of the TaskManager being killed and recovered.

But actually I see something unexpected in the console-consumer output:

32
33
34
35
36
37
38
39
40
-- TaskManager Killed
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

It looks like all messages between checkpoints were replayed in the output topic.
Also, I expected to see results in the output topic only after checkpointing,
i.e. every 20 seconds, but messages appeared in the output immediately as they
were sent to the input.
Is this the intended behaviour, or am I doing something wrong?

Kafka version is 1.0.1 from Cloudera parcels. I tested a Kafka transactional
producer and a read-committed console consumer with custom code, and it worked
perfectly well, reading messages only after commitTransaction on the producer.

My Flink code:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"1");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");

FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>("stringTopic1",
        new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<>("test",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");


Re: End-to-end exactly-once semantics in simple streaming app

2019-04-08 Thread Piotr Nowojski
Hi,

Regarding JDBC and the two-phase commit (2PC) protocol: as Fabian mentioned, it
is not supported by the JDBC standard out of the box. With some workarounds I
guess you could make it work, for example by following one of these ideas:

1. Write records using JDBC with at-least-once semantics, by flushing the
records on the checkpoint and then deduplicating the records, for example by
defining a primary key in the table. Thanks to that you would get
effectively-once semantics (a minimal sketch of this idea follows after this
message).

2. Use some kind of staging table. During the writing phase of 2PC, write
records to the staging table with some checkpoint/snapshot id. On pre-commit in
2PC, make sure those are flushed. During the commit phase of 2PC, just re-write
records from the staging table to the target/final one. You would need the
checkpoint/snapshot id column if you want to handle more than one on-going
checkpoint.

3. Modify the schema of your target table so that you can identify the
individual records (like a primary key) and add an extra column "COMMIT_STATE"
with an enum type:
{PENDING, PRE-COMMITTED, COMMITTED, ABORTED}, and use this column both in 2PC
and when reading from the table (generally, to ensure exactly-once semantics you
must only read COMMITTED records and ignore the rest).

4. ???

Piotrek
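
The sketch referenced in idea 1 above, assuming a MySQL-style upsert, a table
events(id PRIMARY KEY, payload) and Tuple2<String, String> records (all of these
are assumptions, not from the thread): rows are batched, the batch is flushed
when a checkpoint is taken, and replays after a failure simply hit the same
primary key again, which is what turns at-least-once into effectively-once.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** At-least-once JDBC sink made effectively-once through an idempotent upsert on a primary key. */
public class IdempotentJdbcSink extends RichSinkFunction<Tuple2<String, String>>
        implements CheckpointedFunction {

    // Hypothetical connection settings and table layout: events(id PRIMARY KEY, payload).
    private static final String URL = "jdbc:mysql://db-host:3306/app";
    private static final String UPSERT =
            "INSERT INTO events (id, payload) VALUES (?, ?) "
          + "ON DUPLICATE KEY UPDATE payload = VALUES(payload)";   // MySQL syntax, adjust per DB

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(URL, "user", "password");   // placeholders
        connection.setAutoCommit(false);
        statement = connection.prepareStatement(UPSERT);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        // Add to the current JDBC batch; replays after a failure hit the same primary key.
        statement.setString(1, value.f0);
        statement.setString(2, value.f1);
        statement.addBatch();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Flush everything seen since the last checkpoint as part of taking the checkpoint.
        statement.executeBatch();
        connection.commit();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore: deduplication is delegated to the primary key in the table.
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}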

> On 8 Apr 2019, at 12:16, Fabian Hueske  wrote:
> 
> Hi Patrick,
> 
> In general, you could also implement the 2PC logic in a regular operator. It 
> does not have to be a sink.
> You would need to add the logic of TwoPhaseCommitSinkFunction to your 
> operator.
> However, the TwoPhaseCommitSinkFunction does not work well with JDBC. The 
> problem is that you would need to recover / reopen a transaction after 
> recovery. I don't think that is possible with JDBC.
> There might be workarounds such as writing to a separate table and atomically
> moving all records to the actual table for a commit, but this would be a bit
> of custom code.
> The GenericWriteAheadSink does not require recovering transactions
> but stores all records that need to be committed in Flink state.
> 
> Regarding your questions:
> 
> > - How do you manage regular cleanup / deletion of old data, so the state 
> > does not grow big?
> With the coming version 1.8, Flink will support State TTL to remove state 
> that was not accessed for a certain period of time. You can also clean it up 
> manually, using timers.
> 
> > - When I have e.g. parallelism 3, and thus 3 instances of my .flatMap() 
> > lookup operator, they will not share the state, right? Thus they cannot 
> > access data which was processed by the other instances? This is going to be 
> > a problem.
> That is correct. State is sharded and cannot be accessed by remote tasks.
> 
> > - The flink application state seems volatile, I have to make 100% sure (by 
> > bash scripts and the like) that the application is never stopped/canceled 
> > without making a savepoint, and the restart must resume from the savepoint. 
> > Otherwise it will result in complete data loss worth weeks/months of data. 
> > E.g. if anyone by accident hits the > job “cancel” button in the flink UI, 
> > all data is lost. This seems pretty much like an operational risk, since 
> > this is going to be a 24/7 high availability application.
> > So maybe using only flink state is also not viable.
> You can use externalized checkpoints to ensure that the latest checkpoint is 
> always kept even if the application fails or is explicitly canceled.
> 
> Best,
> Fabian
> 
> Am Mo., 1. Apr. 2019 um 08:56 Uhr schrieb Patrick Fial  >:
> Hi,
> 
> thanks for your reply and sorry for the late response.
> 
> The problem is, I am unsure how I should implement the two-phase-commit 
> pattern, because my JDBC connection is within a .map()/.flatMap() operator, 
> and it is NOT a data sink. As written in my original question, my stream 
> setup is a simple, one-dimensional pipeline:
> 
> environment.addSource(consumer) 
>   .map(… idempotent transformations ...)
>   .map(new DatabaseFunction)
>   .map(… idempotent transformations ...)
>   .addSink(producer)
> 
> For the state / flink buffering, I am unsure if it would work even with a 
> composite key, since I must be able to do arbitrary database lookups (like 
> select .. from .. where x between y and z order by …). So I am pretty sure I 
> am bound to use a real database connection for the job. 
> So currently, I see no option to use flink states in this situation. Also, as 
> it seems, stream operators currently don’t support the two-phase-commit 
> protocol, I would have to do this with a sink, correct?
> What do you advise to do? Currently I am think about replacing flink with 
> another technology for this part of the application, since this 
> database-lookup logic does not seem to fit the idea of a streaming 
> application in general, thus flink might not be the best choice.
> 
> Another option maybe, lets assume I was able to redesign my application 
> (somehow, maybe I 

Re: End-to-end exactly-once semantics in simple streaming app

2019-04-08 Thread Fabian Hueske
Hi Patrick,

In general, you could also implement the 2PC logic in a regular operator.
It does not have to be a sink.
You would need to add the logic of TwoPhaseCommitSinkFunction to your
operator.
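
For orientation, this is roughly the shape such an operator/sink takes: a
compilable skeleton of Flink's TwoPhaseCommitSinkFunction hooks only, using a
String handle as the transaction object (for example a staging-table identifier,
along the lines of the staging-table workaround mentioned earlier in this
thread), with the actual JDBC work left as comments.

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

/** Skeleton of the hooks TwoPhaseCommitSinkFunction expects; method bodies are placeholders. */
public class StagingTableSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public StagingTableSink() {
        // The transaction handle is just a String (e.g. a staging-table suffix).
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() throws Exception {
        // e.g. create or pick a fresh staging area identified by this handle
        return "staging-" + System.currentTimeMillis();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) throws Exception {
        // write `value` into the staging area named by `transaction`
    }

    @Override
    protected void preCommit(String transaction) throws Exception {
        // flush pending writes so they survive a failure between checkpoints
    }

    @Override
    protected void commit(String transaction) {
        // atomically move the staged rows into the final table
    }

    @Override
    protected void abort(String transaction) {
        // drop the staged rows
    }
}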
However, the TwoPhaseCommitSinkFunction does not work well with JDBC. The
problem is that you would need to recover / reopen a transaction after
recovery. I don't think that is possible with JDBC.
There might be workarounds such as writing to a separate table and atomically
moving all records to the actual table for a commit, but this would be a
bit of custom code.
The GenericWriteAheadSink does not require recovering transactions but
stores all records that need to be committed in Flink state.

Regarding your questions:

> - How do you manage regular cleanup / deletion of old data, so the state
does not grow big?
With the coming version 1.8, Flink will support State TTL to remove state
that was not accessed for a certain period of time. You can also clean it
up manually, using timers.
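
For reference, a minimal sketch of the State TTL configuration mentioned above
(Flink 1.8+ API; the seven-day retention and the descriptor name are arbitrary
examples):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))                                    // retention period (example)
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)   // refresh TTL on writes
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("lookup-cache", String.class);
descriptor.enableTimeToLive(ttlConfig);   // expired entries are no longer returned and get cleaned up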

> - When I have e.g. parallelism 3, and thus 3 instances of my .flatMap()
lookup operator, they will not share the state, right? Thus they cannot
access data which was processed by the other instances? This is going to be
a problem.
That is correct. State is sharded and cannot be accessed by remote tasks.

> - The flink application state seems volatile, I have to make 100% sure
(by bash scripts and the like) that the application is never
stopped/canceled without making a savepoint, and the restart must resume
from the savepoint. Otherwise it will result in complete data loss worth
weeks/months of data. E.g. if anyone by accident hits the > job “cancel”
button in the flink UI, all data is lost. This seems pretty much like an
operational risk, since this is going to be a 24/7 high availability
application.
> So maybe using only flink state is also not viable.
You can use externalized checkpoints to ensure that the latest checkpoint
is always kept even if the application fails or is explicitly canceled.

Best,
Fabian

Am Mo., 1. Apr. 2019 um 08:56 Uhr schrieb Patrick Fial :

> Hi,
>
> thanks for your reply and sorry for the late response.
>
> The problem is, I am unsure how I should implement the two-phase-commit
> pattern, because my JDBC connection is within a .map()/.flatMap() operator,
> and it is NOT a data sink. As written in my original question, my stream
> setup is a simple, one-dimensional pipeline:
>
> environment.addSource(consumer)
>   .map(… idempotent transformations ...)
>   .map(new DatabaseFunction)
>   .map(… idempotent transformations ...)
>   .addSink(producer)
>
> For the state / flink buffering, I am unsure if it would work even with a
> composite key, since I must be able to do arbitrary database lookups (like
> select .. from .. where x between y and z order by …). So I am pretty sure
> I am bound to use a real database connection for the job.
> So currently, I see no option to use flink states in this situation. Also,
> as it seems, stream operators currently don’t support the two-phase-commit
> protocol, I would have to do this with a sink, correct?
> What do you advise to do? Currently I am think about replacing flink with
> another technology for this part of the application, since this
> database-lookup logic does not seem to fit the idea of a streaming
> application in general, thus flink might not be the best choice.
>
> Another option maybe, lets assume I was able to redesign my application
> (somehow, maybe I come up with something). Would you advise that I replace
> the oracle database *entirely* with a flink managed state?
>
> This would rise a couple of questions for me:
> - How do you manage regular cleanup / deletion of old data, so the state
> does not grow big?
> - When I have e.g. parallelism 3, and thus 3 instances of my .flatMap()
> lookup operator, they will not share the state, right? Thus they cannot
> access data which was processed by the other instances? This is going to be
> a problem.
> - The flink application state seems volatile, I have to make 100% sure (by
> bash scripts and the like) that the application is never stopped/canceled
> without making a savepoint, and the restart must resume from the savepoint.
> Otherwise it will result in complete data loss worth weeks/months of data.
> E.g. if anyone by accident hits the job “cancel” button in the flink UI,
> all data is lost. This seems pretty much like an operational risk, since
> this is going to be a 24/7 high availability application.
> So maybe using only flink state is also not viable.
>
> regards
> Patrick
>
> --
>
> *Patrick Fial*
>
> Client Platform Entwickler
>
> Information Design One AG
>
>
> Phone +49 69 244 502 38
>
> Web www.id1.de
>
>
>
> Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main
>
> Registereintrag: Amtsgericht Frankfurt am Main, HRB 52596
>
> Vorstand: Robert Peters, Benjamin Walther, Aufsichtsrat: Christian Hecht
> (Vorsitz)
>
> Am 21. März 2019 um 11:25:46, Kostas Kloudas 

Re: End-to-end exactly-once semantics in simple streaming app

2019-04-01 Thread Patrick Fial
Hi,

thanks for your reply and sorry for the late response.

The problem is, I am unsure how I should implement the two-phase-commit 
pattern, because my JDBC connection is within a .map()/.flatMap() operator, and 
it is NOT a data sink. As written in my original question, my stream setup is a 
simple, one-dimensional pipeline:

environment.addSource(consumer)
  .map(… idempotent transformations ...)
  .map(new DatabaseFunction)
  .map(… idempotent transformations ...)
  .addSink(producer)

For the state / flink buffering, I am unsure if it would work even with a 
composite key, since I must be able to do arbitrary database lookups (like 
select .. from .. where x between y and z order by …). So I am pretty sure I am 
bound to use a real database connection for the job.
So currently, I see no option to use Flink state in this situation. Also, as
it seems, stream operators currently don't support the two-phase-commit
protocol, so I would have to do this with a sink, correct?
What do you advise? Currently I am thinking about replacing Flink with
another technology for this part of the application, since this database-lookup
logic does not seem to fit the idea of a streaming application in general, and thus
Flink might not be the best choice.

Another option: let's assume I was able to redesign my application
(somehow, maybe I come up with something). Would you advise that I replace the
Oracle database *entirely* with Flink managed state?

This would raise a couple of questions for me:
- How do you manage regular cleanup / deletion of old data, so the state does 
not grow big?
- When I have e.g. parallelism 3, and thus 3 instances of my .flatMap() lookup 
operator, they will not share the state, right? Thus they cannot access data 
which was processed by the other instances? This is going to be a problem.
- The flink application state seems volatile, I have to make 100% sure (by bash 
scripts and the like) that the application is never stopped/canceled without 
making a savepoint, and the restart must resume from the savepoint. Otherwise 
it will result in complete data loss worth weeks/months of data. E.g. if anyone 
by accident hits the job “cancel” button in the flink UI, all data is lost. 
This seems pretty much like an operational risk, since this is going to be a 
24/7 high availability application.
So maybe using only flink state is also not viable.

regards
Patrick


--

Patrick Fial

Client Platform Entwickler

Information Design One AG


Phone +49 69 244 502 38

Web www.id1.de 



Information Design One AG, Baseler Straße 10, 60329 Frankfurt am Main

Registereintrag: Amtsgericht Frankfurt am Main, HRB 52596

Vorstand: Robert Peters, Benjamin Walther, Aufsichtsrat: Christian Hecht 
(Vorsitz)


Am 21. März 2019 um 11:25:46, Kostas Kloudas 
(kklou...@gmail.com) schrieb:

Hi Patrick,

In order for you DB records to be up-to-date and correct, I think that you 
would have to implement a 2-phase-commit sink.
Now for querying multiple keys, why not doing the following:

Let's assume for a single result record, you want to join data from K1, K2, K3.
You can have a function that creates a composite key `K_comp = 
createCompositeKey(K1, K2, K3)`.
Then you send 3 records out: (K1, K_comp), (K2, K_comp), (K3, K_comp).
You keyBy the first field initially, i,e. K1, K2, K3. This will send the 
records to the nodes responsible for each key.
The nodes there will either have the data in state, or they can hit the Oracle 
DB to fetch the related data.
So now, Flink will pick the relevant state.
And then you can keyBy the K_comp, which will send again all the records to the 
same node, where they can be joined together.
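A minimal sketch of that second hop, with illustrative assumptions (records are Tuple3<individualKey, compositeKey, payload>, the number of parts is fixed, payloads are plain strings), buffering the parts in keyed state until the join is complete:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input records are (individualKey, compositeKey, enrichedPayload); the stream is
// keyed by f1 (the composite key) before this function, so all parts of one logical
// record land on the same parallel instance.
public class JoinOnCompositeKey
        extends KeyedProcessFunction<String, Tuple3<String, String, String>, String> {

    private static final int NUM_PARTS = 3;            // K1, K2, K3 (assumption)
    private transient MapState<String, String> parts;  // individualKey -> enriched payload

    @Override
    public void open(Configuration parameters) {
        parts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("parts", String.class, String.class));
    }

    @Override
    public void processElement(Tuple3<String, String, String> record,
                               Context ctx,
                               Collector<String> out) throws Exception {
        parts.put(record.f0, record.f2);                // buffer this part in Flink state
        int seen = 0;
        for (String ignored : parts.keys()) {
            seen++;
        }
        if (seen == NUM_PARTS) {                        // all parts arrived: emit the joined result
            out.collect(record.f1 + " -> " + String.join("|", parts.values()));
            parts.clear();
        }
    }
}

The first hop would then be roughly keyBy(r -> r.f0) plus a map that enriches each part from keyed state or, as a fallback, from the Oracle DB, before re-keying by r.f1 into this function.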

Then you can use your 2-phase JDBC connector to push the result to your Oracle 
DB when the checkpoint is acknowledged.
This solution uses Flink's state as a buffer.
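For the 2-phase-commit part, a minimal sketch of a JDBC sink built on Flink's TwoPhaseCommitSinkFunction (available since Flink 1.4); the table, SQL and connection handling here are illustrative assumptions, and a live JDBC transaction cannot really be recovered after a failover, which is one reason the state-as-buffer variant above is often preferred:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Sketch only: one DB transaction per checkpoint interval, committed when the
// checkpoint completes, rolled back otherwise.
public class TwoPhaseJdbcSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseJdbcSink.Txn, Void> {

    // Transaction handle. A real implementation would checkpoint a transaction
    // identifier, not a live connection (connections are not serializable).
    public static class Txn {
        transient Connection connection;
    }

    private final String jdbcUrl; // assumption, e.g. "jdbc:oracle:thin:@//host:1521/service"

    public TwoPhaseJdbcSink(String jdbcUrl) {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    protected Txn beginTransaction() throws Exception {
        Txn txn = new Txn();
        txn.connection = DriverManager.getConnection(jdbcUrl);
        txn.connection.setAutoCommit(false);     // writes stay invisible until commit()
        return txn;
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) throws Exception {
        // Buffer each record inside the open, uncommitted DB transaction.
        try (PreparedStatement ps =
                 txn.connection.prepareStatement("INSERT INTO results (payload) VALUES (?)")) {
            ps.setString(1, value);
            ps.executeUpdate();
        }
    }

    @Override
    protected void preCommit(Txn txn) throws Exception {
        // Nothing extra to flush; the DB transaction already holds all pending writes.
    }

    @Override
    protected void commit(Txn txn) {
        try {
            txn.connection.commit();              // checkpoint completed: make writes visible
            txn.connection.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Txn txn) {
        try {
            txn.connection.rollback();            // drop everything since the last checkpoint
            txn.connection.close();
        } catch (Exception e) {
            // best effort: the connection may already be gone after a failure
        }
    }
}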

What do you think about this solution?

Cheers,
Kostas



On Wed, Mar 20, 2019 at 9:38 AM Patrick Fial <patrick.f...@id1.de> wrote:
Hi Andrey,

thanks for your feedback. I am not sure if I understand 100% correctly, but using the Flink state to store my stuff (in addition to the Oracle database) is not an option, because to my knowledge Flink state does not allow the arbitrary lookup queries that I need. Also, given the logic described in my original post, the database access is never going to be idempotent, which lies in the nature of the required insert/update logic.

regards
Patrick




On 19 March 2019 at 17:59:22, Andrey Zagrebin (and...@ververica.com) wrote:


Re: End-to-end exactly-once semantics in simple streaming app

2019-03-20 Thread Patrick Fial
Hi Andrey,

thanks for your feedback. I am not sure if I understand 100% correctly, but using the Flink state to store my stuff (in addition to the Oracle database) is not an option, because to my knowledge Flink state does not allow the arbitrary lookup queries that I need. Also, given the logic described in my original post, the database access is never going to be idempotent, which lies in the nature of the required insert/update logic.

regards
Patrick




On 19 March 2019 at 17:59:22, Andrey Zagrebin (and...@ververica.com) wrote:

Hi Patrick,

One approach I would try is to use Flink state and sync it with the database in initializeState and CheckpointListener.notifyCheckpointComplete.
Basically, issue only idempotent updates to the database, but only when the last checkpoint is securely taken and records before it are not processed again.
This has a caveat, though: the database might have stale data between checkpoints.
Once the current state is synced with the database, depending on your app, it might even be cleared from Flink state.

I also cc Piotr and Kostas, maybe, they have more ideas.

Best,
Andrey

On Tue, Mar 19, 2019 at 10:09 AM Patrick Fial <patrick.f...@id1.de> wrote:
Hello,

I am working on a streaming application with apache flink, which shall provide 
end-to-end exactly-once delivery guarantees. The application is roughly built 
like this:

environment.addSource(consumer)
  .map(… idempotent transformations ...)
  .map(new DatabaseFunction)
  .map(… idempotent transformations ...)
  .addSink(producer)

Both source and sink are kafka connectors, and thus support exactly-once 
delivery guarantees.

The tricky part comes with the .map() containing the DatabaseFunction. Its job 
is to:
1) look up the incoming message in some oracle database
2a) insert it if it is not already stored in the database and publish the 
incoming message
2b) otherwise combine the incoming update with previous contents from the 
database, and store back the combined update in the database
3) output the result of 2) to the next operator
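A rough sketch of what such a DatabaseFunction could look like (the table, the MERGE statement and the combine logic are illustrative assumptions, not the actual implementation):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Records are (id, payload). The emitted value depends on what is already in the
// database at processing time, which is why this step is not idempotent.
public class DatabaseFunction extends RichMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private final String jdbcUrl; // assumption, e.g. "jdbc:oracle:thin:@//host:1521/service"
    private transient Connection connection;

    public DatabaseFunction(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(jdbcUrl);
    }

    @Override
    public Tuple2<String, String> map(Tuple2<String, String> incoming) throws Exception {
        String payload = incoming.f1;

        // 1) look up the incoming message
        try (PreparedStatement select =
                 connection.prepareStatement("SELECT payload FROM messages WHERE id = ?")) {
            select.setString(1, incoming.f0);
            try (ResultSet rs = select.executeQuery()) {
                if (rs.next()) {
                    // 2b) combine with the previous contents (placeholder combine logic)
                    payload = rs.getString("payload") + "|" + incoming.f1;
                }
            }
        }

        // 2a)/2b) insert or update the (possibly combined) message
        try (PreparedStatement upsert = connection.prepareStatement(
                "MERGE INTO messages m USING dual ON (m.id = ?) "
              + "WHEN MATCHED THEN UPDATE SET m.payload = ? "
              + "WHEN NOT MATCHED THEN INSERT (id, payload) VALUES (?, ?)")) {
            upsert.setString(1, incoming.f0);
            upsert.setString(2, payload);
            upsert.setString(3, incoming.f0);
            upsert.setString(4, payload);
            upsert.executeUpdate();
        }

        // 3) hand the result to the next operator
        return Tuple2.of(incoming.f0, payload);
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}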

This logic leads to inconsistent data being published to the sink in case of a failure where the DatabaseFunction was already executed, but the message is not yet published to the sink.

My understanding is that in such a scenario all operator states would be reverted to the last checkpoint. Since the .map() operator is stateless, nothing is done here, so only the consumer and producer states are reverted. This leads to the message being reprocessed from the beginning (source), and thus being processed *again* by the DatabaseFunction. However, the DatabaseFunction is not idempotent (because of 1)-3) as explained above), and thus leads to a different output than in the first run.

The question is: how can I assure transaction safety in this application?

Basically, I would need to use database transactions within the DatabaseFunction, and commit those only if the messages are also committed to the Kafka sink. However, I don’t know how to achieve this.

I read about the two phase commit protocol in flink 
(https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
 but I fail to find examples of how to implement this in detail for stream 
operators (NOT sinks). All documentation I find only refers to using the two 
phase commit protocol for sinks. Should I, in this case, only implement the 
CheckpointedFunction and hook on the initializeState/snapshotState to 
rollback/commit by database transactions? Would this already make things work? 
I am a bit confused because there seem to be no hooks for the 
pre-commit/commit/abort signals.

Anyway, I am also afraid that this might also introduce scaling issues, because 
depending on the message throughput, committing database actions only with 
every checkpoint interval might blow the temp tablespace in the oracle database.

Thanks in advance for any help.

best regards
Patrick Fial




Re: End-to-end exactly-once semantics in simple streaming app

2019-03-19 Thread Andrey Zagrebin
Hi Patrick,

One approach I would try is to use Flink state and sync it with the database in initializeState and CheckpointListener.notifyCheckpointComplete.
Basically, issue only idempotent updates to the database, but only when the last checkpoint is securely taken and records before it are not processed again.
This has a caveat, though: the database might have stale data between checkpoints.
Once the current state is synced with the database, depending on your app, it might even be cleared from Flink state.
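A minimal sketch of this pattern, under the assumption that the writes can be made idempotent (upsert by key) and that the flush helper below is a hypothetical placeholder for the actual JDBC code:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.ArrayList;
import java.util.List;

// Records are (id, payload). Database writes are deferred: they are buffered in
// operator state and only flushed, as idempotent upserts, once a checkpoint has completed.
public class BufferedDatabaseFunction
        extends RichMapFunction<Tuple2<String, String>, Tuple2<String, String>>
        implements CheckpointedFunction, CheckpointListener {

    private final List<Tuple2<String, String>> pending = new ArrayList<>();
    private transient ListState<Tuple2<String, String>> checkpointedPending;

    @Override
    public Tuple2<String, String> map(Tuple2<String, String> incoming) {
        pending.add(incoming);                  // the DB write itself is deferred
        return incoming;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedPending.clear();
        for (Tuple2<String, String> record : pending) {
            checkpointedPending.add(record);    // not-yet-flushed writes become part of the checkpoint
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedPending = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-db-writes",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        if (context.isRestored()) {
            for (Tuple2<String, String> record : checkpointedPending.get()) {
                pending.add(record);            // re-flush after recovery; idempotent upserts keep this safe
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // The checkpoint is securely taken: push the buffered records as idempotent upserts.
        // Records that arrived after the snapshot get flushed a bit early, which is also
        // safe only because the flush is idempotent.
        flushToDatabase(pending);               // hypothetical helper doing MERGE-by-id via JDBC
        pending.clear();
    }

    private void flushToDatabase(List<Tuple2<String, String>> records) {
        // Illustrative placeholder: MERGE INTO ... keyed by record id.
    }
}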

I also cc Piotr and Kostas, maybe, they have more ideas.

Best,
Andrey

On Tue, Mar 19, 2019 at 10:09 AM Patrick Fial  wrote:

> Hello,
>
> I am working on a streaming application with apache flink, which shall
> provide end-to-end exactly-once delivery guarantees. The application is
> roughly built like this:
>
> environment.addSource(consumer)
>   .map(… idempotent transformations ...)
>   .map(new DatabaseFunction)
>   .map(… idempotent transformations ...)
>   .addSink(producer)
>
> Both source and sink are kafka connectors, and thus support exactly-once
> delivery guarantees.
>
> The tricky part comes with the .map() containing the DatabaseFunction. Its
> job is to:
> 1) look up the incoming message in some oracle database
> 2a) insert it if it is not already stored in the database and publish the
> incoming message
> 2b) otherwise combine the incoming update with previous contents from the
> database, and store back the combined update in the database
> 3) output the result of 2) to the next operator
>
> This logic leads to inconsistent data being published to the sink in case
> of a failure where the DatabaseFunction was already executed, but the
> message is not yet published to the sink.
>
> My understanding is that in such a scenario all operator states would be
> reverted to the last checkpoint. Since the .map() operator is stateless,
> nothing is done here, so only the consumer and producer states are
> reverted. This leads to the message being reprocessed from the beginning
> (source), and thus being processed *again* by the DatabaseFunction.
> However, the DatabaseFunction is not idempotent (because of 1)-3) as
> explained above), and thus leads to a different output than in the first
> run.
>
> The question is: how can I assure transaction safety in this application?
>
> Basically, I would need to use database transactions within the
> DatabaseFunction, and commit those only if the messages are also committed
> to the Kafka sink. However, I don’t know how to achieve this.
>
> I read about the two phase commit protocol in flink (
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
> but I fail to find examples of how to implement this in detail for stream
> operators (NOT sinks). All documentation I find only refers to using the
> two phase commit protocol for sinks. Should I, in this case, only implement
> the CheckpointedFunction and hook on the initializeState/snapshotState to
> rollback/commit by database transactions? Would this already make things
> work? I am a bit confused because there seem to be no hooks for the
> pre-commit/commit/abort signals.
>
> Anyway, I am also afraid that this might also introduce scaling issues,
> because depending on the message throughput, committing database actions
> only with every checkpoint interval might blow the temp tablespace in the
> oracle database.
>
> Thanks in advance for any help.
>
> best regards
> Patrick Fial
>
>


End-to-end exactly-once semantics in simple streaming app

2019-03-19 Thread Patrick Fial
Hello,

I am working on a streaming application with apache flink, which shall provide 
end-to-end exactly-once delivery guarantees. The application is roughly built 
like this:

environment.addSource(consumer)
  .map(… idempotent transformations ...)
  .map(new DatabaseFunction)
  .map(… idempotent transformations ...)
  .addSink(producer)

Both source and sink are kafka connectors, and thus support exactly-once 
delivery guarantees.

The tricky part comes with the .map() containing the DatabaseFunction. Its job 
is to:
1) look up the incoming message in some oracle database
2a) insert it if it is not already stored in the database and publish the 
incoming message
2b) otherwise combine the incoming update with previous contents from the 
database, and store back the combined update in the database
3) output the result of 2) to the next operator

This logic leads to inconsistent data being published to the sink in case of a failure where the DatabaseFunction was already executed, but the message is not yet published to the sink.

My understanding is that in such a scenario all operator states would be reverted to the last checkpoint. Since the .map() operator is stateless, nothing is done here, so only the consumer and producer states are reverted. This leads to the message being reprocessed from the beginning (source), and thus being processed *again* by the DatabaseFunction. However, the DatabaseFunction is not idempotent (because of 1)-3) as explained above), and thus leads to a different output than in the first run.

The question is: how can I assure transaction safety in this application?

Basically, I would need to use database transactions within the DatabaseFunction, and commit those only if the messages are also committed to the Kafka sink. However, I don’t know how to achieve this.

I read about the two phase commit protocol in flink 
(https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html),
 but I fail to find examples of how to implement this in detail for stream 
operators (NOT sinks). All documentation I find only refers to using the two 
phase commit protocol for sinks. Should I, in this case, only implement the 
CheckpointedFunction and hook on the initializeState/snapshotState to 
rollback/commit by database transactions? Would this already make things work? 
I am a bit confused because there seem to be no hooks for the 
pre-commit/commit/abort signals.

Anyway, I am also afraid that this might also introduce scaling issues, because 
depending on the message throughput, committing database actions only with 
every checkpoint interval might blow the temp tablespace in the oracle database.

Thanks in advance for any help.

best regards
Patrick Fial




Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Stephan Ewen
@Gabor: That assumes deterministic streams and to some extent deterministic
tuple order.
That may be given sometimes, but it is a very strong assumption in many
cases.

On Fri, Feb 5, 2016 at 1:09 PM, Gábor Gévay <gga...@gmail.com> wrote:

> Hello,
>
> > I think that there is actually a fundamental latency issue with
> > "exactly once sinks", no matter how you implement them in any systems:
> > You can only commit once you are sure that everything went well,
> > to a specific point where you are sure no replay will ever be needed.
>
> What if the persistent buffer in the sink would be used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives, and also writes
> everything to the persistent buffer, and then in case of a replay it
> looks into the buffer before pushing every element, and only does the
> push if the buffer says that the element was not pushed before.
>
> Best,
> Gábor
>
>
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > Hi Niels!
> >
> > In general, exactly once output requires transactional cooperation from
> the
> > target system. Kafka has that on the roadmap, we should be able to
> integrate
> > that once it is out.
> > That means output is "committed" upon completed checkpoints, which
> > guarantees nothing is written multiple times.
> >
> > Chesnay is working on an interesting prototype as a generic solution
> (also
> > for Kafka, while they don't have that feature):
> > It buffers the data in the sink persistently (using the fault tolerance
> > state backends) and pushes the results out on notification of a completed
> > checkpoint.
> > That gives you exactly once semantics, but involves an extra
> materialization
> > of the data.
> >
> >
> > I think that there is actually a fundamental latency issue with "exactly
> > once sinks", no matter how you implement them in any systems:
> > You can only commit once you are sure that everything went well, to a
> > specific point where you are sure no replay will ever be needed.
> >
> > So the latency in Flink for an exactly-once output would be at least the
> > checkpoint interval.
> >
> > I'm eager to hear your thoughts on this.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
> >>
> >> Hi,
> >>
> >> It is my understanding that the exactly-once semantics regarding the
> input
> >> from Kafka is based on the checkpointing in the source component
> retaining
> >> the offset where it was at the checkpoint moment.
> >>
> >> My question is how does that work for a sink? How can I make sure that
> (in
> >> light of failures) each message that is read from Kafka (my input) is
> >> written to Kafka (my output) exactly once?
> >>
> >>
> >> --
> >> Best regards / Met vriendelijke groeten,
> >>
> >> Niels Basjes
> >
> >
>


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
That would be good indeed. I just learned about it from what Stephan mentioned. It sounds correct to me along those lines, but it would be nice to see the details.

> On 05 Feb 2016, at 13:32, Ufuk Celebi  wrote:
> 
> 
>> On 05 Feb 2016, at 13:28, Paris Carbone  wrote:
>> 
>> Hi Gabor,
>> 
>> The sinks should be aware that the global checkpoint is indeed persisted
>> before emitting, so they will have to wait until they are notified of its
>> completion before pushing to Kafka. The current view of the local state is
>> not the actual persisted view, so checking against it is like relying on
>> dirty state. Imagine the following scenario:
>> 
>> 1) sink pushes to kafka record k and updates local buffer for k
>> 2) sink snapshots k and the rest of its state on checkpoint barrier
>> 3) global checkpoint fails due to some reason (e.g. another sink subtask 
>> failed) and the job gets restarted
>> 4) sink pushes again record k to kafka since the last global snapshots did 
>> not complete before and k is not in the local buffer
>> 
>> Chesnay’s approach seems to be valid and best effort for the time being.
> 
> Chesnay’s approach is not part of this thread. Can you or Chesnay 
> elaborate/provide a link?
> 
> – Ufuk
> 



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
From what I understood, state on sinks is included in the operator state of the sinks and pushed to Kafka when the 3-phase commit is complete, i.e. when the checkpoint completion notification arrives at the sinks.

There are several pitfalls I am really curious to check and see how they are 
(going to be) handled, this is of course not as simple as it sounds. It really 
depends on the guarantees and operations the outside storage gives you. For 
example, how can we know that the pushed records are actually persisted in 
kafka in a single transaction? Not as simple as it sounds.

@Chesnay can you tell us more?

> On 05 Feb 2016, at 13:33, Paris Carbone  wrote:
> 
> That would be good indeed. I just learned about it from what Stephan
> mentioned. It sounds correct to me along those lines, but it would be nice
> to see the details.
> 
>> On 05 Feb 2016, at 13:32, Ufuk Celebi  wrote:
>> 
>> 
>>> On 05 Feb 2016, at 13:28, Paris Carbone  wrote:
>>> 
>>> Hi Gabor,
>>> 
>>> The sinks should be aware that the global checkpoint is indeed persisted
>>> before emitting, so they will have to wait until they are notified of its
>>> completion before pushing to Kafka. The current view of the local state is
>>> not the actual persisted view, so checking against it is like relying on
>>> dirty state. Imagine the following scenario:
>>> 
>>> 1) sink pushes to kafka record k and updates local buffer for k
>>> 2) sink snapshots k and the rest of its state on checkpoint barrier
>>> 3) global checkpoint fails due to some reason (e.g. another sink subtask 
>>> failed) and the job gets restarted
>>> 4) sink pushes again record k to kafka since the last global snapshots did 
>>> not complete before and k is not in the local buffer
>>> 
>>> Chesnay’s approach seems to be valid and best effort for the time being.
>> 
>> Chesnay’s approach is not part of this thread. Can you or Chesnay 
>> elaborate/provide a link?
>> 
>> – Ufuk
>> 
> 



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
Hi Gabor,

The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:

1) sink pushes to kafka record k and updates local buffer for k
2) sink snapshots k and the rest of its state on checkpoint barrier
3) global checkpoint fails due to some reason (e.g. another sink subtask 
failed) and the job gets restarted
4) sink pushes again record k to kafka since the last global snapshots did not 
complete before and k is not in the local buffer

Chesnay’s approach seems to be valid and best effort for the time being.

Paris

> On 05 Feb 2016, at 13:09, Gábor Gévay <gga...@gmail.com> wrote:
> 
> Hello,
> 
>> I think that there is actually a fundamental latency issue with
>> "exactly once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well,
>> to a specific point where you are sure no replay will ever be needed.
> 
> What if the persistent buffer in the sink would be used to determine
> which data elements should be emitted in case of a replay? I mean, the
> sink pushes everything as soon as it arrives, and also writes
> everything to the persistent buffer, and then in case of a replay it
> looks into the buffer before pushing every element, and only does the
> push if the buffer says that the element was not pushed before.
> 
> Best,
> Gábor
> 
> 
> 2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
>> Hi Niels!
>> 
>> In general, exactly once output requires transactional cooperation from the
>> target system. Kafka has that on the roadmap, we should be able to integrate
>> that once it is out.
>> That means output is "committed" upon completed checkpoints, which
>> guarantees nothing is written multiple times.
>> 
>> Chesnay is working on an interesting prototype as a generic solution (also
>> for Kafka, while they don't have that feature):
>> It buffers the data in the sink persistently (using the fault tolerance
>> state backends) and pushes the results out on notification of a completed
>> checkpoint.
>> That gives you exactly once semantics, but involves an extra materialization
>> of the data.
>> 
>> 
>> I think that there is actually a fundamental latency issue with "exactly
>> once sinks", no matter how you implement them in any systems:
>> You can only commit once you are sure that everything went well, to a
>> specific point where you are sure no replay will ever be needed.
>> 
>> So the latency in Flink for an exactly-once output would be at least the
>> checkpoint interval.
>> 
>> I'm eager to hear your thoughts on this.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>> 
>>> Hi,
>>> 
>>> It is my understanding that the exactly-once semantics regarding the input
>>> from Kafka is based on the checkpointing in the source component retaining
>>> the offset where it was at the checkpoint moment.
>>> 
>>> My question is how does that work for a sink? How can I make sure that (in
>>> light of failures) each message that is read from Kafka (my input) is
>>> written to Kafka (my output) exactly once?
>>> 
>>> 
>>> --
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>> 
>> 



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Ufuk Celebi

> On 05 Feb 2016, at 13:28, Paris Carbone  wrote:
> 
> Hi Gabor,
> 
> The sinks should be aware that the global checkpoint is indeed persisted
> before emitting, so they will have to wait until they are notified of its
> completion before pushing to Kafka. The current view of the local state is
> not the actual persisted view, so checking against it is like relying on
> dirty state. Imagine the following scenario:
> 
> 1) sink pushes to kafka record k and updates local buffer for k
> 2) sink snapshots k and the rest of its state on checkpoint barrier
> 3) global checkpoint fails due to some reason (e.g. another sink subtask 
> failed) and the job gets restarted
> 4) sink pushes again record k to kafka since the last global snapshots did 
> not complete before and k is not in the local buffer
> 
> Chesnay’s approach seems to be valid and best effort for the time being.

Chesnay’s approach is not part of this thread. Can you or Chesnay 
elaborate/provide a link?

– Ufuk



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
Hello,

> I think that there is actually a fundamental latency issue with
> "exactly once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well,
> to a specific point where you are sure no replay will ever be needed.

What if the persistent buffer in the sink would be used to determine
which data elements should be emitted in case of a replay? I mean, the
sink pushes everything as soon as it arrives, and also writes
everything to the persistent buffer, and then in case of a replay it
looks into the buffer before pushing every element, and only does the
push if the buffer says that the element was not pushed before.

Best,
Gábor


2016-02-05 11:57 GMT+01:00 Stephan Ewen <se...@apache.org>:
> Hi Niels!
>
> In general, exactly once output requires transactional cooperation from the
> target system. Kafka has that on the roadmap, we should be able to integrate
> that once it is out.
> That means output is "committed" upon completed checkpoints, which
> guarantees nothing is written multiple times.
>
> Chesnay is working on an interesting prototype as a generic solution (also
> for Kafka, while they don't have that feature):
> It buffers the data in the sink persistently (using the fault tolerance
> state backends) and pushes the results out on notification of a completed
> checkpoint.
> That gives you exactly once semantics, but involves an extra materialization
> of the data.
>
>
> I think that there is actually a fundamental latency issue with "exactly
> once sinks", no matter how you implement them in any systems:
> You can only commit once you are sure that everything went well, to a
> specific point where you are sure no replay will ever be needed.
>
> So the latency in Flink for an exactly-once output would be at least the
> checkpoint interval.
>
> I'm eager to hear your thoughts on this.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>
>> Hi,
>>
>> It is my understanding that the exactly-once semantics regarding the input
>> from Kafka is based on the checkpointing in the source component retaining
>> the offset where it was at the checkpoint moment.
>>
>> My question is how does that work for a sink? How can I make sure that (in
>> light of failures) each message that is read from Kafka (my input) is
>> written to Kafka (my output) exactly once?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
>


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Gábor Gévay
The way I imagine this is that the sink would have its "own checkpoints" separately from the rest of the system, with a much smaller interval, and would write to Kafka (with "transactional cooperation", as Stephan mentioned) while making these checkpoints. And then, when a replay happens from a global system checkpoint, it can look at its own checkpoints to decide for each tuple whether to send it or not.

@Stephan:

> That assumes deterministic streams and to some extent deterministic tuple
> order.
> That may be given sometimes, but it is a very strong assumption in many cases.

Ah yes, you are right. But doing everything based on event time points in the direction of deterministic streams, right?


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Chesnay Schepler

Essentially what happens is the following:

In between checkpoints, all incoming data is stored within the operator state.

When a checkpoint-complete notification arrives, the data is read from the operator state and written into Kafka (or any other system).

If the job fails while storing records in the state, the current state is discarded and we go back to the previous one. Since no data was written yet, we fulfill exactly-once here.
If the job fails while data is being written into Cassandra (it can't be written as one atomic action), some data will persist in Cassandra and will be sent again upon restart. In this case exactly-once is not fulfilled.
But we minimize the time frame in which a failure causes exactly-once to fail, which is pretty much as close as you can get without support from Kafka or others.
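A stripped-down sketch of that buffering idea (this is not the actual prototype, just the general shape: a sink that parks records in checkpointed operator state and only writes them out once the checkpoint containing them has completed; the write helper is a hypothetical placeholder):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

// Write-ahead style sink: records are buffered in checkpointed operator state and
// only pushed to the external system after the checkpoint completes.
public class BufferingSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedBuffer;

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);                      // nothing leaves Flink yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedBuffer.clear();
        for (String value : buffer) {
            checkpointedBuffer.add(value);      // the buffer becomes part of the checkpoint
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedBuffer = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-output", String.class));
        if (ctx.isRestored()) {
            for (String value : checkpointedBuffer.get()) {
                buffer.add(value);              // re-emit after recovery; may duplicate (see below)
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        writeToExternalSystem(buffer);          // hypothetical placeholder for the real write
        buffer.clear();
    }

    private void writeToExternalSystem(List<String> values) {
        // e.g. a Kafka producer or a Cassandra batch; not atomic, hence the remaining
        // small window in which duplicates can occur, exactly as described above.
    }
}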


@Niels we discussed having a counter that tells us how much data was written in between checkpoints. But this is currently not possible: an operator can't update its state on the fly, so we would need something new here.
And there would still be cases where even this would fail, for example if the job fails after the message was sent, but before the ID was saved.


On 05.02.2016 13:55, Paris Carbone wrote:

This is not a bad take. It still makes a few assumptions

1) the output checkpoints the id of the last *known* ID that was 
*persisted* in kafka (not just pushed)

2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <ni...@basjes.nl> wrote:


Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink is effectively reverting to microbatching (where the batch size is the checkpoint period) with regard to the output.


My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka 
in the checkpoint.

2) Upon recovery the sink would
2a) Record the offset Kafka is at at that point in time
2b) For all 'new' messages validate if it must write this message by 
reading from Kafka (starting at the offset in the checkpoint) and if 
the message is already present it would skip it.
3) If a message arrives that has not yet been written, the message is written. Under the assumption that the messages arrive in the same order as before, the sink can now simply run as normal.


This way the performance is only impacted in the (short) period after 
the recovery of a disturbance.


What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:


Hi Niels!

In general, exactly once output requires transactional
cooperation from the target system. Kafka has that on the
roadmap, we should be able to integrate that once it is out.
That means output is "committed" upon completed checkpoints,
which guarantees nothing is written multiple times.

Chesnay is working on an interesting prototype as a generic
solution (also for Kafka, while they don't have that feature):
It buffers the data in the sink persistently (using the fault
tolerance state backends) and pushes the results out on
notification of a completed checkpoint.
That gives you exactly once semantics, but involves an extra
materialization of the data.


I think that there is actually a fundamental latency issue with
"exactly once sinks", no matter how you implement them in any
systems:
You can only commit once you are sure that everything went well,
to a specific point where you are sure no replay will ever be needed.

So the latency in Flink for an exactly-once output would be at
least the checkpoint interval.

I'm eager to hear your thoughts on this.

Greetings,
Stephan


On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:

Hi,

It is my understanding that the exactly-once semantics regarding the input from Kafka is based on the checkpointing in the source component retaining the offset where it was at the checkpoint moment.

My question is how does that work for a sink? How can I make
sure that (in light of failures) each message that is read
from Kafka (my input) is written to Kafka (my output) exactly
once?


-- 
Best regards / Met vriendelijke groeten,


Niels Basjes





--
Best regards / Met vriendelijke groeten,

Niels Basjes






On 05.02.2016 13:49, Paris Carbone wrote:

From what I understood, state on sinks is included in the operator state of the sinks and pushed to Kafka when the 3-phase commit is complete, i.e. when the checkpoint completion notification arrives at the sinks.

There are several pitfalls I am really curious to check and see how they are (going to be) handled; this is of course not as simple as it sounds.

Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Stephan Ewen
Hi Niels!

That could actually work, given a way to identify messages with a unique ID.

Would be quite an exercise to implement...

Stephan


On Fri, Feb 5, 2016 at 2:14 PM, Niels Basjes <ni...@basjes.nl> wrote:

> @Stephan;
> Kafka keeps the messages for a configured TTL (i.e. a few days/weeks).
> So my idea is based on the fact that Kafka has all the messages and that I
> can read those messages from Kafka to validate if I should or should not
> write them again.
>
> Let me illustrate what I had in mind:
> I write messages to Kafka and at the moment of the checkpoint the last
> message ID I wrote is 5.
> Then I write 6,7,8
> FAIL
> Recover:
> Open a reader starting at message 5
> Get message 6 -> Read from Kafka --> Already have this --> Skip
> Get message 7 -> Read from Kafka --> Already have this --> Skip
> Get message 8 -> Read from Kafka --> Already have this --> Skip
> Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
> normal operations.
>
> Like I said: This is just the first rough idea I had on a possible
> direction how this can be solved without the latency impact of buffering.
>
> Niels Basjes
>
>
> On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> @Niels: I don't fully understand your approach so far.
>>
>> If you write a message to Kafka between two checkpoints, where do you
>> store the information that this particular message is already written (I
>> think this would be the ID in your example).
>> Such an information would need to be persisted for every written messages
>> (or very small group of messages).
>>
>> Stephan
>>
>>
>> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> Buffering the data (in all cases) would hurt the latency so much that
>>> Flink is effectively reverting to microbatching (where the batch size is
>>> the checkpoint period) with regard to the output.
>>>
>>> My initial thoughts on how to solve this were as follows:
>>> 1) The output persists the ID of the last message it wrote to Kafka in
>>> the checkpoint.
>>> 2) Upon recovery the sink would
>>> 2a) Record the offset Kafka is at at that point in time
>>> 2b) For all 'new' messages validate if it must write this message by
>>> reading from Kafka (starting at the offset in the checkpoint) and if the
>>> message is already present it would skip it.
>>> 3) If a message arrives that has not yet been written, the message is
>>> written. Under the assumption that the messages arrive in the same order
>>> as before, the sink can now simply run as normal.
>>>
>>> This way the performance is only impacted in the (short) period after
>>> the recovery of a disturbance.
>>>
>>> What do you think?
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> In general, exactly once output requires transactional cooperation from
>>>> the target system. Kafka has that on the roadmap, we should be able to
>>>> integrate that once it is out.
>>>> That means output is "committed" upon completed checkpoints, which
>>>> guarantees nothing is written multiple times.
>>>>
>>>> Chesnay is working on an interesting prototype as a generic solution
>>>> (also for Kafka, while they don't have that feature):
>>>> It buffers the data in the sink persistently (using the fault tolerance
>>>> state backends) and pushes the results out on notification of a completed
>>>> checkpoint.
>>>> That gives you exactly once semantics, but involves an extra
>>>> materialization of the data.
>>>>
>>>>
>>>> I think that there is actually a fundamental latency issue with
>>>> "exactly once sinks", no matter how you implement them in any systems:
>>>> You can only commit once you are sure that everything went well, to a
>>>> specific point where you are sure no replay will ever be needed.
>>>>
>>>> So the latency in Flink for an exactly-once output would be at least
>>>> the checkpoint interval.
>>>>
>>>> I'm eager to hear your thoughts on this.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It is my understanding that the exactly-once semantics regarding the
>>>>> input from Kafka is based on the checkpointing in the source component
>>>>> retaining the offset where it was at the checkpoint moment.
>>>>>
>>>>> My question is how does that work for a sink? How can I make sure that
>>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>>> written to Kafka (my output) exactly once?
>>>>>
>>>>>
>>>>> --
>>>>> Best regards / Met vriendelijke groeten,
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Paris Carbone
This is not a bad take. It still makes a few assumptions:

1) the output checkpoints the last *known* ID that was *persisted* in Kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <ni...@basjes.nl> wrote:

Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink is effectively reverting to microbatching (where the batch size is the checkpoint period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The output persists the ID of the last message it wrote to Kafka in the 
checkpoint.
2) Upon recovery the sink would
2a) Record the offset Kafka is at at that point in time
2b) For all 'new' messages validate if it must write this message by reading 
from Kafka (starting at the offset in the checkpoint) and if the message is 
already present it would skip it.
3) If a message arrives that has not yet been written, the message is written. Under the assumption that the messages arrive in the same order as before, the sink can now simply run as normal.

This way the performance is only impacted in the (short) period after the 
recovery of a disturbance.

What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
Hi Niels!

In general, exactly once output requires transactional cooperation from the 
target system. Kafka has that on the roadmap, we should be able to integrate 
that once it is out.
That means output is "committed" upon completed checkpoints, which guarantees 
nothing is written multiple times.
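For reference, a minimal sketch of how this eventually surfaced in the Flink Kafka producer once Kafka transactions were available, assuming Flink 1.9+ with the universal Kafka connector (topic name, broker address and timeouts are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once output is tied to checkpoints: the Kafka transaction of a
        // checkpoint is only committed once that checkpoint completes.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");          // placeholder
        // Should exceed the maximum checkpoint duration and stay below the broker's
        // transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", "600000");

        DataStream<String> results = env.fromElements("a", "b", "c");  // stand-in for the real pipeline

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        results.addSink(new FlinkKafkaProducer<>(
                "output-topic",
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once kafka sketch");
    }
}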

Chesnay is working on an interesting prototype as a generic solution (also for 
Kafka, while they don't have that feature):
It buffers the data in the sink persistently (using the fault tolerance state 
backends) and pushes the results out on notification of a completed checkpoint.
That gives you exactly once semantics, but involves an extra materialization of 
the data.


I think that there is actually a fundamental latency issue with "exactly once 
sinks", no matter how you implement them in any systems:
You can only commit once you are sure that everything went well, to a specific 
point where you are sure no replay will ever be needed.

So the latency in Flink for an exactly-once output would be at least the 
checkpoint interval.

I'm eager to hear your thoughts on this.

Greetings,
Stephan


On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
Hi,

It is my understanding that the exactly-once semantics regarding the input from 
Kafka is based on the checkpointing in the source component retaining the 
offset where it was at the checkpoint moment.

My question is how does that work for a sink? How can I make sure that (in 
light of failures) each message that is read from Kafka (my input) is written 
to Kafka (my output) exactly once?


--
Best regards / Met vriendelijke groeten,

Niels Basjes




--
Best regards / Met vriendelijke groeten,

Niels Basjes



Re: How to ensure exactly-once semantics in output to Kafka?

2016-02-05 Thread Niels Basjes
@Stephan;
Kafka keeps the messages for a configured TTL (i.e. a few days/weeks).
So my idea is based on the fact that Kafka has all the messages and that I
can read those messages from Kafka to validate if I should or should not
write them again.

Let me illustrate what I had in mind:
I write messages to Kafka and at the moment of the checkpoint the last
message ID I wrote is 5.
Then I write 6,7,8
FAIL
Recover:
Open a reader starting at message 5
Get message 6 -> Read from Kafka --> Already have this --> Skip
Get message 7 -> Read from Kafka --> Already have this --> Skip
Get message 8 -> Read from Kafka --> Already have this --> Skip
Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
normal operations.

Like I said: This is just the first rough idea I had on a possible
direction how this can be solved without the latency impact of buffering.
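A sketch of that recovery-time dedup loop, with the Kafka access hidden behind hypothetical helpers (readFrom, write), since this was only ever a rough idea and not an actual implementation:

import java.util.Iterator;

// Sketch of the recovery path only. lastCheckpointedOffset comes from the sink's
// checkpointed state; OutputTopic is a hypothetical stand-in for a Kafka
// consumer/producer on the *output* topic.
public final class RecoveryDedup {

    interface OutputTopic {
        Iterator<String> readFrom(long offset);   // re-read what was already written
        void write(String message);               // normal write path
    }

    /** Replays incoming messages, skipping those already present in the output topic. */
    static void resume(Iterator<String> replayedInput, OutputTopic output, long lastCheckpointedOffset) {
        Iterator<String> alreadyWritten = output.readFrom(lastCheckpointedOffset);
        boolean catchingUp = true;

        while (replayedInput.hasNext()) {
            String message = replayedInput.next();
            if (catchingUp && alreadyWritten.hasNext()) {
                // Assumes deterministic replay order, as pointed out in the thread:
                // the replayed message must line up with what is already in Kafka.
                alreadyWritten.next();            // already written before the failure -> skip
            } else {
                catchingUp = false;
                output.write(message);            // first genuinely new message: resume normal writes
            }
        }
    }
}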

Niels Basjes


On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:

> @Niels: I don't fully understand your approach so far.
>
> If you write a message to Kafka between two checkpoints, where do you
> store the information that this particular message is already written (I
> think this would be the ID in your example).
> Such an information would need to be persisted for every written messages
> (or very small group of messages).
>
> Stephan
>
>
> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> Buffering the data (in all cases) would hurt the latency so much that
>> Flink is effectively reverting to microbatching (where the batch size is
>> the checkpoint period) with regard to the output.
>>
>> My initial thoughts on how to solve this were as follows:
>> 1) The output persists the ID of the last message it wrote to Kafka in
>> the checkpoint.
>> 2) Upon recovery the sink would
>> 2a) Record the offset Kafka is at at that point in time
>> 2b) For all 'new' messages validate if it must write this message by
>> reading from Kafka (starting at the offset in the checkpoint) and if the
>> message is already present it would skip it.
>> 3) If a message arrives that has not yet been written, the message is
>> written. Under the assumption that the messages arrive in the same order
>> as before, the sink can now simply run as normal.
>>
>> This way the performance is only impacted in the (short) period after the
>> recovery of a disturbance.
>>
>> What do you think?
>>
>> Niels Basjes
>>
>>
>>
>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> In general, exactly once output requires transactional cooperation from
>>> the target system. Kafka has that on the roadmap, we should be able to
>>> integrate that once it is out.
>>> That means output is "committed" upon completed checkpoints, which
>>> guarantees nothing is written multiple times.
>>>
>>> Chesnay is working on an interesting prototype as a generic solution
>>> (also for Kafka, while they don't have that feature):
>>> It buffers the data in the sink persistently (using the fault tolerance
>>> state backends) and pushes the results out on notification of a completed
>>> checkpoint.
>>> That gives you exactly once semantics, but involves an extra
>>> materialization of the data.
>>>
>>>
>>> I think that there is actually a fundamental latency issue with "exactly
>>> once sinks", no matter how you implement them in any systems:
>>> You can only commit once you are sure that everything went well, to a
>>> specific point where you are sure no replay will ever be needed.
>>>
>>> So the latency in Flink for an exactly-once output would be at least the
>>> checkpoint interval.
>>>
>>> I'm eager to hear your thoughts on this.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is my understanding that the exactly-once semantics regarding the
>>>> input from Kafka is based on the checkpointing in the source component
>>>> retaining the offset where it was at the checkpoint moment.
>>>>
>>>> My question is how does that work for a sink? How can I make sure that
>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>> written to Kafka (my output) exactly once?
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes