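I am running a Flink job in batch mode that reads files from HDFS and performs a word count, but the task keeps running and never finishes. The TaskManager log shows the following exception: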
java.lang.reflect.UndeclaredThrowableException: null
    at com.sun.proxy.$Proxy35.updateTaskExecutionState(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.11.1.jar:1.11.1]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.11.1.jar:1.11.1]
Caused by: java.io.IOException: The rpc invocation size 113602196 exceeds the maximum akka framesize.
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    ... 28 more
I tried setting the akka framesize to 30M in flink-conf.yaml, but that did not resolve the problem.
Any help would be appreciated.
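
For reference, here is a quick size check as a minimal sketch. It only compares the number reported in the exception (113602196 bytes, roughly 108 MiB) with the frame size I configured. The class and method names are hypothetical, and it assumes that "30M" means 30 MiB; it does not call any Flink API.

```java
public class FrameSizeCheck {
    // Size reported in the exception message, in bytes.
    static final long INVOCATION_BYTES = 113_602_196L;

    // Assumption: the "30M" set in flink-conf.yaml means 30 MiB.
    static final long CONFIGURED_FRAMESIZE_BYTES = 30L * 1024 * 1024;

    // True if a message of the given size does not fit into the frame size.
    static boolean exceeds(long messageBytes, long frameSizeBytes) {
        return messageBytes > frameSizeBytes;
    }

    // Converts a byte count to mebibytes for easier reading.
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        System.out.printf("invocation: %d bytes (%.1f MiB)%n",
                INVOCATION_BYTES, toMiB(INVOCATION_BYTES));
        System.out.printf("framesize:  %d bytes (%.1f MiB)%n",
                CONFIGURED_FRAMESIZE_BYTES, toMiB(CONFIGURED_FRAMESIZE_BYTES));
        System.out.println("exceeds framesize: "
                + exceeds(INVOCATION_BYTES, CONFIGURED_FRAMESIZE_BYTES));
    }
}
```

If that assumption holds, the RPC message is still several times larger than the 30M limit, which may be why the change had no effect. Whether a larger frame size is the right fix, or whether the final task state should not be this large in the first place, is what I am unsure about.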