-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

`transaction.timeout.ms` is a producer setting, thus you can increase
it accordingly.

Note, that brokers bound the range via `transaction.max.timeout.ms`;
thus, you may need to increase this broker configs, too.


- -Matthias

On 8/12/19 2:43 AM, Piotr Nowojski wrote:
> Hi,
> 
> Ok, I see. You can try to rewrite your logic (or maybe records
> schema by adding some ID fields) to manually deduplicating the
> records after processing them with at least once semantic. Such
> setup is usually simpler, with slightly better throughput and
> significantly better latency (end-to-end exactly once latency is
> limited by checkpointing time).
> 
> Piotrek
> 
>> On 12 Aug 2019, at 11:12, Tony Wei <tony19920...@gmail.com 
>> <mailto:tony19920...@gmail.com>> wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks a lot. I need exactly once in my use case, but instead of 
>> having the risk of losing data, at least once is more acceptable
>> when error occurred.
>> 
>> Best, Tony Wei
>> 
>> Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> 於 2019年8月12日 週一 下午3:27寫道:
>> 
>> Hi,
>> 
>> Yes, if it’s due to transaction timeout you will lose the data.
>> 
>> Whether can you fallback to at least once, that depends on
>> Kafka, not on Flink, since it’s the Kafka that timeouts those 
>> transactions and I don’t see in the documentation anything that 
>> could override this [1]. You might try disabling the mechanism
>> via setting 
>> `transaction.abort.timed.out.transaction.cleanup.interval.ms 
>> <http://transaction.abort.timed.out.transaction.cleanup.interval.ms/>
`
>>
>> 
or `transaction.remove.expired.transaction.cleanup.interval.ms
>> <http://transaction.remove.expired.transaction.cleanup.interval.ms/>`
,
>>
>> 
but that’s question more to Kafka guys. Maybe Becket could help
>> with this.
>> 
>> Also it MIGHT be that Kafka doesn’t remove records from the
>> topics when aborting the transaction and MAYBE you can still
>> access them via “READ_UNCOMMITTED” mode. But that’s again,
>> question to Kafka.
>> 
>> Sorry that I can not help more.
>> 
>> If you do not care about exactly once, why don’t you just set
>> the connector to at least once mode?
>> 
>> Piotrek
>> 
>>> On 12 Aug 2019, at 06:29, Tony Wei <tony19920...@gmail.com 
>>> <mailto:tony19920...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I had the same exception recently. I want to confirm that if
>>> it is due to transaction timeout, then I will lose those data.
>>> Am I right? Can I make it fall back to at least once semantic
>>> in this situation?
>>> 
>>> Best, Tony Wei
>>> 
>>> Piotr Nowojski <pi...@data-artisans.com 
>>> <mailto:pi...@data-artisans.com>> 於 2018年3月21日 週三 下午10:28 寫道:
>>> 
>>> Hi,
>>> 
>>> But that’s exactly the case: producer’s transaction timeout 
>>> starts when the external transaction starts - but 
>>> FlinkKafkaProducer011 keeps an active Kafka transaction for the
>>> whole period between checkpoints.
>>> 
>>> As I wrote in the previous message:
>>> 
>>>> in case of failure, your timeout must also be able to cover
>>> the additional downtime required for the successful job 
>>> restart. Thus you should increase your timeout accordingly.
>>> 
>>> I think that 15 minutes timeout is a way too small value. If 
>>> your job fails because of some intermittent failure (for 
>>> example worker crash/restart), you will only have a couple of 
>>> minutes for a successful Flink job restart. Otherwise you will
>>> lose some data (because of the transaction timeouts).
>>> 
>>> Piotrek
>>> 
>>>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com 
>>>> <mailto:eastcirc...@gmail.com>> wrote:
>>>> 
>>>> Hi Piotr,
>>>> 
>>>> Now my streaming pipeline is working without retries. I
>>>> decreased Flink's checkpoint interval from 15min to 10min as
>>>> you suggested [see screenshot_10min_ckpt.png].
>>>> 
>>>> I though that producer's transaction timeout starts when the 
>>>> external transaction starts. The truth is that Producer's
>>>> transaction timeout starts after the last external checkpoint
>>>> is committed. Now that I have 15min for Producer's
>>>> transaction timeout and 10min for Flink's checkpoint
>>>> interval, and every checkpoint takes less than 5 minutes,
>>>> everything is working fine. Am I right?
>>>> 
>>>> Anyway thank you very much for the detailed explanation!
>>>> 
>>>> Best,
>>>> 
>>>> Dongwon
>>>> 
>>>> 
>>>> 
>>>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski 
>>>> <pi...@data-artisans.com <mailto:pi...@data-artisans.com>> 
>>>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Please increase transaction.timeout.ms 
>>>> <http://transaction.timeout.ms/> to a greater value or 
>>>> decrease Flink’s checkpoint interval, I’m pretty sure the
>>>> issue here is that those two values are overlapping. I think
>>>> that’s even visible on the screenshots. First checkpoint
>>>> completed started at 14:28:48 and ended at 14:30:43, while
>>>> the second one started at 14:45:53 and ended at 14:49:16.
>>>> That gives you minimal transaction duration of 15 minutes and
>>>> 10 seconds, with maximal transaction duration of 21 minutes.
>>>> 
>>>> In HAPPY SCENARIO (without any failure and restarting), you
>>>> should assume that your timeout interval should cover with
>>>> some safety margin the period between start of a checkpoint
>>>> and end of the NEXT checkpoint, since this is the upper bound
>>>> how long the transaction might be used. In your case at least
>>>> ~25 minutes.
>>>> 
>>>> On top of that, as described in the docs,
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/con
nectors/kafka.html#kafka-producers-and-fault-tolerance
>>>> , in case of failure, your timeout must also be able to cover
>>>> the additional downtime required for the successful job
>>>> restart. Thus you should increase your timeout accordingly.
>>>> 
>>>> Piotrek
>>>> 
>>>> 
>>>>> On 20 Mar 2018, at 11:58, Dongwon Kim 
>>>>> <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>> 
>>>>> wrote:
>>>>> 
>>>>> Hi Piotr,
>>>>> 
>>>>> We have set producer's [transaction.timeout.ms 
>>>>> <http://transaction.timeout.ms/>] to 15 minutes and have
>>>>> used the default setting for broker (15 mins). As Flink's
>>>>> checkpoint interval is 15 minutes, it is not a situation
>>>>> where Kafka's timeout is smaller than Flink's checkpoint
>>>>> interval. As our first checkpoint just takes 2 minutes, it
>>>>> seems like transaction is not committed properly.
>>>>> 
>>>>> Best,
>>>>> 
>>>>> - Dongwon
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski 
>>>>> <pi...@data-artisans.com <mailto:pi...@data-artisans.com>>
>>>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> What’s your Kafka’s transaction timeout setting? Please
>>>>> both check Kafka producer configuration 
>>>>> (transaction.timeout.ms <http://transaction.timeout.ms/>
>>>>> property) and Kafka broker configuration. The most likely
>>>>> cause of such error message is when Kafka's timeout is 
>>>>> smaller then Flink’s checkpoint interval and transactions
>>>>> are not committed quickly enough before timeout occurring.
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 17 Mar 2018, at 07:24, Dongwon Kim 
>>>>>> <eastcirc...@gmail.com <mailto:eastcirc...@gmail.com>>
>>>>>> wrote:
>>>>>> 
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I'm faced with the following ProducerFencedException
>>>>>> after 1st, 3rd, 5th, 7th, ... checkpoints: -- 
>>>>>> java.lang.RuntimeException: Error while confirming 
>>>>>> checkpoint at 
>>>>>> org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>>>>
>>>>>> 
at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.jav
a:511)
>>>>>>
>>>>>> 
at
>>>>>> java.util.concurrent.FutureTask.run(FutureTask.java:266) 
>>>>>> at 
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
tor.java:1149)
>>>>>>
>>>>>> 
at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExec
utor.java:624)
>>>>>>
>>>>>> 
at java.lang.Thread.run(Thread.java:748) Caused
>>>>>> by: 
>>>>>> org.apache.kafka.common.errors.ProducerFencedException: 
>>>>>> Producer attempted an operation with an old epoch. Either
>>>>>> there is a newer producer with the same transactionalId,
>>>>>> or the producer's transaction has been expired by the
>>>>>> broker. --
>>>>>> 
>>>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly
>>>>>> once processing using Kafka sink. We use FsStateBackend
>>>>>> to store snapshot data on HDFS.
>>>>>> 
>>>>>> As shown in configuration.png, my checkpoint 
>>>>>> configuration is: - Checkpointing Mode : Exactly Once -
>>>>>> Interval : 15m 0s - Timeout : 10m 0s - Minimum Pause
>>>>>> Between Checkpoints : 5m 0s - Maximum Concurrent
>>>>>> Checkpoints : 1 - Persist Checkpoints Externally :
>>>>>> Disabled
>>>>>> 
>>>>>> After the first checkpoint completed [see history after
>>>>>> 1st ckpt.png], the job is restarted due to the
>>>>>> ProducerFencedException [see exception after 1st
>>>>>> ckpt.png]. The first checkpoint takes less than 2
>>>>>> minutes while my checkpoint interval is 15m and minimum 
>>>>>> pause between checkpoints is 5m. After the job is
>>>>>> restarted, the second checkpoint is triggered after a
>>>>>> while [see history after 2nd ckpt.png] and this time I've
>>>>>> got no exception. The third checkpoint results in the
>>>>>> same exception as after the first checkpoint.
>>>>>> 
>>>>>> Can anybody let me know what's going wrong behind the
>>>>>> scene?
>>>>>> 
>>>>>> Best,
>>>>>> 
>>>>>> Dongwon <history after 3rd ckpt.png><exception after 3rd 
>>>>>> ckpt.png><history after 2nd 
>>>>>> ckpt.png><configuration.png><summary.png><exception
>>>>>> after 1st ckpt.png><history after 1st ckpt.png>
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> <screenshot_10min_ckpt.png>
>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIzBAEBCgAdFiEE8rTF/+V8YmXHQb9YY43Spj86DwAFAl1Rj6QACgkQY43Spj86
DwDaXhAAsghkoeh4DP1XD/Oty+1TkgD29+I96izx+JtKYhh1PSwbm+5LUBqFeGvk
N8mMJjyNhHNbOFbqkvjIwKpV3IAXRUCugDHtGWzbdVjpDFG/O5e11LydYtNg+VGA
VueQm/dV/W6EyrPjpmpXFCKrGDoVFcJ7JV9T4nAmkdML5BjNJUDAdK+ipsePEnZI
q2htM5NDE29Pl242uqnelHCgje0kXzpaPn20V8xnKTeEU/OmHcZVbDl/u1caOWE2
PnbeMuU06QfwdZ/2u7yVl2PH4l97YFB89h8W6HuDkbgvSAVtnG1OA6cMUNjyYOKl
gfRW2NE9YRnaaP1c+UIcnT14DlWbGzwl7DXy4213jxo9Dhjm08IjKNTQAqutWoeZ
kEto+k7h1bmP6W5prk4r6YNOEBzZoLWhSPhFl6un8NZetCys6E66ShXVSMFKzkDO
06mj9g0BDZI0uEggfFZ054I5mpMLGGWCIDjFm6i3LdpsDMpdY9izpmipEvKXfPzL
mBfPVddzz9DrvQmGPEs8KDa8nI+WvvJj2lIhtOY7uOX5fzlMxNZDW0ST0FQC/R0x
y4mssLVwa0OdPYFnXvyqsDd2KxAsG77K6D4N5rSMApHPHoSqdIgNmyuikGYGzmzu
w7ZdFKxrHiIHXuwthIEahfipXJ2nynAKi06k08z8HeQoxGkBt84=
=FqSe
-----END PGP SIGNATURE-----

Reply via email to