Hi,

Has anyone run into the same problem? I have increased my producer transaction timeout to 1.5 hours, but the problem still happened when I restarted the broker that was the active controller. So the cause might not be that a checkpoint took too long and the transaction timed out. I have no other clues about what's wrong with my Kafka producer. Could someone please help me?
Best,
Tony Wei

On Fri, Aug 16, 2019 at 4:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Tony,
>
> I'm sorry I cannot help you with this issue, but Becket (in CC) might have
> an idea what went wrong here.
>
> Best,
> Fabian
>
> On Wed, Aug 14, 2019 at 07:00, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> I am currently trying to update our Kafka cluster with a larger
>> `transaction.max.timeout.ms`. The original setting is Kafka's default
>> value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>
>> While I was doing a rolling restart of my brokers, this exception occurred
>> on the next checkpoint after I restarted the broker that was the active
>> controller.
>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>     ... 5 more
>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>
>> I have no idea why it happened, and I didn't find any error logs on the
>> brokers. Has anyone seen this exception before? How can I prevent this
>> exception when restarting the Kafka cluster? Does this exception mean that
>> I will lose data in some of these transactions?
>>
>> flink cluster version: 1.8.1
>> kafka cluster version: 1.0.1
>> flink kafka producer version: universal
>> producer transaction timeout: 15 minutes
>> checkpoint interval: 5 minutes
>> number of concurrent checkpoints: 1
>> max checkpoint duration before and after the exception occurred: < 2 minutes
>>
>> Best,
>> Tony Wei
>>
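As a rough sanity check of the timeout reasoning in this thread, the Java sketch below compares a producer's `transaction.timeout.ms` against the broker cap `transaction.max.timeout.ms` and against one checkpoint interval plus the maximum checkpoint duration, using the numbers reported above. The class and method names are hypothetical, and treating "interval plus checkpoint duration" as the lifetime of an open transaction in Flink's two-phase-commit sink is an approximation, not an exact bound.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper: checks the timeout relationships discussed in the thread.
public class TxnTimeoutCheck {

    // Returns true if the producer timeout is within the broker cap AND is longer
    // than the approximate time a transaction stays open between checkpoints.
    static boolean timeoutsConsistent(Properties producerConfig,
                                      Duration brokerMaxTimeout,
                                      Duration checkpointInterval,
                                      Duration maxCheckpointDuration) {
        Duration producerTimeout = Duration.ofMillis(
                Long.parseLong(producerConfig.getProperty("transaction.timeout.ms")));

        // The broker rejects transactional producers whose timeout exceeds
        // transaction.max.timeout.ms.
        boolean acceptedByBroker = producerTimeout.compareTo(brokerMaxTimeout) <= 0;

        // Assumption: a transaction opened at one checkpoint is committed only after
        // the next checkpoint completes, i.e. roughly interval + checkpoint duration.
        Duration approxLifetime = checkpointInterval.plus(maxCheckpointDuration);
        boolean outlivesCheckpoint = producerTimeout.compareTo(approxLifetime) > 0;

        return acceptedByBroker && outlivesCheckpoint;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Original setup from the first email: 15-minute producer transaction timeout.
        props.setProperty("transaction.timeout.ms",
                String.valueOf(Duration.ofMinutes(15).toMillis()));

        boolean ok = timeoutsConsistent(props,
                Duration.ofHours(3),      // new broker transaction.max.timeout.ms
                Duration.ofMinutes(5),    // checkpoint interval
                Duration.ofMinutes(2));   // max checkpoint duration
        System.out.println(ok);
    }
}
```

With the reported settings the check passes both against the old 15-minute broker cap and the new 3-hour cap, which fits the suspicion at the top of the thread that a plain transaction timeout does not explain the `InvalidTxnStateException`.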