Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-23 Thread Yanfei Lei
Hi JM,

> why having "transactional.id.expiration.ms" < "transaction.timeout.ms" helps

When recovering a job from a checkpoint/savepoint that contains Kafka
transactions, Flink tries to re-commit those transactions based on
their transactional IDs.
If those transactions have timed out, or their transactional IDs have
expired, the re-commit may fail because the producer ID no longer maps
to the transactional ID, which is what `InvalidPidMappingException` reports.
IIUC, setting "transactional.id.expiration.ms" <
"transaction.timeout.ms" allows the transactional ID to be reset, but
it will cause data loss.
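The two failure conditions described above can be illustrated with a small Python model. This is a hypothetical helper, not part of Flink or Kafka: it assumes the savepoint's age approximates how long the pending transaction and its transactional ID have been idle, and it ignores broker details such as cleanup intervals.

```python
from datetime import timedelta


def recommit_risks(
    savepoint_age: timedelta,
    transaction_timeout: timedelta,          # producer "transaction.timeout.ms"
    transactional_id_expiration: timedelta,  # broker "transactional.id.expiration.ms"
) -> list[str]:
    """Hypothetical model: reasons a re-commit on restore may fail.

    Assumes the savepoint's age is how long the pending transaction and its
    transactional ID have been idle; an empty list means neither applies.
    """
    risks = []
    if savepoint_age > transaction_timeout:
        # The broker aborts transactions left open past their timeout,
        # so the records written in them are lost.
        risks.append("transaction timed out")
    if savepoint_age > transactional_id_expiration:
        # The broker no longer knows the transactional ID, so the producer ID
        # no longer maps to it (InvalidPidMappingException).
        risks.append("transactional id expired")
    return risks


# 15-minute timeout (as in this thread) and the 7-day broker default expiration.
print(recommit_risks(timedelta(hours=1), timedelta(minutes=15), timedelta(days=7)))
# prints ['transaction timed out']
print(recommit_risks(timedelta(days=10), timedelta(minutes=15), timedelta(days=7)))
# prints ['transaction timed out', 'transactional id expired']
```

The model only encodes the two conditions named above; it does not predict how Flink reacts to each of them.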

[1] and [2] should help explain what happened.

[1] https://issues.apache.org/jira/browse/FLINK-16419?focusedCommentId=17624315=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17624315
[2] https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs


RE: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-23 Thread Jean-Marc Paulin
Thanks for your insight.

I am still trying to understand exactly what happens here. We currently have
the default settings in Kafka, and we set "transaction.timeout.ms" to 15
minutes (which also happens to be the default "transaction.max.timeout.ms").
I would expect a restore from a savepoint more than 15 minutes old to fail,
but that is not the case.

I still think we need to raise "transaction.max.timeout.ms" to something
like 7 days, since a 7-day-old savepoint is effectively worthless, and
probably set "transaction.timeout.ms" close to that value.
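The proposed values can be sanity-checked against one hard constraint: a Kafka broker rejects a transactional producer whose `transaction.timeout.ms` is larger than the broker's `transaction.max.timeout.ms` (producer initialization fails with `InvalidTxnTimeoutException`), so the broker limit has to be raised first. The Python sketch below is a hypothetical helper, not a Flink or Kafka API, and encodes only that constraint, using the 15-minute broker default.

```python
from datetime import timedelta


def to_ms(duration: timedelta) -> int:
    """Convert a duration to the millisecond values Kafka configs expect."""
    return int(duration.total_seconds() * 1000)


# Apache Kafka broker default for transaction.max.timeout.ms (15 minutes).
DEFAULT_TRANSACTION_MAX_TIMEOUT_MS = to_ms(timedelta(minutes=15))


def check_transaction_timeout(
    transaction_timeout_ms: int,
    transaction_max_timeout_ms: int = DEFAULT_TRANSACTION_MAX_TIMEOUT_MS,
) -> list[str]:
    """Hypothetical check: return problems with a producer transaction timeout."""
    problems = []
    if transaction_timeout_ms > transaction_max_timeout_ms:
        problems.append(
            f"transaction.timeout.ms={transaction_timeout_ms} exceeds "
            f"transaction.max.timeout.ms={transaction_max_timeout_ms}"
        )
    return problems


seven_days_ms = to_ms(timedelta(days=7))  # 604800000
print(check_transaction_timeout(seven_days_ms))                 # one problem: default limit too low
print(check_transaction_timeout(seven_days_ms, seven_days_ms))  # prints []
```

Note that 7 days is also the broker default for `transactional.id.expiration.ms`.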

But can you explain how "transactional.id.expiration.ms" influences the 
InvalidPidMappingException, or why having "transactional.id.expiration.ms" < 
"transaction.timeout.ms" helps?

Kind regards

Jean-Marc




Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-21 Thread Yanfei Lei
Hi JM,

Yes, in most cases `InvalidPidMappingException` occurs because the
transaction has been lost.

As a short-term workaround, setting "transaction.timeout.ms" >
"transactional.id.expiration.ms" allows the job to get past the
`InvalidPidMappingException` [1].
For the long term, FLIP-319 [2] provides a solution.
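The direction of this inequality is easy to mix up: elsewhere in the thread the same condition is written as "transactional.id.expiration.ms" < "transaction.timeout.ms". A minimal Python check makes it explicit; it is a hypothetical helper rather than any Flink or Kafka API. Note that `transactional.id.expiration.ms` is a broker setting, while `transaction.timeout.ms` is set on the producer.

```python
from datetime import timedelta


def to_ms(duration: timedelta) -> int:
    """Convert a duration to the millisecond values Kafka configs expect."""
    return int(duration.total_seconds() * 1000)


def workaround_condition_holds(
    transaction_timeout_ms: int,          # producer setting
    transactional_id_expiration_ms: int,  # broker setting
) -> bool:
    """True when producer transaction.timeout.ms > broker transactional.id.expiration.ms."""
    return transaction_timeout_ms > transactional_id_expiration_ms


timeout_ms = to_ms(timedelta(minutes=15))  # the timeout used in this thread
print(workaround_condition_holds(timeout_ms, to_ms(timedelta(days=7))))      # prints False (broker default)
print(workaround_condition_holds(timeout_ms, to_ms(timedelta(minutes=10))))  # prints True
```

With a 15-minute producer timeout, the broker's expiration would have to be set below 15 minutes, far under its 7-day default.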

[1] https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710


-- 
Best,
Yanfei


Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

2024-04-19 Thread Jean-Marc Paulin
Hi,

We use Flink 1.18 with the Kafka sink, and we enabled `EXACTLY_ONCE` on one of
our Kafka sinks. We set the transaction timeout to 15 minutes. When we try to
restore from a savepoint well after that 15-minute window has passed, Flink
enters a RESTARTING loop. We see the following error:

```
{
  "exception": {
    "exception_class": "org.apache.kafka.common.errors.InvalidPidMappingException",
    "exception_message": "The producer attempted to use a producer id which is not currently assigned to its transactional id.",
    "stacktrace": "org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.\n"
  },
  "@version": 1,
  "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
  "message": "policy-exec::schedule-policy-execution -> (policy-exec::select-kafka-async-policy-stages, policy-exec::select-async-policy-stages -> policy-exec::execute-async-policy-stages, policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: story-notifications-output, Sink: alerts-output, Sink: alerts-changes, Sink: connector-alerts, Sink: updated-events-output, Sink: stories-output, Sink: runbook-execution-requests) (6/6) (3f8cb042c1aa628891c66a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) switched from INITIALIZING to FAILED on aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c @ aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local (dataPort=6121).",
  "thread_name": "flink-pekko.actor.default-dispatcher-18",
  "@timestamp": "2024-04-19T11:11:05.169+",
  "level": "INFO",
  "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
}
```
While I understand that the transaction is lost, would it be possible to
ignore this particular error and resume the job anyway?

Thanks for any suggestions

JM

