`transaction.timeout.ms` is a producer setting, thus you can increase it accordingly.
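As a side note for readers, the interplay of the two settings can be sketched with plain property maps (illustrative values only; the real settings go into the Kafka producer `Properties` and the broker's `server.properties`, and Flink's FlinkKafkaProducer011 reportedly raises the producer timeout to 1 hour by default):

```python
# Sketch, not Flink API: the two timeout settings as plain dicts.
# Values are illustrative assumptions, not recommendations.

producer_config = {
    # Producer side: how long an open transaction may stay alive.
    "transaction.timeout.ms": 60 * 60 * 1000,  # 1 hour
}

broker_config = {
    # Broker side: upper bound on any producer-requested timeout.
    "transaction.max.timeout.ms": 60 * 60 * 1000,
}

def timeout_allowed(producer_cfg, broker_cfg):
    """The broker rejects producers that request a transaction timeout
    larger than the broker's transaction.max.timeout.ms."""
    return (producer_cfg["transaction.timeout.ms"]
            <= broker_cfg["transaction.max.timeout.ms"])
```

So raising only the producer value is not enough once it exceeds the broker's 15-minute default maximum; both sides have to move together.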
Note that brokers bound the range via `transaction.max.timeout.ms`; thus, you may need to increase this broker config, too.

-Matthias

On 8/12/19 2:43 AM, Piotr Nowojski wrote:
> Hi,
>
> Ok, I see. You can try to rewrite your logic (or maybe the records'
> schema, by adding some ID fields) to manually deduplicate the records
> after processing them with at-least-once semantics. Such a setup is
> usually simpler, with slightly better throughput and significantly
> better latency (end-to-end exactly-once latency is limited by the
> checkpointing time).
>
> Piotrek
>
>> On 12 Aug 2019, at 11:12, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks a lot. I need exactly once in my use case, but rather than
>> risk losing data, at least once is more acceptable when an error
>> occurs.
>>
>> Best, Tony Wei
>>
>> Piotr Nowojski <pi...@data-artisans.com> wrote on Mon, 12 Aug 2019
>> at 15:27:
>>
>> Hi,
>>
>> Yes, if it’s due to a transaction timeout, you will lose the data.
>>
>> Whether you can fall back to at least once depends on Kafka, not on
>> Flink, since it is Kafka that times out those transactions, and I
>> don’t see anything in the documentation that could override this [1].
>> You might try disabling the mechanism by setting
>> `transaction.abort.timed.out.transaction.cleanup.interval.ms` or
>> `transaction.remove.expired.transaction.cleanup.interval.ms`, but
>> that’s a question more for the Kafka guys. Maybe Becket could help
>> with this.
>>
>> Also, it MIGHT be that Kafka doesn’t remove records from the topics
>> when aborting the transaction, and MAYBE you can still access them
>> via “READ_UNCOMMITTED” mode. But that is, again, a question for
>> Kafka.
>>
>> Sorry that I cannot help more.
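Piotr's deduplication suggestion above can be sketched as a simple filter keyed on a record ID field (the `id` key is a hypothetical example; in a real Flink job this "seen" state would live in keyed state or an external store, not an in-memory set):

```python
def dedup_by_id(records, seen=None):
    """Keep only the first occurrence of each record id, dropping the
    replays that an at-least-once pipeline can produce after a restart."""
    if seen is None:
        seen = set()
    out = []
    for record in records:
        if record["id"] not in seen:
            seen.add(record["id"])
            out.append(record)
    return out
```

After a failure and restart, the source replays some records; as long as the records carry stable IDs, the duplicates are filtered downstream, which is what makes at-least-once plus deduplication behave like exactly-once for the consumer.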
>> If you do not care about exactly once, why don’t you just set the
>> connector to at-least-once mode?
>>
>> Piotrek
>>
>>> On 12 Aug 2019, at 06:29, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I had the same exception recently. I want to confirm that if it is
>>> due to a transaction timeout, then I will lose that data. Am I
>>> right? Can I make it fall back to at-least-once semantics in this
>>> situation?
>>>
>>> Best, Tony Wei
>>>
>>> Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, 21 Mar 2018
>>> at 22:28:
>>>
>>> Hi,
>>>
>>> But that’s exactly the case: the producer’s transaction timeout
>>> starts when the external transaction starts - but
>>> FlinkKafkaProducer011 keeps an active Kafka transaction for the
>>> whole period between checkpoints.
>>>
>>> As I wrote in the previous message:
>>>
>>>> in case of failure, your timeout must also be able to cover the
>>>> additional downtime required for the successful job restart. Thus
>>>> you should increase your timeout accordingly.
>>>
>>> I think that a 15-minute timeout is way too small a value. If your
>>> job fails because of some intermittent failure (for example, a
>>> worker crash/restart), you will only have a couple of minutes for a
>>> successful Flink job restart. Otherwise you will lose some data
>>> (because of the transaction timeouts).
>>>
>>> Piotrek
>>>
>>>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>
>>>> Hi Piotr,
>>>>
>>>> Now my streaming pipeline is working without retries. I decreased
>>>> Flink's checkpoint interval from 15 min to 10 min as you suggested
>>>> [see screenshot_10min_ckpt.png].
>>>>
>>>> I thought that the producer's transaction timeout starts when the
>>>> external transaction starts. The truth is that the producer's
>>>> transaction timeout starts after the last external checkpoint is
>>>> committed.
>>>> Now that I have 15 min for the producer's transaction timeout and
>>>> 10 min for Flink's checkpoint interval, and every checkpoint takes
>>>> less than 5 minutes, everything is working fine. Am I right?
>>>>
>>>> Anyway, thank you very much for the detailed explanation!
>>>>
>>>> Best,
>>>> Dongwon
>>>>
>>>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski
>>>> <pi...@data-artisans.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Please increase `transaction.timeout.ms` to a greater value or
>>>> decrease Flink’s checkpoint interval; I’m pretty sure the issue
>>>> here is that those two values are overlapping. I think that’s even
>>>> visible on the screenshots. The first checkpoint started at
>>>> 14:28:48 and ended at 14:30:43, while the second one started at
>>>> 14:45:53 and ended at 14:49:16. That gives you a minimal
>>>> transaction duration of 15 minutes and 10 seconds, and a maximal
>>>> transaction duration of 21 minutes.
>>>>
>>>> In the HAPPY SCENARIO (without any failures and restarting), you
>>>> should assume that your timeout interval covers, with some safety
>>>> margin, the period between the start of a checkpoint and the end
>>>> of the NEXT checkpoint, since this is the upper bound on how long
>>>> the transaction might be used. In your case, that is at least ~25
>>>> minutes.
>>>>
>>>> On top of that, as described in the docs
>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance),
>>>> in case of failure, your timeout must also be able to cover the
>>>> additional downtime required for the successful job restart. Thus
>>>> you should increase your timeout accordingly.
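Piotr's arithmetic can be checked directly from the timestamps in his message: the minimal transaction duration is the gap between the first checkpoint's end and the second checkpoint's start, and the maximal one spans from the first checkpoint's start to the second checkpoint's end.

```python
from datetime import datetime

def dur_s(start, end):
    """Seconds between two HH:MM:SS wall-clock timestamps."""
    fmt = "%H:%M:%S"
    return (datetime.strptime(end, fmt)
            - datetime.strptime(start, fmt)).total_seconds()

# Checkpoint 1: 14:28:48 - 14:30:43; checkpoint 2: 14:45:53 - 14:49:16
minimal = dur_s("14:30:43", "14:45:53")  # transaction opened right after cp1
maximal = dur_s("14:28:48", "14:49:16")  # transaction opened at start of cp1

print(minimal / 60, maximal / 60)  # ~15.2 and ~20.5 minutes
```

Both durations exceed the 15-minute (900-second) transaction timeout discussed in this thread, which is exactly why the transactions expire.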
>>>> Piotrek
>>>>
>>>>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> We have set the producer's `transaction.timeout.ms` to 15 minutes
>>>>> and have used the default setting for the broker (15 min). As
>>>>> Flink's checkpoint interval is 15 minutes, it is not a situation
>>>>> where Kafka's timeout is smaller than Flink's checkpoint
>>>>> interval. As our first checkpoint takes just 2 minutes, it seems
>>>>> like the transaction is not committed properly.
>>>>>
>>>>> Best,
>>>>> Dongwon
>>>>>
>>>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski
>>>>> <pi...@data-artisans.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> What’s your Kafka transaction timeout setting? Please check both
>>>>> the Kafka producer configuration (the `transaction.timeout.ms`
>>>>> property) and the Kafka broker configuration. The most likely
>>>>> cause of such an error message is that Kafka's timeout is smaller
>>>>> than Flink’s checkpoint interval and transactions are not
>>>>> committed quickly enough before the timeout occurs.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm faced with the following ProducerFencedException after the
>>>>>> 1st, 3rd, 5th, 7th, ...
>>>>>> checkpoints:
>>>>>> --
>>>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>>>> Producer attempted an operation with an old epoch. Either there
>>>>>> is a newer producer with the same transactionalId, or the
>>>>>> producer's transaction has been expired by the broker.
>>>>>> --
>>>>>>
>>>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once
>>>>>> processing using the Kafka sink. We use FsStateBackend to store
>>>>>> snapshot data on HDFS.
>>>>>>
>>>>>> As shown in configuration.png, my checkpoint configuration is:
>>>>>> - Checkpointing Mode: Exactly Once
>>>>>> - Interval: 15m 0s
>>>>>> - Timeout: 10m 0s
>>>>>> - Minimum Pause Between Checkpoints: 5m 0s
>>>>>> - Maximum Concurrent Checkpoints: 1
>>>>>> - Persist Checkpoints Externally: Disabled
>>>>>>
>>>>>> After the first checkpoint completed [see history after 1st
>>>>>> ckpt.png], the job was restarted due to the
>>>>>> ProducerFencedException [see exception after 1st ckpt.png]. The
>>>>>> first checkpoint takes less than 2 minutes, while my checkpoint
>>>>>> interval is 15m and the minimum pause between checkpoints is 5m.
>>>>>> After the job restarted, the second checkpoint was triggered
>>>>>> after a while [see history after 2nd ckpt.png], and this time I
>>>>>> got no exception. The third checkpoint resulted in the same
>>>>>> exception as after the first checkpoint.
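The failures reported here are consistent with the explanation given later in the thread: a transaction opened at one checkpoint must stay open until the NEXT checkpoint commits. A rough feasibility check (a simplification that ignores restart downtime and safety margins) is whether the checkpoint interval plus one checkpoint's duration fits inside the transaction timeout:

```python
def transaction_may_expire(interval_min, checkpoint_min, timeout_min):
    """A transaction opened at one checkpoint survives until the NEXT
    checkpoint commits; if that span exceeds the timeout, the broker
    aborts it and the producer gets fenced (ProducerFencedException)."""
    return interval_min + checkpoint_min > timeout_min

# The setup in this thread: 15-min interval, ~2-min checkpoints,
# 15-min transaction timeout.
print(transaction_may_expire(15, 2, 15))   # the failing configuration

# After reducing the interval to 10 min (checkpoints under 5 min):
print(transaction_may_expire(10, 5, 15))
```

With the original settings the open-transaction span (~17 min) exceeds the 15-minute timeout, while the reduced 10-minute interval just fits, matching the outcome reported later in the thread, though with no safety margin left.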
>>>>>> Can anybody let me know what's going wrong behind the scenes?
>>>>>>
>>>>>> Best,
>>>>>> Dongwon
>>>>>>
>>>>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png>
>>>>>> <history after 2nd ckpt.png><configuration.png><summary.png>
>>>>>> <exception after 1st ckpt.png><history after 1st ckpt.png>
>>>>
>>>> <screenshot_10min_ckpt.png>