Re: recover from svaepoint

2021-06-07 Thread Piotr Nowojski
Hi, thanks Tianxin and 周瑞 for reporting and tracking down the problem. Indeed, that could be the reason behind it. Has either of you already created a JIRA ticket for this bug? > Concerning the required changing of the UID of an operator Piotr, is this a known issue and documented somewhere? I f

Re: recover from svaepoint

2021-06-03 Thread Till Rohrmann
Thanks for this insight. So the problem might be that Flink uses an internal Kafka API (the connector uses reflection to get hold of the TransactionManager) which changed between versions 2.4.1 and 2.5. I think this is a serious problem because it breaks our end-to-end exactly-once story when using new

Re: recover from svaepoint

2021-06-03 Thread Tianxin Zhao
I encountered the exact same issue before when experimenting in a testing environment. I was not able to spot the bug mentioned in this thread; my workaround was to downgrade my own kafka-client version from 2.5 to 2.4.1, matching the version used by flink-connector-kafka. In 2.4.1 Kafka, Transa

Re: recover from svaepoint

2021-06-03 Thread Till Rohrmann
Thanks for the update. Skimming over the code, it does indeed look like we are overwriting the fields of the static value ProducerIdAndEpoch.NONE. I am not 100% sure how this would cause the observed problem, though. I am also not a Flink Kafka connector and Kafka expert so I would appreciate it if someone mo
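
The overwrite described in this message can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `IdAndEpoch` and `ToyTransactionManager` are hypothetical stand-ins written for illustration, not the Kafka or Flink classes, and the real code paths may differ. It shows how writing through reflection into an object that happens to be a shared static sentinel changes that sentinel for every other holder, while replacing the reference with a fresh object does not.

```java
import java.lang.reflect.Field;

public class SharedSentinelDemo {

    // Hypothetical stand-in for a producer id/epoch pair (not the Kafka class).
    public static final class IdAndEpoch {
        public static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);
        private long producerId;
        private short epoch;

        public IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }

        public long producerId() { return producerId; }
        public short epoch() { return epoch; }
    }

    // Hypothetical manager that initially points at the shared sentinel.
    public static final class ToyTransactionManager {
        private IdAndEpoch idAndEpoch = IdAndEpoch.NONE;

        public IdAndEpoch current() { return idAndEpoch; }
    }

    // Risky variant: mutates whatever object the manager currently holds.
    // If that object is the shared NONE sentinel, NONE itself is changed.
    public static void resumeInPlace(ToyTransactionManager tm, long producerId, short epoch)
            throws ReflectiveOperationException {
        Field holder = ToyTransactionManager.class.getDeclaredField("idAndEpoch");
        holder.setAccessible(true);
        Object current = holder.get(tm);
        setField(current, "producerId", producerId);
        setField(current, "epoch", epoch);
    }

    // Safer variant: swaps in a fresh object and leaves the sentinel untouched.
    public static void resumeWithFreshObject(ToyTransactionManager tm, long producerId, short epoch)
            throws ReflectiveOperationException {
        Field holder = ToyTransactionManager.class.getDeclaredField("idAndEpoch");
        holder.setAccessible(true);
        holder.set(tm, new IdAndEpoch(producerId, epoch));
    }

    private static void setField(Object target, String name, Object value)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        field.set(target, value);
    }

    public static void main(String[] args) throws Exception {
        ToyTransactionManager first = new ToyTransactionManager();
        resumeWithFreshObject(first, 42L, (short) 7);
        System.out.println("NONE after fresh-object resume: " + IdAndEpoch.NONE.producerId());

        ToyTransactionManager second = new ToyTransactionManager();
        resumeInPlace(second, 42L, (short) 7);
        System.out.println("NONE after in-place resume: " + IdAndEpoch.NONE.producerId());
    }
}
```

In this model, any manager created after the in-place resume starts out with the overwritten values instead of the "no producer id yet" marker, which is one plausible way such an overwrite could leak state between producers.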

Re: recover from svaepoint

2021-06-02 Thread Till Rohrmann
Forwarding 周瑞's message to a duplicate thread: After our analysis, we found a bug in the org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction method. The analysis process is as follows: org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkF

Re: recover from svaepoint

2021-06-02 Thread Piotr Nowojski
Hi, I think there is no generic way. If this error indeed happened after starting a second job from the same savepoint, or something like that, the user can change the sink's operator UID. If this is an issue of intentional recovery from an earlier checkpoint/savepoint, maybe `FlinkKafkaProducer#setL

Re: recover from svaepoint

2021-06-01 Thread Till Rohrmann
The error message says that we are trying to reuse a transaction id that is currently being used or has expired. I am not 100% sure how this can happen. My suspicion is that you have resumed a job multiple times from the same savepoint. Have you checked that there is no other job which has been re
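
The suspicion above, together with the suggestion elsewhere in the thread to change the sink's operator UID, can be pictured with a toy model. This is a sketch under a loud assumption: the `transactionalId` function below is a hypothetical derivation invented for illustration, not the scheme the Flink Kafka connector actually uses. It only shows why two jobs restored from the same savepoint could end up with overlapping transaction ids if the ids are derived from state that both jobs share, and why a different UID would separate them.

```java
import java.util.HashSet;
import java.util.Set;

public class TransactionalIdCollisionDemo {

    // Hypothetical id derivation (assumption, not Flink's real scheme):
    // the id depends only on the operator UID, the subtask index and a slot number.
    public static String transactionalId(String operatorUid, int subtaskIndex, int slot) {
        return operatorUid + "-" + subtaskIndex + "-" + slot;
    }

    // All ids a job would use for one sink with the given parallelism and pool size.
    public static Set<String> idsForJob(String operatorUid, int parallelism, int poolSize) {
        Set<String> ids = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int slot = 0; slot < poolSize; slot++) {
                ids.add(transactionalId(operatorUid, subtask, slot));
            }
        }
        return ids;
    }

    // True if the two jobs would compete for at least one transaction id.
    public static boolean overlaps(Set<String> a, Set<String> b) {
        Set<String> common = new HashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        // Two jobs started from the same savepoint keep the same sink UID.
        Set<String> jobA = idsForJob("kafka-sink", 2, 3);
        Set<String> jobB = idsForJob("kafka-sink", 2, 3);
        System.out.println("same UID overlaps: " + overlaps(jobA, jobB));

        // Giving the second job's sink a different UID separates the id spaces.
        Set<String> jobC = idsForJob("kafka-sink-v2", 2, 3);
        System.out.println("different UID overlaps: " + overlaps(jobA, jobC));
    }
}
```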