Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Matthias J. Sax

`transaction.timeout.ms` is a producer setting, thus you can increase
it accordingly.

Note that brokers bound the range via `transaction.max.timeout.ms`;
thus, you may need to increase this broker config, too.
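
For reference, a minimal sketch of where these two settings live (the
one-hour value below is only an illustration, not a recommendation):

    import java.util.Properties;

    // Producer side: passed to the Flink Kafka producer's configuration.
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "kafka:9092");
    producerProps.setProperty("transaction.timeout.ms",
            String.valueOf(60 * 60 * 1000)); // 1 hour, example value only

    // Broker side (server.properties on every broker): must be at least as
    // large as the producer value, otherwise the broker rejects the
    // producer's transaction initialization.
    //
    //   transaction.max.timeout.ms=3600000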


-Matthias

On 8/12/19 2:43 AM, Piotr Nowojski wrote:
> Hi,
> 
> Ok, I see. You can try to rewrite your logic (or maybe the records'
> schema, by adding some ID fields) to manually deduplicate the
> records after processing them with at-least-once semantics. Such a
> setup is usually simpler, with slightly better throughput and
> significantly better latency (end-to-end exactly-once latency is
> limited by the checkpointing time).
> 
> Piotrek
> 
>> On 12 Aug 2019, at 11:12, Tony Wei wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks a lot. I need exactly once in my use case, but rather than
>> run the risk of losing data, at least once is more acceptable
>> when an error occurs.
>> 
>> Best, Tony Wei
>> 
>> Piotr Nowojski wrote on Mon, 12 Aug 2019 at 15:27:
>> 
>> Hi,
>> 
>> Yes, if it’s due to transaction timeout you will lose the data.
>> 
>> Whether you can fall back to at least once depends on Kafka, not
>> on Flink, since it is Kafka that times out those transactions, and
>> I don’t see anything in the documentation that could override this
>> [1]. You might try disabling the mechanism via the settings
>> `transaction.abort.timed.out.transaction.cleanup.interval.ms` or
>> `transaction.remove.expired.transaction.cleanup.interval.ms`, but
>> that’s a question more for the Kafka guys. Maybe Becket could help
>> with this.
>> 
>> Also it MIGHT be that Kafka doesn’t remove records from the
>> topics when aborting the transaction and MAYBE you can still
>> access them via “READ_UNCOMMITTED” mode. But that is, again, a
>> question for Kafka.
>> 
>> Sorry that I can not help more.
>> 
>> If you do not care about exactly once, why don’t you just set
>> the connector to at least once mode?
>> 
>> Piotrek
>> 
>>> On 12 Aug 2019, at 06:29, Tony Wei wrote:
>>> 
>>> Hi,
>>> 
>>> I had the same exception recently. I want to confirm that if
>>> it is due to a transaction timeout, then I will lose that data.
>>> Am I right? Can I make it fall back to at-least-once semantics
>>> in this situation?
>>> 
>>> Best, Tony Wei
>>> 
>>> Piotr Nowojski wrote on Wed, 21 Mar 2018 at 22:28:
>>> 
>>> Hi,
>>> 
>>> But that’s exactly the case: producer’s transaction timeout 
>>> starts when the external transaction starts - but 
>>> FlinkKafkaProducer011 keeps an active Kafka transaction for the
>>> whole period between checkpoints.
>>> 
>>> As I wrote in the previous message:
>>> 
>>> in case of failure, your timeout must also be able to cover
>>> the additional downtime required for the successful job 
>>> restart. Thus you should increase your timeout accordingly.
>>> 
>>> I think that a 15-minute timeout is way too small a value. If
>>> your job fails because of some intermittent failure (for
>>> example a worker crash/restart), you will only have a couple of
>>> minutes for a successful Flink job restart. Otherwise you will
>>> lose some data (because of the transaction timeouts).
>>> 
>>> Piotrek
>>> 
 On 21 Mar 2018, at 10:30, Dongwon Kim wrote:
 
 Hi Piotr,
 
 Now my streaming pipeline is working without retries. I
 decreased Flink's checkpoint interval from 15min to 10min as
 you suggested [see screenshot_10min_ckpt.png].
 
 I thought that the producer's transaction timeout starts when the
 external transaction starts. The truth is that the producer's
 transaction timeout starts after the last external checkpoint
 is committed. Now that I have 15min for the producer's
 transaction timeout and 10min for Flink's checkpoint
 interval, and every checkpoint takes less than 5 minutes,
 everything is working fine. Am I right?
 
 Anyway thank you very much for the detailed explanation!
 
 Best,
 
 Dongwon
 
 
 
 On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski
 <pi...@data-artisans.com> wrote:
 
 Hi,
 
 Please increase transaction.timeout.ms to a greater value or
 decrease Flink’s checkpoint interval; I’m pretty sure the
 issue here is that those two values are overlapping. I think
 that’s even visible on the screenshots. The first completed
 checkpoint started at 14:28:48 and ended at 14:30:43, while
 the second one started at 14:45:53 and ended at 14:49:16.
 That gives you a minimal transaction duration of 15 minutes
 and 10 seconds, and a maximal transaction duration of 21 minutes.
 
 In HAPPY 

Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Piotr Nowojski
Hi,

Ok, I see. You can try to rewrite your logic (or maybe the records' schema, by 
adding some ID fields) to manually deduplicate the records after processing them 
with at-least-once semantics. Such a setup is usually simpler, with slightly 
better throughput and significantly better latency (end-to-end exactly-once 
latency is limited by the checkpointing time).
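
To make this concrete, here is a rough sketch of such a deduplication step,
assuming each record already carries a unique id (the Record type and its
getId() accessor are made up for the example; note that without some cleanup
strategy this state grows forever):

    import org.apache.flink.api.common.functions.RichFilterFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;

    // Keeps one boolean of keyed state per record id and drops every record
    // whose id was already seen, i.e. duplicates from at-least-once replays.
    public class DeduplicateById extends RichFilterFunction<Record> {
        private transient ValueState<Boolean> seen;

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", Boolean.class));
        }

        @Override
        public boolean filter(Record record) throws Exception {
            if (seen.value() != null) {
                return false;           // already emitted once, drop the replay
            }
            seen.update(true);
            return true;
        }
    }

    // Usage: stream.keyBy(r -> r.getId()).filter(new DeduplicateById())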

Piotrek

> On 12 Aug 2019, at 11:12, Tony Wei  wrote:
> 
> Hi Piotr,
> 
> Thanks a lot. I need exactly once in my use case, but rather than run the 
> risk of losing data, at least once is more acceptable when an error occurs.
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <pi...@data-artisans.com> wrote on Mon, 12 Aug 2019 at 15:27:
> Hi,
> 
> Yes, if it’s due to transaction timeout you will lose the data.
> 
> Whether you can fall back to at least once depends on Kafka, not on 
> Flink, since it is Kafka that times out those transactions, and I don’t see 
> anything in the documentation that could override this [1]. You might try 
> disabling the mechanism via the settings 
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` or 
> `transaction.remove.expired.transaction.cleanup.interval.ms`, but 
> that’s a question more for the Kafka guys. Maybe Becket could help with this.
> 
> Also it MIGHT be that Kafka doesn’t remove records from the topics when 
> aborting the transaction, and MAYBE you can still access them via 
> “READ_UNCOMMITTED” mode. But that is, again, a question for Kafka.
> 
> Sorry that I can not help more.
> 
> If you do not care about exactly once, why don’t you just set the connector 
> to at least once mode?
> 
> Piotrek
> 
>> On 12 Aug 2019, at 06:29, Tony Wei wrote:
>> 
>> Hi,
>> 
>> I had the same exception recently. I want to confirm that if it is due to a 
>> transaction timeout, then I will lose that data. Am I right? Can I make it 
>> fall back to at-least-once semantics in this situation?
>> 
>> Best,
>> Tony Wei
>> 
>> Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, 21 Mar 2018 at 22:28:
>> Hi,
>> 
>> But that’s exactly the case: producer’s transaction timeout starts when the 
>> external transaction starts - but FlinkKafkaProducer011 keeps an active 
>> Kafka transaction for the whole period between checkpoints.
>> 
>> As I wrote in the previous message:
>> 
>> > in case of failure, your timeout must also be able to cover the additional 
>> > downtime required for the successful job restart. Thus you should increase 
>> > your timeout accordingly.
>> 
>> I think that a 15-minute timeout is way too small a value. If your job fails 
>> because of some intermittent failure (for example a worker crash/restart), you 
>> will only have a couple of minutes for a successful Flink job restart. 
>> Otherwise you will lose some data (because of the transaction timeouts).
>> 
>> Piotrek
>> 
>>> On 21 Mar 2018, at 10:30, Dongwon Kim wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Now my streaming pipeline is working without retries. 
>>> I decreased Flink's checkpoint interval from 15min to 10min as you 
>>> suggested [see screenshot_10min_ckpt.png].
>>> 
>>> I thought that the producer's transaction timeout starts when the external 
>>> transaction starts.
>>> The truth is that the producer's transaction timeout starts after the last 
>>> external checkpoint is committed.
>>> Now that I have 15min for Producer's transaction timeout and 10min for 
>>> Flink's checkpoint interval, and every checkpoint takes less than 5 
>>> minutes, everything is working fine.
>>> Am I right?
>>> 
>>> Anyway thank you very much for the detailed explanation!
>>> 
>>> Best,
>>> 
>>> Dongwon
>>> 
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
>>> Hi,
>>> 
>>> Please increase transaction.timeout.ms to 
>>> a greater value or decrease Flink’s checkpoint interval; I’m pretty sure 
>>> the issue here is that those two values are overlapping. I think that’s 
>>> even visible on the screenshots. The first completed checkpoint started at 
>>> 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 
>>> and ended at 14:49:16. That gives you a minimal transaction duration of 15 
>>> minutes and 10 seconds, and a maximal transaction duration of 21 minutes.
>>> 
>>> In the HAPPY SCENARIO (without any failure and restarting), you should assume 
>>> that your timeout interval has to cover, with some safety margin, the period 
>>> between the start of a checkpoint and the end of the NEXT checkpoint, since 
>>> that is the upper bound on how long the transaction might be used. In your 
>>> case at least ~25 minutes.
>>> 
>>> On top of that, as described in the docs, 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance

Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Tony Wei
Hi Piotr,

Thanks a lot. I need exactly once in my use case, but rather than run the
risk of losing data, at least once is more acceptable when an error occurs.

Best,
Tony Wei

Piotr Nowojski wrote on Mon, 12 Aug 2019 at 15:27:

> Hi,
>
> Yes, if it’s due to transaction timeout you will lose the data.
>
> Whether you can fall back to at least once depends on Kafka, not on
> Flink, since it is Kafka that times out those transactions, and I don’t
> see anything in the documentation that could override this [1]. You might
> try disabling the mechanism via the settings
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` or
> `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s
> a question more for the Kafka guys. Maybe Becket could help with this.
>
> Also it MIGHT be that Kafka doesn’t remove records from the topics when
> aborting the transaction, and MAYBE you can still access them via
> “READ_UNCOMMITTED” mode. But that is, again, a question for Kafka.
>
> Sorry that I can not help more.
>
> If you do not care about exactly once, why don’t you just set the
> connector to at least once mode?
>
> Piotrek
>
> On 12 Aug 2019, at 06:29, Tony Wei  wrote:
>
> Hi,
>
> I had the same exception recently. I want to confirm that if it is due to a
> transaction timeout, then I will lose that data. Am I right? Can I make it
> fall back to at-least-once semantics in this situation?
>
> Best,
> Tony Wei
>
> Piotr Nowojski wrote on Wed, 21 Mar 2018 at 22:28:
>
>> Hi,
>>
>> But that’s exactly the case: producer’s transaction timeout starts when
>> the external transaction starts - but FlinkKafkaProducer011 keeps an active
>> Kafka transaction for the whole period between checkpoints.
>>
>> As I wrote in the previous message:
>>
>> > in case of failure, your timeout must also be able to cover the
>> additional downtime required for the successful job restart. Thus you
>> should increase your timeout accordingly.
>>
>> I think that a 15-minute timeout is way too small a value. If your job
>> fails because of some intermittent failure (for example a worker
>> crash/restart), you will only have a couple of minutes for a successful
>> Flink job restart. Otherwise you will lose some data (because of the
>> transaction timeouts).
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
>>
>> Hi Piotr,
>>
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15min to 10min as you
>> suggested [see screenshot_10min_ckpt.png].
>>
>> I thought that the producer's transaction timeout starts when the external
>> transaction starts.
>> The truth is that the producer's transaction timeout starts after the last
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for
>> Flink's checkpoint interval, and every checkpoint takes less than 5
>> minutes, everything is working fine.
>> Am I right?
>>
>> Anyway thank you very much for the detailed explanation!
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Please increase transaction.timeout.ms to a greater value or decrease
>>> Flink’s checkpoint interval; I’m pretty sure the issue here is that those
>>> two values are overlapping. I think that’s even visible on the screenshots.
>>> The first completed checkpoint started at 14:28:48 and ended at 14:30:43, while
>>> the second one started at 14:45:53 and ended at 14:49:16. That gives you
>>> a minimal transaction duration of 15 minutes and 10 seconds, and a maximal
>>> transaction duration of 21 minutes.
>>>
>>> In the HAPPY SCENARIO (without any failure and restarting), you should
>>> assume that your timeout interval has to cover, with some safety margin, the
>>> period between the start of a checkpoint and the end of the NEXT checkpoint,
>>> since that is the upper bound on how long the transaction might be used. In
>>> your case at least ~25 minutes.
>>>
>>> On top of that, as described in the docs
>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance),
>>> in case of failure, your timeout must also be able to cover the additional
>>> downtime required for the successful job restart. Thus you should increase
>>> your timeout accordingly.
>>>
>>> Piotrek
>>>
>>>
>>> On 20 Mar 2018, at 11:58, Dongwon Kim  wrote:
>>>
>>> Hi Piotr,
>>>
>>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>>> used the default setting for broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation
>>> where Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint just takes 2 minutes, it seems like transaction
>>> is not committed properly.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski wrote:
 Hi,

 What’s your Kafka’s transaction timeout setting? Please both check
 

Re: Kafka ProducerFencedException after checkpointing

2019-08-12 Thread Piotr Nowojski
Hi,

Yes, if it’s due to transaction timeout you will lose the data.

Whether you can fall back to at least once depends on Kafka, not on Flink, 
since it is Kafka that times out those transactions, and I don’t see anything 
in the documentation that could override this [1]. You might try disabling 
the mechanism via the settings 
`transaction.abort.timed.out.transaction.cleanup.interval.ms` or 
`transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s 
a question more for the Kafka guys. Maybe Becket could help with this.

Also it MIGHT be that Kafka doesn’t remove records from the topics when 
aborting the transaction, and MAYBE you can still access them via 
“READ_UNCOMMITTED” mode. But that is, again, a question for Kafka.

Sorry that I can not help more.

If you do not care about exactly once, why don’t you just set the connector to 
at least once mode?
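
For completeness, a rough sketch of what switching the 0.11 connector to at
least once looks like (topic name, schema, and broker address below are just
placeholders, and imports may vary slightly across Flink versions):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");

    // AT_LEAST_ONCE only flushes pending records on checkpoints: no Kafka
    // transactions, so no transaction.timeout.ms to outlive, but duplicates
    // are possible after a restore.
    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "my-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);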

Piotrek

> On 12 Aug 2019, at 06:29, Tony Wei  wrote:
> 
> Hi,
> 
> I had the same exception recently. I want to confirm that if it is due to a 
> transaction timeout, then I will lose that data. Am I right? Can I make it 
> fall back to at-least-once semantics in this situation?
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, 21 Mar 2018 at 22:28:
> Hi,
> 
> But that’s exactly the case: producer’s transaction timeout starts when the 
> external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
> transaction for the whole period between checkpoints.
> 
> As I wrote in the previous message:
> 
> > in case of failure, your timeout must also be able to cover the additional 
> > downtime required for the successful job restart. Thus you should increase 
> > your timeout accordingly.
> 
> I think that a 15-minute timeout is way too small a value. If your job fails 
> because of some intermittent failure (for example a worker crash/restart), you 
> will only have a couple of minutes for a successful Flink job restart. 
> Otherwise you will lose some data (because of the transaction timeouts).
> 
> Piotrek
> 
>> On 21 Mar 2018, at 10:30, Dongwon Kim wrote:
>> 
>> Hi Piotr,
>> 
>> Now my streaming pipeline is working without retries. 
>> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
>> [see screenshot_10min_ckpt.png].
>> 
>> I thought that the producer's transaction timeout starts when the external 
>> transaction starts.
>> The truth is that the producer's transaction timeout starts after the last 
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for 
>> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
>> everything is working fine.
>> Am I right?
>> 
>> Anyway thank you very much for the detailed explanation!
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> Please increase transaction.timeout.ms to a 
>> greater value or decrease Flink’s checkpoint interval; I’m pretty sure the 
>> issue here is that those two values are overlapping. I think that’s even 
>> visible on the screenshots. The first completed checkpoint started at 14:28:48 
>> and ended at 14:30:43, while the second one started at 14:45:53 and ended at 
>> 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 
>> seconds, and a maximal transaction duration of 21 minutes.
>> 
>> In the HAPPY SCENARIO (without any failure and restarting), you should assume 
>> that your timeout interval has to cover, with some safety margin, the period 
>> between the start of a checkpoint and the end of the NEXT checkpoint, since 
>> that is the upper bound on how long the transaction might be used. In your 
>> case at least ~25 minutes.
>> 
>> On top of that, as described in the docs 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), 
>> in case of failure, your timeout must also be able to cover the 
>> additional downtime required for the successful job restart. Thus you should 
>> increase your timeout accordingly. 
>> 
>> Piotrek
>> 
>> 
>>> On 20 Mar 2018, at 11:58, Dongwon Kim >> > wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> We have set producer's [transaction.timeout.ms] to 15 minutes and have used 
>>> the default setting for broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>>> Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint just takes 2 minutes, it seems like transaction is 
>>> not committed properly.
>>> 
>>> Best,
>>> 
>>> - Dongwon
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski >> 

Re: Kafka ProducerFencedException after checkpointing

2019-08-11 Thread Tony Wei
Hi,

I had the same exception recently. I want to confirm that if it is due to a
transaction timeout, then I will lose that data. Am I right? Can I make it
fall back to at-least-once semantics in this situation?

Best,
Tony Wei

Piotr Nowojski wrote on Wed, 21 Mar 2018 at 22:28:

> Hi,
>
> But that’s exactly the case: producer’s transaction timeout starts when
> the external transaction starts - but FlinkKafkaProducer011 keeps an active
> Kafka transaction for the whole period between checkpoints.
>
> As I wrote in the previous message:
>
> > in case of failure, your timeout must also be able to cover the
> additional downtime required for the successful job restart. Thus you
> should increase your timeout accordingly.
>
> I think that a 15-minute timeout is way too small a value. If your job
> fails because of some intermittent failure (for example a worker
> crash/restart), you will only have a couple of minutes for a successful
> Flink job restart. Otherwise you will lose some data (because of the
> transaction timeouts).
>
> Piotrek
>
> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
>
> Hi Piotr,
>
> Now my streaming pipeline is working without retries.
> I decreased Flink's checkpoint interval from 15min to 10min as you
> suggested [see screenshot_10min_ckpt.png].
>
> I thought that the producer's transaction timeout starts when the external
> transaction starts.
> The truth is that the producer's transaction timeout starts after the last
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for
> Flink's checkpoint interval, and every checkpoint takes less than 5
> minutes, everything is working fine.
> Am I right?
>
> Anyway thank you very much for the detailed explanation!
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Please increase transaction.timeout.ms to a greater value or decrease
>> Flink’s checkpoint interval; I’m pretty sure the issue here is that those
>> two values are overlapping. I think that’s even visible on the screenshots.
>> The first completed checkpoint started at 14:28:48 and ended at 14:30:43, while
>> the second one started at 14:45:53 and ended at 14:49:16. That gives you
>> a minimal transaction duration of 15 minutes and 10 seconds, and a maximal
>> transaction duration of 21 minutes.
>>
>> In the HAPPY SCENARIO (without any failure and restarting), you should assume
>> that your timeout interval has to cover, with some safety margin, the period
>> between the start of a checkpoint and the end of the NEXT checkpoint, since
>> that is the upper bound on how long the transaction might be used. In your
>> case at least ~25 minutes.
>>
>> On top of that, as described in the docs
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance),
>> in case of failure, your timeout must also be able to cover the additional
>> downtime required for the successful job restart. Thus you should increase
>> your timeout accordingly.
>>
>> Piotrek
>>
>>
>> On 20 Mar 2018, at 11:58, Dongwon Kim  wrote:
>>
>> Hi Piotr,
>>
>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>> used the default setting for broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like transaction
>> is not committed properly.
>>
>> Best,
>>
>> - Dongwon
>>
>>
>>
>>
>>
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> What’s your Kafka’s transaction timeout setting? Please check both the Kafka
>>> producer configuration (the transaction.timeout.ms property) and the Kafka
>>> broker configuration. The most likely cause of such an error message is that
>>> Kafka's timeout is smaller than Flink’s checkpoint interval and
>>> transactions are not committed quickly enough before the timeout occurs.
>>>
>>> Piotrek
>>>
>>> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
>>>
>>>
>>> Hi,
>>>
>>> I'm faced with the following ProducerFencedException after 1st, 3rd,
>>> 5th, 7th, ... checkpoints:
>>>
>>> --
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
>>> attempted an operation with an old epoch. Either there is a newer producer 
>>> with the same transactionalId, or the producer's transaction has been 
>>> expired by the broker.
>>>

Re: Kafka ProducerFencedException after checkpointing

2018-03-21 Thread Piotr Nowojski
Hi,

But that’s exactly the case: producer’s transaction timeout starts when the 
external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka 
transaction for the whole period between checkpoints.

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.

I think that a 15-minute timeout is way too small a value. If your job fails 
because of some intermittent failure (for example a worker crash/restart), you 
will only have a couple of minutes for a successful Flink job restart. 
Otherwise you will lose some data (because of the transaction timeouts).
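
As a back-of-the-envelope sketch of how to size the timeout (the concrete
numbers are made up for illustration, not a recommendation):

    // Rough sizing of transaction.timeout.ms with illustrative figures.
    long checkpointInterval = 10 * 60_000L;  // time between checkpoint starts
    long checkpointDuration =  5 * 60_000L;  // how long one checkpoint may take
    long restartDowntime    = 30 * 60_000L;  // budget for restarting a failed job
    long safetyMargin       =  5 * 60_000L;

    // An open transaction must survive until the NEXT checkpoint completes,
    // plus any downtime needed to restart the job and commit it afterwards.
    long transactionTimeoutMs =
            checkpointInterval + checkpointDuration + restartDowntime + safetyMargin;
    // -> 50 minutes here; a 15 minute timeout leaves almost no restart budget.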

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim  wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries. 
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
> [see screenshot_10min_ckpt.png].
> 
> I thought that the producer's transaction timeout starts when the external 
> transaction starts.
> The truth is that the producer's transaction timeout starts after the last 
> external checkpoint is committed.
> Now that I have 15min for Producer's transaction timeout and 10min for 
> Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, 
> everything is working fine.
> Am I right?
> 
> Anyway thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> 
> 
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski wrote:
> Hi,
> 
> Please increase transaction.timeout.ms to a 
> greater value or decrease Flink’s checkpoint interval; I’m pretty sure the 
> issue here is that those two values are overlapping. I think that’s even 
> visible on the screenshots. The first completed checkpoint started at 14:28:48 
> and ended at 14:30:43, while the second one started at 14:45:53 and ended at 
> 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 
> seconds, and a maximal transaction duration of 21 minutes.
> 
> In the HAPPY SCENARIO (without any failure and restarting), you should assume 
> that your timeout interval has to cover, with some safety margin, the period 
> between the start of a checkpoint and the end of the NEXT checkpoint, since 
> that is the upper bound on how long the transaction might be used. In your 
> case at least ~25 minutes.
> 
> On top of that, as described in the docs 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), 
> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly. 
> 
> Piotrek
> 
> 
>> On 20 Mar 2018, at 11:58, Dongwon Kim wrote:
>> 
>> Hi Piotr,
>> 
>> We have set producer's [transaction.timeout.ms] to 15 minutes and have used 
>> the default setting for broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint just takes 2 minutes, it seems like transaction is 
>> not committed properly.
>> 
>> Best,
>> 
>> - Dongwon
>> 
>> 
>> 
>> 
>> 
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski wrote:
>> Hi,
>> 
>> What’s your Kafka’s transaction timeout setting? Please check both the Kafka 
>> producer configuration (the transaction.timeout.ms property) and the Kafka 
>> broker configuration. The most likely cause of such an error message is that 
>> Kafka's timeout is smaller than Flink’s checkpoint interval and transactions 
>> are not committed quickly enough before the timeout occurs.
>> 
>> Piotrek
>> 
>>> On 17 Mar 2018, at 07:24, Dongwon Kim wrote:
>>> 
>>> 
>>> Hi,
>>> 
>>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
>>> 7th, ... checkpoints:
>>> --
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: 

Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Dongwon Kim
Hi Piotr,

We have set producer's [transaction.timeout.ms] to 15 minutes and have used
the default setting for broker (15 mins).
As Flink's checkpoint interval is 15 minutes, it is not a situation where
Kafka's timeout is smaller than Flink's checkpoint interval.
As our first checkpoint just takes 2 minutes, it seems like the transaction is
not committed properly.

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski 
wrote:

> Hi,
>
> What’s your Kafka’s transaction timeout setting? Please check both the Kafka
> producer configuration (the transaction.timeout.ms property) and the Kafka
> broker configuration. The most likely cause of such an error message is that
> Kafka's timeout is smaller than Flink’s checkpoint interval and transactions are
> not committed quickly enough before the timeout occurs.
>
> Piotrek
>
> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
>
>
> Hi,
>
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th,
> 7th, ... checkpoints:
>
> --
>
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
>
> --
>
>
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
>
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
>
> After the first checkpoint completed [see history after 1st ckpt.png], the
> job is restarted due to the ProducerFencedException [see exception after
> 1st ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint
> interval is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a
> while [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first
> checkpoint.
>
> Can anybody let me know what's going wrong behind the scene?
>
> Best,
>
> Dongwon
>
>
>


Re: Kafka ProducerFencedException after checkpointing

2018-03-20 Thread Piotr Nowojski
Hi,

What’s your Kafka’s transaction timeout setting? Please check both the Kafka 
producer configuration (the transaction.timeout.ms property) and the Kafka broker 
configuration. The most likely cause of such an error message is that Kafka's 
timeout is smaller than Flink’s checkpoint interval and transactions are not 
committed quickly enough before the timeout occurs.
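
To illustrate the relationship, a sketch of a consistent pairing of the two
values (the concrete numbers are only an example, not tuned advice):

    import java.util.Properties;

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 5 minutes in exactly-once mode...
    env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);

    // ...and give Kafka transactions comfortably more time than one full
    // checkpoint cycle, e.g. 1 hour, on the properties passed to the Kafka sink.
    Properties producerProps = new Properties();
    producerProps.setProperty("transaction.timeout.ms", "3600000");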

Piotrek

> On 17 Mar 2018, at 07:24, Dongwon Kim  wrote:
> 
> 
> Hi,
> 
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
> 7th, ... checkpoints:
> --
> java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
> --
> 
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
> 
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
> 
> After the first checkpoint completed [see history after 1st ckpt.png], the 
> job is restarted due to the ProducerFencedException [see exception after 1st 
> ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint interval 
> is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a while 
> [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first 
> checkpoint.
> 
> Can anybody let me know what's going wrong behind the scene?
> 
> Best,
> 
> Dongwon