Hi,

Has anyone run into the same problem? I have raised my producer
transaction timeout to 1.5 hours, but the problem still happened when I
restarted the broker hosting the active controller. So it might not be
caused by checkpoints taking so long that the transaction times out.
I have no further clues for finding out what's wrong with my Kafka
producer. Could someone please help me?
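A rough way to sanity-check the timeout theory, as a sketch only: with Flink's two-phase-commit Kafka sink, a transaction is opened when one checkpoint is taken and committed only after the next checkpoint completes. Its lifetime is therefore roughly the checkpoint interval plus the checkpoint duration. That lifetime has to stay below the producer's `transaction.timeout.ms`, which in turn must not exceed the broker's `transaction.max.timeout.ms`, because otherwise the broker rejects the producer's transaction initialization. The class and method names below (`TxnTimeoutCheck` and its helpers) are hypothetical and not part of Flink or Kafka. The lifetime formula is an approximation that ignores barrier alignment and commit-notification delays.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Flink or Kafka) that checks the two
 * timing constraints discussed in this thread for an exactly-once Kafka sink.
 */
public class TxnTimeoutCheck {

    /**
     * Approximate worst-case lifetime of one transaction, assuming it is opened
     * when a checkpoint is taken and committed once the next checkpoint completes.
     */
    public static Duration worstCaseTxnLifetime(Duration checkpointInterval,
                                                Duration maxCheckpointDuration) {
        return checkpointInterval.plus(maxCheckpointDuration);
    }

    /** The producer's transaction timeout may not exceed the broker-side cap. */
    public static boolean producerTimeoutAllowed(Duration producerTimeout,
                                                 Duration brokerMaxTimeout) {
        return producerTimeout.compareTo(brokerMaxTimeout) <= 0;
    }

    /** A transaction must be committed before the producer timeout aborts it. */
    public static boolean lifetimeFitsTimeout(Duration txnLifetime,
                                              Duration producerTimeout) {
        return txnLifetime.compareTo(producerTimeout) < 0;
    }

    public static void main(String[] args) {
        // Numbers taken from this thread.
        Duration interval = Duration.ofMinutes(5);
        Duration maxDuration = Duration.ofMinutes(2);
        Duration producerTimeout = Duration.ofMinutes(90);
        Duration brokerMax = Duration.ofHours(3);

        Duration lifetime = worstCaseTxnLifetime(interval, maxDuration);
        System.out.println("worst-case lifetime (min): " + lifetime.toMinutes());
        System.out.println("producer timeout <= broker max: "
                + producerTimeoutAllowed(producerTimeout, brokerMax));
        System.out.println("lifetime < producer timeout: "
                + lifetimeFitsTimeout(lifetime, producerTimeout));
    }
}
```

With the numbers from this thread (5-minute interval, checkpoints under 2 minutes, 1.5-hour producer timeout, 3-hour broker cap), the estimated lifetime is about 7 minutes and both checks pass. That is consistent with the suspicion that a plain transaction timeout is not what triggers the `InvalidTxnStateException` here.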

Best,
Tony Wei

On Fri, Aug 16, 2019 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Tony,
>
> I'm sorry I cannot help you with this issue, but Becket (in CC) might have
> an idea what went wrong here.
>
> Best, Fabian
>
> On Wed, Aug 14, 2019 at 7:00 AM Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi,
>>
>> I am currently trying to update our Kafka cluster with a larger
>> `transaction.max.timeout.ms`. The original setting was Kafka's default
>> value (i.e. 15 minutes), and I tried to set it to 3 hours.
>>
>> While doing a rolling restart of my brokers, I got this exception on the
>> next checkpoint after I restarted the broker hosting the active
>> controller.
>>
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>     ... 5 more
>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>
>>
>> I have no idea why this happened, and I didn't find any error logs on
>> the brokers. Has anyone seen this exception before? How can I prevent it
>> when restarting the Kafka cluster? Does this exception mean that I will
>> lose data from some of these transactions?
>>
>> Flink cluster version: 1.8.1
>> Kafka cluster version: 1.0.1
>> Flink Kafka producer version: universal
>> producer transaction timeout: 15 minutes
>> checkpoint interval: 5 minutes
>> max concurrent checkpoints: 1
>> max checkpoint duration before and after the exception occurred: < 2 minutes
>>
>> Best,
>> Tony Wei
>>
>
