Re: flink 长时间运行后出现报错

2020-03-09 文章 Kurt Young
我帮你 cc 了对 runtime 更熟悉的 zhuzhu 同学。

Best,
Kurt


On Mon, Mar 9, 2020 at 6:44 PM lucas.wu  wrote:

> 没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
> 原因分析:
> 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState
> taskExecutionState)这个rpc接口去通知flink jobmanager
>
> 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
> rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState
> 这个rpc接口失败,jobmanager无法获知这个task已经fail
>
> 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。
>
> 结论:
> 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。
> 原始邮件
> 发件人:lucas.wulucas...@xiaoying.com
> 收件人:user-zhuser...@flink.apache.org
> 发送时间:2020年3月9日(周一) 11:06
> 主题:flink 长时间运行后出现报错
>
>
> 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc
> outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08
> 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler -
> Could not create remote rpc invocation message. Failing rpc invocation
> because... java.io.IOException: The rpc invocation size 34500577 exceeds
> the maximum akka framesize. at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
> at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at
> akka.actor.ActorCell.invoke(ActorCell.scala:496) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-08 06:10:30,480 ERROR
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while
> executing runnable in main thread.
> java.lang.reflect.UndeclaredThrowableException at
> com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> 

回复:flink 长时间运行后出现报错

2020-03-09 文章 lucas.wu
没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
原因分析:
这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState 
taskExecutionState)这个rpc接口去通知flink jobmanager
去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState 
这个rpc接口失败,jobmanager无法获知这个task已经fail
的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。

结论:
这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。
原始邮件
发件人:lucas.wulucas...@xiaoying.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年3月9日(周一) 11:06
主题:flink 长时间运行后出现报错


大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc 
outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08 
06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - 
Could not create remote rpc invocation message. Failing rpc invocation 
because... java.io.IOException: The rpc invocation size 34500577 exceeds the 
maximum akka framesize. at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
 at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at 
akka.actor.ActorCell.invoke(ActorCell.scala:496) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at 
akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234) at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - 
Caught exception while executing runnable in main thread. 
java.lang.reflect.UndeclaredThrowableException at 
com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) at 
akka.actor.AbstractActor.aroundReceive

flink 长时间运行后出现报错

2020-03-08 文章 lucas.wu
大家好:
我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc 
outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗?


2020-03-08 06:10:30,480 WARN 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler - Could not create 
remote rpc invocation message. Failing rpc invocation because...
java.io.IOException: The rpc invocation size 34500577 exceeds the maximum akka 
framesize.
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-03-08 06:10:30,480 ERROR org.apache.flink.runtime.rpc.akka.AkkaRpcActor - 
Caught exception while executing runnable in main thread.
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
at akka.actor.ActorCell.invoke(ActorCell.scala:496)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds the 
maximum akka