Hi Tony,

From the symptom it is not quite clear to me what may be causing this
issue. The TransactionCoordinator is supposed to be independent of the
active controller, so bouncing the active controller should not have any
special impact on the transactions (at least not every time). If this is
reliably reproducible, is it possible to turn on debug-level logging
for kafka.coordinator.transaction.TransactionCoordinator to see what the
broker says?
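
For example, something like the following line in the brokers'
log4j.properties should do it (just a sketch; the logger name is the
coordinator class, and the appender setup depends on your installation):

# log4j 1.x syntax: raise the transaction coordinator logger to DEBUG
log4j.logger.kafka.coordinator.transaction.TransactionCoordinator=DEBUG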

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 3:55 PM Tony Wei <tony19920...@gmail.com> wrote:

> Hi,
>
> Has anyone run into the same problem? I have updated my producer
> transaction timeout to 1.5 hours, but the problem still happened when I
> restarted the broker with the active controller. So it is probably not
> caused by the checkpoint duration being too long and the transaction
> timing out. I have no more clues about what is wrong with my kafka
> producer. Could someone help me please?
>
> Best,
> Tony Wei
>
> Fabian Hueske <fhue...@gmail.com> wrote on Fri., Aug 16, 2019 at 4:10 PM:
>
>> Hi Tony,
>>
>> I'm sorry I cannot help you with this issue, but Becket (in CC) might
>> have an idea what went wrong here.
>>
>> Best, Fabian
>>
>> On Wed., Aug 14, 2019 at 07:00, Tony Wei <
>> tony19920...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am currently trying to update our kafka cluster with a larger
>>> `transaction.max.timeout.ms`. The original setting is kafka's default
>>> value (i.e. 15 minutes) and I tried to raise it to 3 hours.
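>>>
>>> For reference, the broker-side change is just one line in
>>> server.properties (a sketch; 3 hours = 10800000 ms):
>>>
>>> # upper bound for producer transaction timeouts allowed by the broker
>>> transaction.max.timeout.ms=10800000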
>>>
>>> While doing a rolling restart of my brokers, I hit this exception on
>>> the next checkpoint after I restarted the broker holding the active
>>> controller.
>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
>>>>     ... 5 more
>>>> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
>>>
>>> I have no idea why this happened, and I didn't find any error logs on
>>> the brokers. Has anyone seen this exception before? How can I prevent
>>> this exception when I restart the kafka cluster? Does this exception
>>> mean that I will lose data in some of these transactions?
>>>
>>> flink cluster version: 1.8.1
>>> kafka cluster version: 1.0.1
>>> flink kafka producer version: universal
>>> producer transaction timeout: 15 minutes
>>> checkpoint interval: 5 minutes
>>> number of concurrent checkpoint: 1
>>> max checkpoint duration before and after the exception occurred: < 2 minutes
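>>>
>>> In case it helps, the producer is set up roughly like this (a sketch
>>> only: the topic name and serialization schema are hypothetical, but
>>> the transaction timeout is passed the same way via producer
>>> properties):
>>>
>>> import java.util.Properties;
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>>> import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
>>>
>>> // inside the job's main():
>>> Properties props = new Properties();
>>> props.setProperty("bootstrap.servers", "broker1:9092");
>>> // producer-side transaction timeout; must not exceed the brokers'
>>> // transaction.max.timeout.ms, otherwise the producer fails to initialize
>>> props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
>>>
>>> FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>>>     "output-topic",  // hypothetical topic name
>>>     new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>>>     props,
>>>     FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // two-phase commit sink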
>>>
>>> Best,
>>> Tony Wei
>>>
>>
