Re: Error when Flink restarts automatically

2022-08-23, posted by Hangxiang Yu
Is this a DataStream job? Could you share the part of the code that uses state?
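For reference, a minimal sketch of the kind of keyed-state usage being asked about, assuming a simple KeyedProcessFunction with a single ValueState (all class, field, and state names here are illustrative, not taken from the job in question). With a heap state backend, the descriptor's type, and therefore its serializer, has to stay compatible when state is restored on an automatic restart, which is what the StateMigrationException quoted below complains about:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: counts events per key with a single piece of keyed state.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // Changing the declared type here (e.g. Long -> String) changes the state
        // serializer, which a heap backend rejects on restore with a
        // StateMigrationException like the one quoted below.
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = countState.value();
        long next = (current == null ? 0L : current) + 1L;
        countState.update(next);
        out.collect(next);
    }
}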

On Sat, Aug 20, 2022 at 3:35 PM Jason_H  wrote:

> Hi, the job has been modified, but it was started as a brand-new job; the changes are substantial and it does not depend on the previous job at all.
>
>
> Jason_H
> hyb_he...@163.com
> ---- Original message ----
> From: Michael Ran
> Date: 2022-08-20 15:31
> To: tsreape...@gmail.com
> Subject: Re: Error when Flink restarts automatically
> Has the job been modified?
>
>
>
> greemqq...@163.com
>
>
>
>
> ---- Original message ----
> From: Jason_H
> Date: 2022-08-19 11:52
> To: Flink Chinese mailing list
> Subject: Error when Flink restarts automatically
> cause by: java.lang.RuntimeException: Error while getting state
> org.apache.flink.util.StateMigrationException: For heap backends, the new
> state serializer must not be incompatible with the old state serializer
>
> Hi everyone, I've recently hit a strange problem: my job reports this error when it restarts automatically. From what I found online it means the state is incompatible, but the job was started as a brand-new job and was not restored from a previous checkpoint. It runs for a while and then hits this error during an automatic restart. Has anyone run into this? Is there any way to fix it?
> To emphasize: the job is new and was not restarted from a checkpoint of a previous job.
>
>
> Jason_H
> hyb_he...@163.com



-- 
Best,
Hangxiang.


pyflink memory management

2022-08-23, posted by yidan zhao
As the subject says: for pyflink jobs, how is memory managed?

Is the memory used by the Python part counted against the memory configured for the Flink TaskManager?
For example, if a Python operator does various heavy computations in multiple processes, is that memory usage counted as part of Flink's memory?



——
If it is not counted, then when using pyflink, do the container memory and the Flink TaskManager memory configuration need to reserve extra headroom for it?


Re: HA mode, standalone cluster: job errors when there is only a single JM.

2022-08-23, posted by yidan zhao
masters:
A:8682
workers:
A
B
C

All of these are internal hostnames (mutually resolvable), not 127.0.0.1.

Flink version: 1.15.1.

Weihua Hu wrote on Wed, Aug 24, 2022 at 10:26:
>
> The PartitionNotFoundException is probably related to the TM whose IP shows up as 127.0.0.1, as you described; the other TM nodes cannot connect to that node.
>
> Which Flink version are you using?
>
> Do the config files look like this?
> The masters file contains one internal IP: A
> The workers file contains several internal IPs: A, B, C
>
> Best,
> Weihua
>
>
> On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:
>
> >
> > As the subject says, the job currently fails with: org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > Partition
> > c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> > not found
> >
> > ——
> > The job itself is basically fine, and it is not a network problem. The workaround found so far:
> >
> > Use more than a single JM.
> >
> > I also found a possible cause, or at least another obvious symptom:
> >
> > On the TaskManager page of the web UI, the TM on machine A (the machine where the start-cluster script is executed, which is also the JM, i.e. the only machine configured in the masters file) shows 127.0.0.1 as the IP in its resource id. All other machines show their internal IPs.
> >
> > After putting two or more machines in the masters file, the problem is gone, and the latter symptom disappears as well: the IPs all show up correctly.
> >


Re: HA mode, standalone cluster: job errors when there is only a single JM.

2022-08-23, posted by Weihua Hu
The PartitionNotFoundException is probably related to the TM whose IP shows up as 127.0.0.1, as described; the other TM nodes cannot connect to that node.

Which Flink version are you using?

Do the config files look like this?
The masters file contains one internal IP: A
The workers file contains several internal IPs: A, B, C

Best,
Weihua


On Tue, Aug 23, 2022 at 7:37 PM yidan zhao  wrote:

>
> As the subject says, the job currently fails with: org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
> not found
>
> ——
> The job itself is basically fine, and it is not a network problem. The workaround found so far:
>
> Use more than a single JM.
>
> I also found a possible cause, or at least another obvious symptom:
>
> On the TaskManager page of the web UI, the TM on machine A (the machine where the start-cluster script is executed, which is also the JM, i.e. the only machine configured in the masters file) shows 127.0.0.1 as the IP in its resource id. All other machines show their internal IPs.
>
> After putting two or more machines in the masters file, the problem is gone, and the latter symptom disappears as well: the IPs all show up correctly.
>


Re: Re: flink1.15.1 stop fails

2022-08-23, posted by hjw
This involves the Kafka Connector Api (reproduced in IDEA); a Jira has been filed for it:
https://issues.apache.org/jira/browse/FLINK-28758


---- Original message ----
From: "user-zh"


Re: Re: flink1.15.1 stop fails

2022-08-23, posted by yidan zhao
1. Most likely the problem is in the source part, or at the level of triggering the savepoint.
2. It may also be worth looking at the difference between cancel and stop.
3. Additional information: my Kafka source uses the legacy connector (I cannot use the new one, because for various reasons I have to stay on a low-version Kafka client); a minimal sketch of this legacy source setup is shown below.
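A hypothetical sketch of the legacy source setup mentioned in point 3, assuming a plain FlinkKafkaConsumer reading strings (topic name, group id, broker address, and class name are placeholders, not taken from the real job). Stop-with-savepoint goes through FlinkKafkaConsumerBase.cancel() / Handover.close() in this connector, which is where the Handover/KafkaFetcher stack traces elsewhere in this thread end up:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical job using the legacy (pre-KafkaSource) Kafka consumer.
public class LegacyKafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        props.setProperty("group.id", "example-group");              // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("example-topic", new SimpleStringSchema(), props);
        // Matches the "startTs:latest" source name seen in the quoted logs.
        consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("legacy-kafka-source-job");
    }
}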

yidan zhao wrote on Tue, Aug 23, 2022 at 23:06:
>
> I took a look; there are very few errors in the logs.
> In any case, flink cancel -s works but flink stop does not, and the failure looks instantaneous. From the web ui, savepoint completion is 0/841, so it seems to fail almost before it starts.
> Currently 4 machines:
> Machine 1
> 2022-08-23 22:47:37,093 WARN
> org.apache.flink.runtime.taskmanager.Task[] -
> Source: JobConfig -> Split(JobName_configType)
>  (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
> FAILED with failure cause:
> org.apache.flink.util.FlinkRuntimeException: S
> top-with-savepoint failed.
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
> Executor.java:93)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
> 8)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> After that it's just the usual free task / unregister messages.
>
> Machine 2
> ...
> It basically looks like everything starts from "Attempt to cancel"; there doesn't seem to be an error first followed by a cancel.
>
> Xuyang wrote on Tue, Aug 23, 2022 at 22:25:
> >
> > Hi, is there any error message in the TM logs? If so, please paste it so we can see what caused the cp failure.
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > On 2022-08-23 20:41:59, "yidan zhao" wrote:
> > >Some additional information:
> > >Looking at the logs, if the checkpoint is triggered with "flink savepoint xxx", the JM log is very simple:
> > >2022-08-23 20:33:22,307 INFO
> > >org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering savepoint for job 8d231de75b8227a1b
> > >715b1aa665caa91.
> > >
> > >2022-08-23 20:33:22,318 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Triggering checkpoint 5 (type=SavepointType{na
> > >me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
> > >1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:33:23,701 INFO
> > >org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> > >[] - Cannot create recoverable writer
> > > due to Recoverable writers on Hadoop are only supported for HDFS,
> > >will use the ordinary writer.
> > >
> > >2022-08-23 20:33:23,908 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> > >(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> > >
> > >
> > >If the job is stopped with "stop xxx", the JM log (with the error) is as follows:
> > >
> > >2022-08-23 20:35:01,834 INFO
> > >org.apache.flink.runtime.jobmaster.JobMaster [] -
> > >Triggering stop-with-savepoint for job
> > >8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:01,842 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> > >postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> > >for job 8d231de75b8227a1b715b1aa665caa91.
> > >
> > >2022-08-23 20:35:02,083 INFO
> > >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > >Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> > >8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
> > >xxx.xxx.com (dataPort=13156).
> > >(It looks like the checkpoint was declined here because the task failed?)
> > >org.apache.flink.util.SerializedThrowable: Task name with subtask :
> > >Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
> > >reason: Task has failed.
> > >at 
> > > org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
> > >~[flink-dist-1.15.1.jar:1.15.1]
> > >at 
> > > org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
> > >~[flink-dist-1.15.1.jar:1.15.1]
> > >at 
> > > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> > >~[?:

Re: Re: flink1.15.1 stop fails

2022-08-23, posted by yidan zhao
I took a look; there are very few errors in the logs.
In any case, flink cancel -s works but flink stop does not, and the failure looks instantaneous. From the web ui, savepoint completion is 0/841, so it seems to fail almost before it starts.
Currently 4 machines:
Machine 1
2022-08-23 22:47:37,093 WARN
org.apache.flink.runtime.taskmanager.Task[] -
Source: JobConfig -> Split(JobName_configType)
 (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
After that it's just the usual free task / unregister messages.

Machine 2
...
It basically looks like everything starts from "Attempt to cancel"; there doesn't seem to be an error first followed by a cancel.

Xuyang wrote on Tue, Aug 23, 2022 at 22:25:
>
> Hi, is there any error message in the TM logs? If so, please paste it so we can see what caused the cp failure.
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2022-08-23 20:41:59, "yidan zhao" wrote:
>Some additional information:
>Looking at the logs, if the checkpoint is triggered with "flink savepoint xxx", the JM log is very simple:
> >2022-08-23 20:33:22,307 INFO
> >org.apache.flink.runtime.jobmaster.JobMaster [] -
> >Triggering savepoint for job 8d231de75b8227a1b
> >715b1aa665caa91.
> >
> >2022-08-23 20:33:22,318 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Triggering checkpoint 5 (type=SavepointType{na
> >me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
> >1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:33:23,701 INFO
> >org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
> >[] - Cannot create recoverable writer
> > due to Recoverable writers on Hadoop are only supported for HDFS,
> >will use the ordinary writer.
> >
> >2022-08-23 20:33:23,908 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
> >(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> >
> >
>If the job is stopped with "stop xxx", the JM log (with the error) is as follows:
> >
> >2022-08-23 20:35:01,834 INFO
> >org.apache.flink.runtime.jobmaster.JobMaster [] -
> >Triggering stop-with-savepoint for job
> >8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:35:01,842 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
> >postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
> >for job 8d231de75b8227a1b715b1aa665caa91.
> >
> >2022-08-23 20:35:02,083 INFO
> >org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> >Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
> >8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
> >xxx.xxx.com (dataPort=13156).
>(It looks like the checkpoint was declined here because the task failed?)
> >org.apache.flink.util.SerializedThrowable: Task name with subtask :
> >Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
> >reason: Task has failed.
> >at 
> > org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
> >~[flink-dist-1.15.1.jar:1.15.1]
> >at 
> > org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
> >~[flink-dist-1.15.1.jar:1.15.1]
> >at 
> > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> >~[?:1.8.0_251]
> >at 
> > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> >~[?:1.8.0_251]
> >at 
> > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >~[?:1.8.0_251]
> >at 
> > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
> >~[?:1.8.0_251]
> >at 
> > org.apache.flink.streaming.

Re: Job cannot start after upgrading to 1.14.2

2022-08-23, posted by Xuyang
Hi, it looks like some dependencies are still on the old version. First make sure that the job code, the connectors, and the deployment environment have all been upgraded to the same Flink version.




--

Best!
Xuyang







On 2022-08-23 10:43:55, "杨扬" wrote:

Hello everyone!
After recently upgrading Flink to 1.14.2, the job fails to start, with the error shown in the attached screenshot.
Everything worked fine on 1.12.0, and the code itself was not modified at all before or after the upgrade.
The job is deployed with Flink on YARN and started in application mode.













Re: Re: flink1.15.1 stop fails

2022-08-23, posted by Xuyang
Hi, is there any error message in the TM logs? If so, please paste it so we can see what caused the cp failure.







--

Best!
Xuyang





On 2022-08-23 20:41:59, "yidan zhao" wrote:
>Some additional information:
>Looking at the logs, if the checkpoint is triggered with "flink savepoint xxx", the JM log is very simple:
>2022-08-23 20:33:22,307 INFO
>org.apache.flink.runtime.jobmaster.JobMaster [] -
>Triggering savepoint for job 8d231de75b8227a1b
>715b1aa665caa91.
>
>2022-08-23 20:33:22,318 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Triggering checkpoint 5 (type=SavepointType{na
>me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
>1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:33:23,701 INFO
>org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
>[] - Cannot create recoverable writer
> due to Recoverable writers on Hadoop are only supported for HDFS,
>will use the ordinary writer.
>
>2022-08-23 20:33:23,908 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
>(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
>
>
>If the job is stopped with "stop xxx", the JM log (with the error) is as follows:
>
>2022-08-23 20:35:01,834 INFO
>org.apache.flink.runtime.jobmaster.JobMaster [] -
>Triggering stop-with-savepoint for job
>8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:01,842 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
>postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
>for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:02,083 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
>8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
>xxx.xxx.com (dataPort=13156).
>(It looks like the checkpoint was declined here because the task failed?)
>org.apache.flink.util.SerializedThrowable: Task name with subtask :
>Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
>reason: Task has failed.
>at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>~[?:1.8.0_251]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
>~[flink-dist-1.15.1.jar:1.15.1]
>Caused by: org.apache.flink.util.SerializedThrowable:
>org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>~[?:1.8.0_251]
>... 3 more
>Caused by: org.apache.flink.util.SerializedThrowable
>at 
> org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>~[?:?]
>at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>~[?:?]
>at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
>~[?:?]
>at 
> org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streami

Re: flink1.15.1 stop fails

2022-08-23, posted by yidan zhao
Some additional information:
Looking at the logs, if the checkpoint is triggered with "flink savepoint xxx", the JM log is very simple:
2022-08-23 20:33:22,307 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:33:22,318 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:33:23,701 INFO
org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
[] - Cannot create recoverable writer
 due to Recoverable writers on Hadoop are only supported for HDFS,
will use the ordinary writer.

2022-08-23 20:33:23,908 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).


If the job is stopped with "stop xxx", the JM log (with the error) is as follows:

2022-08-23 20:35:01,834 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Triggering stop-with-savepoint for job
8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:35:01,842 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
for job 8d231de75b8227a1b715b1aa665caa91.

2022-08-23 20:35:02,083 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
xxx.xxx.com (dataPort=13156).
(It looks like the checkpoint was declined here because the task failed?)
org.apache.flink.util.SerializedThrowable: Task name with subtask :
Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
reason: Task has failed.
at 
org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
~[flink-dist-1.15.1.jar:1.15.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
~[?:1.8.0_251]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
~[flink-dist-1.15.1.jar:1.15.1]
Caused by: org.apache.flink.util.SerializedThrowable:
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
~[?:1.8.0_251]
... 3 more
Caused by: org.apache.flink.util.SerializedThrowable
at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
~[?:?]
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
~[?:?]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
~[?:?]
at 
org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
~[flink-dist-1.15.1.jar:1.15.1]
at 
org.ap

flink1.15.1 stop fails

2022-08-23, posted by yidan zhao
As the subject says: stop (stop with a savepoint) fails.
In testing, both cancel and cancel -s succeed; cancel -s successfully produces the savepoint and exits.

stop does not work; the main error is:
Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
...
Caused by: java.util.concurrent.ExecutionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointEx
ception: Task has failed.
...
Caused by: org.apache.flink.util.SerializedThrowable:
org.apache.flink.runtime.checkpoint.CheckpointException: Task has
failed.
...
Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
...

__Detailed logs:


HA mode, standalone cluster: job errors when there is only a single JM.

2022-08-23, posted by yidan zhao
As the subject says, the job currently fails with: org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
not found

——
The job itself is basically fine, and it is not a network problem. The workaround found so far:

Use more than a single JM.

I also found a possible cause, or at least another obvious symptom:

On the TaskManager page of the web UI, the TM on machine A (the machine where the start-cluster script is executed, which is also the JM, i.e. the only machine configured in the masters file) shows 127.0.0.1 as the IP in its resource id. All other machines show their internal IPs.

After putting two or more machines in the masters file, the problem is gone, and the latter symptom disappears as well: the IPs all show up correctly.