Hi JM,

> why having "transactional.id.expiration.ms" < "transaction.timeout.ms" helps

When recovering a job from a checkpoint/savepoint that contains Kafka
transactions, Flink will try to re-commit those transactions, identified
by their transactional ids, upon recovery.
If those transactions have timed out or the transactional ids have
expired, the re-commit may fail because the producer id no longer maps
to the transactional id.
IIUC, setting "transactional.id.expiration.ms" <
"transaction.timeout.ms" allows the transactional id to be reset so the
job can resume, but it can cause data loss.
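
To illustrate the relationship, here is a minimal sketch of the relevant
settings (the property names are Kafka's; the concrete values are only
assumptions to tune for your setup, not recommendations):

```properties
# Broker-side (server.properties) -- sketch, values are assumptions
# Upper bound the broker accepts for a producer's transaction.timeout.ms
# (Kafka's default is 15 minutes).
transaction.max.timeout.ms=604800000      # 7 days
# How long the broker retains transactional id metadata after the last
# status update (Kafka's default is 7 days).
transactional.id.expiration.ms=86400000   # 1 day, < transaction.timeout.ms

# Producer-side (Flink Kafka sink properties) -- must not exceed the
# broker's transaction.max.timeout.ms, or the sink fails at startup.
transaction.timeout.ms=604800000          # 7 days
```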

[1][2] would be helpful for understanding what happened.

[1] 
https://issues.apache.org/jira/browse/FLINK-16419?focusedCommentId=17624315&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17624315
[2] 
https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs

Jean-Marc Paulin <j...@uk.ibm.com> 于2024年4月23日周二 16:45写道:
>
> Thanks for your insight.
>
> I am still trying to understand exactly what happens here. We currently have 
> the default settings in Kafka, and we set "transaction.timeout.ms" to 15 
> minutes (which also happens to be the default "transaction.max.timeout.ms"). 
> My expectation would be that if our savepoint were more than 15 minutes old 
> the restore would fail, but that is not the case.
>
> I still think we need to extend "transaction.max.timeout.ms" to something 
> like 7 days, as a savepoint older than 7 days is effectively worthless, and 
> probably adjust "transaction.timeout.ms" to be close to this.
>
> But can you explain how "transactional.id.expiration.ms" influences the 
> InvalidPidMappingException, or why having "transactional.id.expiration.ms" < 
> "transaction.timeout.ms" helps?
>
> Kind regards
>
> Jean-Marc
>
>
> ________________________________
> From: Yanfei Lei <fredia...@gmail.com>
> Sent: Monday, April 22, 2024 03:28
> To: Jean-Marc Paulin <j...@uk.ibm.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: [EXTERNAL] Re: Flink 1.18: Unable to resume from a savepoint with 
> error InvalidPidMappingException
>
> Hi JM,
>
> Yes, in most cases `InvalidPidMappingException` occurs because the
> transaction state has been lost.
>
> For the short term, setting "transaction.timeout.ms" >
> "transactional.id.expiration.ms" makes it possible to ignore the
> `InvalidPidMappingException`[1].
> For the long term, FLIP-319[2] provides a solution.
>
> [1] 
> https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
> Jean-Marc Paulin <j...@uk.ibm.com> 于2024年4月20日周六 02:30写道:
> >
> > Hi,
> >
> > we use Flink 1.18 with a Kafka sink, and we enabled `EXACTLY_ONCE` on one of 
> > our Kafka sinks. We set the transaction timeout to 15 minutes. When we try to 
> > restore from a savepoint well after that 15-minute window, Flink enters a 
> > RESTARTING loop. We see the error:
> >
> > ```
> > {
> >   "exception": {
> >     "exception_class": 
> > "org.apache.kafka.common.errors.InvalidPidMappingException",
> >     "exception_message": "The producer attempted to use a producer id which 
> > is not currently assigned to its transactional id.",
> >     "stacktrace": 
> > "org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> > attempted to use a producer id which is not currently assigned to its 
> > transactional id.\n"
> >   },
> >   "@version": 1,
> >   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
> >   "message": "policy-exec::schedule-policy-execution -> 
> > (policy-exec::select-kafka-async-policy-stages, 
> > policy-exec::select-async-policy-stages -> 
> > policy-exec::execute-async-policy-stages, 
> > policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: 
> > policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: 
> > story-notifications-output, Sink: alerts-output, Sink: alerts-changes, 
> > Sink: connector-alerts, Sink: updated-events-output, Sink: stories-output, 
> > Sink: runbook-execution-requests) (6/6) 
> > (3f8cb042c1aa628891c444466a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) 
> > switched from INITIALIZING to FAILED on 
> > aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c
> >  @ 
> > aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local
> >  (dataPort=6121).",
> >   "thread_name": "flink-pekko.actor.default-dispatcher-18",
> >   "@timestamp": "2024-04-19T11:11:05.169+0000",
> >   "level": "INFO",
> >   "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
> > }
> > ```
> > As far as I understand, the transaction is lost; would it be possible to 
> > ignore this particular error and resume the job anyway?
> >
> > Thanks for any suggestions
> >
> > JM
> >
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
>
>
> --
> Best,
> Yanfei



-- 
Best,
Yanfei
