Re: Flink exception
樂

On Mon, Jul 25, 2022 at 13:39, Zhanghao Chen wrote:
> Hi, you can check the following:
>
> 1. whether the TM hit an exception that caused it to exit;
> 2. whether severe GC on the TM prevented it from handling heartbeats in time;
> 3. whether a network problem between the JM and the TM kept heartbeat messages from getting through.
>
> Best,
> Zhanghao Chen
Re: Flink exception
Hi, you can check the following:

1. whether the TM hit an exception that caused it to exit;
2. whether severe GC on the TM prevented it from handling heartbeats in time;
3. whether a network problem between the JM and the TM kept heartbeat messages from getting through.

Best,
Zhanghao Chen

From: 陈卓宇 <2572805...@qq.com.INVALID>
Sent: Friday, July 22, 2022 11:30
To: user-zh
Subject: Flink exception

Hello community experts, I'd like to ask a question:
Flink version: 1.14.5
The exception is as follows:

2022-07-22 10:07:51
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id bdp-changlog-mid-relx-middle-promotion-dev-taskmanager-1-1 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

How should I fix this, and what can I tune?
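To see why a long GC pause (check 2 in the reply above) surfaces as this `TimeoutException`, here is a minimal, hypothetical model in plain Java. It is not Flink's `HeartbeatMonitorImpl`, only a sketch of the same idea: every received heartbeat resets a deadline, and if no heartbeat arrives within the timeout, the JobMaster gives up on the TaskManager. The 10 s interval and 50 s timeout are assumed to match the defaults of Flink's `heartbeat.interval` and `heartbeat.timeout` options; check them against your own cluster configuration. The class and method names are made up for illustration.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of a heartbeat timeout check (not Flink code).
 * Each heartbeat resets the deadline; a gap longer than timeoutMs means "timed out".
 */
final class HeartbeatTimeoutModel {

    /**
     * Returns the time (ms) at which the timeout would fire, or -1 if no gap between
     * consecutive heartbeats (or between the last heartbeat and endMs) exceeds timeoutMs.
     *
     * @param heartbeatTimesMs ascending times at which the JobManager received a heartbeat
     * @param endMs            end of the observation window
     * @param timeoutMs        e.g. heartbeat.timeout (assumed default: 50000 ms)
     */
    static long firstTimeoutAt(List<Long> heartbeatTimesMs, long endMs, long timeoutMs) {
        long last = 0L; // assume monitoring starts at t = 0
        for (long t : heartbeatTimesMs) {
            if (t - last > timeoutMs) {
                return last + timeoutMs; // deadline passed before this heartbeat arrived
            }
            last = t;
        }
        return (endMs - last > timeoutMs) ? last + timeoutMs : -1L;
    }

    public static void main(String[] args) {
        long timeout = 50_000L; // assumed default heartbeat.timeout
        // Healthy TM: one heartbeat every 10 s (assumed default heartbeat.interval).
        List<Long> healthy = List.of(10_000L, 20_000L, 30_000L, 40_000L, 50_000L, 60_000L);
        // TM stalled by a ~70 s stop-the-world GC right after the heartbeat at 20 s.
        List<Long> gcPause = List.of(10_000L, 20_000L, 90_000L);
        System.out.println(firstTimeoutAt(healthy, 60_000L, timeout)); // -1
        System.out.println(firstTimeoutAt(gcPause, 90_000L, timeout)); // 70000
    }
}
```

In this model a single pause longer than the timeout is enough to declare the TaskManager lost even though its process never exited. Raising `heartbeat.timeout` can buy headroom, but if GC is the cause, the TaskManager's GC logs and memory settings are the more useful place to look.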
Re: Re: Flink exceptions and fault-tolerant restart handling
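Hi,

I think your problem is that the data source contains a corner case that the earlier code did not handle properly, so the job goes into the FAILED state when it processes a particular "dirty" record. Even if it recovers from an earlier checkpoint, the job logic has not changed, so the same corner case still cannot be handled and the job keeps failing indefinitely.

For this scenario, you can set the checkpoint retention policy to RETAIN_ON_CANCELLATION [1] from the start. Then, after cancelling the job, change the business logic so that it handles the problem, and bring the job back online by resuming from the retained checkpoint [2]. Done this way, no data is lost.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Best regards,
唐云

From: Sun.Zhu <17626017...@163.com>
Sent: Sunday, June 14, 2020 0:20
To: user-zh@flink.apache.org
Cc: user-zh
Subject: Re: Flink exceptions and fault-tolerant restart handling

Question 1: How does Flink expect developers to handle exceptions in their programs?
-- Bad data naturally has to be handled by the user with try/catch; otherwise the program will crash again even after recovering from the last checkpoint.

Question 2: Can a checkpoint be converted into a savepoint, so that the job can still be recovered when no savepoint was taken in time?
-- I don't quite understand your question.

Question 3: If a particular record causes an exception, how can I ignore that record without affecting the application's state and operation?
-- This is essentially the same as Question 1. For Flink SQL jobs, 1.11 will support format.ignore-parse-errors.

Sun.Zhu
17626017...@163.com

On June 9, 2020 at 13:49, Z-Z wrote:
Hi everyone, I'd like to ask a few questions:
Background: while running normally, my Flink program throws an exception for some reason (e.g. incorrect data, a NullPointer, etc.). Checkpointing is enabled, so by default the program restarts indefinitely and there is no chance to take a savepoint.
Question 1: How does Flink expect developers to handle exceptions in their programs?
Question 2: Can a checkpoint be converted into a savepoint, so that the job can still be recovered when no savepoint was taken in time?
Question 3: If a particular record causes an exception, how can I ignore that record without affecting the application's state and operation?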
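Sun.Zhu's answer to Questions 1 and 3 (catch the bad record yourself instead of letting it fail the job) can be sketched as follows. This is a hypothetical example in plain Java so that it runs without Flink: the `key,amount` record format and the class and method names are made up for illustration. In an actual job the same per-record try/catch would typically sit inside a `FlatMapFunction` that simply emits nothing for a bad record, or a `ProcessFunction` that routes it to a side output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch: handle dirty records with try/catch instead of letting them fail the job.
 * Plain Java so it runs standalone; in Flink this per-record logic would live in a user function.
 */
final class DirtyRecordFilter {

    /** Summary of one batch: sum of the valid amounts and how many records were skipped. */
    static final class Result {
        final long sum;
        final int skipped;

        Result(long sum, int skipped) {
            this.sum = sum;
            this.skipped = skipped;
        }
    }

    /** Parses the amount from a hypothetical "key,amount" line; empty if the line is malformed. */
    static Optional<Long> parseAmount(String line) {
        if (line == null) {
            return Optional.empty(); // guard instead of hitting a NullPointerException later
        }
        String[] parts = line.split(",", -1);
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(Long.parseLong(parts[1].trim()));
        } catch (NumberFormatException e) {
            // Rethrowing here would fail the task; after a restart from the last checkpoint
            // the same record would be read again and fail again.
            return Optional.empty();
        }
    }

    /** Processes every line, skipping (and counting) the ones that cannot be parsed. */
    static Result process(List<String> lines) {
        long sum = 0L;
        int skipped = 0;
        for (String line : lines) {
            Optional<Long> amount = parseAmount(line);
            if (amount.isPresent()) {
                sum += amount.get();
            } else {
                skipped++; // in a real job: log it, increment a metric, or emit to a side output
            }
        }
        return new Result(sum, skipped);
    }

    public static void main(String[] args) {
        Result r = process(Arrays.asList("a,1", null, "b,x", "c,2", "bad"));
        System.out.println("sum=" + r.sum + ", skipped=" + r.skipped); // sum=3, skipped=3
    }
}
```

Because the bad record is dropped rather than rethrown, the task never fails on it, so there is no restart loop. The trade-off is that skipped records are lost unless you log them or send them somewhere for later inspection.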
Re: Flink exception recovery
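From the last checkpoint.

On Tue, Aug 27, 2019 at 6:14 PM, 王金海 wrote:
> Let's discuss how Flink restarts after an exception.
>
> I consume data from Kafka with a checkpoint interval of 5 minutes. If an exception causes the Flink job to restart at minute 6, does Flink recover from the last checkpoint, or from the offset at the time of the exception?
>
> csbl...@163.com
> Have a nice day!

--
Best Regards

Jeff Zhang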
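Jeff Zhang's answer can be made concrete with a small, hypothetical model (plain Java, not Flink code). With checkpointing enabled, the Kafka source's offsets are part of the checkpointed state, so after a restart the job resumes from the offsets recorded in the last completed checkpoint rather than from where it was when the exception occurred. The model assumes a constant consumption rate and checkpoints that complete instantly at minutes 5, 10, and so on; both are simplifications, and the names are made up for illustration.

```java
/**
 * Hypothetical model (not Flink code) of restoring a Kafka-consuming job from its last checkpoint.
 * Assumptions: constant consumption rate, checkpoints completing instantly every N minutes,
 * and offset 0 as the start position when no checkpoint has completed yet (a real job would
 * fall back to the source's configured start position instead).
 */
final class CheckpointReplayModel {

    /** Minute of the last checkpoint completed at or before the failure (0 = none yet). */
    static int lastCheckpointMinute(int checkpointIntervalMin, int failureMinute) {
        return (failureMinute / checkpointIntervalMin) * checkpointIntervalMin;
    }

    /** Kafka offset the job resumes from: the one stored in the last checkpoint. */
    static long restoredOffset(int checkpointIntervalMin, int failureMinute, long recordsPerMinute) {
        return lastCheckpointMinute(checkpointIntervalMin, failureMinute) * recordsPerMinute;
    }

    /** Records consumed after the last checkpoint, which are read again after the restart. */
    static long replayedRecords(int checkpointIntervalMin, int failureMinute, long recordsPerMinute) {
        long offsetAtFailure = failureMinute * recordsPerMinute;
        return offsetAtFailure - restoredOffset(checkpointIntervalMin, failureMinute, recordsPerMinute);
    }

    public static void main(String[] args) {
        // Scenario from the question: 5-minute checkpoints, failure at minute 6,
        // and an assumed rate of 1000 records per minute.
        System.out.println(restoredOffset(5, 6, 1000L));  // 5000
        System.out.println(replayedRecords(5, 6, 1000L)); // 1000
    }
}
```

For the scenario in the question, the job rewinds to the minute-5 offsets and re-reads roughly one minute of data. Flink's own state stays consistent because it is rolled back to the same checkpoint, but a sink that is neither transactional nor idempotent may see those re-read records twice.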
Flink exception recovery
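Let's discuss how Flink restarts after an exception.

I consume data from Kafka with a checkpoint interval of 5 minutes. If an exception causes the Flink job to restart at minute 6, does Flink recover from the last checkpoint, or from the offset at the time of the exception?

csbl...@163.com
Have a nice day!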