Hi
   理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定复现的话,有可能是 bug
Best,
Congxian


xiao cai <[email protected]> 于2020年8月20日周四 下午2:27写道:

> Hi:
> 感谢答复,确实是个思路。
>
> 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。
>
>
> Best,
> xiao cai
>
>
>  原始邮件
> 发件人: 范超<[email protected]>
> 收件人: [email protected]<[email protected]>
> 发送时间: 2020年8月20日(周四) 09:11
> 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
>
>
> 我之前开启job的failover
> restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task
> executor No TaskExecutor registered under containe_xxxx.
> 我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -----邮件原件----- 发件人: xiao cai
> [mailto:[email protected]] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh <
> [email protected]> 主题: Flink on Yarn
> 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn
> 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink
> 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
> Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
> 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Received 1 containers with resource <memory:2048, vCores:4>, 1 pending
> container requests. 2020-08-19 11:23:08,100 INFO
> org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor
> container_e07_1596440446172_0094_01_000069 will be started on 10.3.15.22
> with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb
> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
> taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb
> (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
> jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Creating container launch
> context for TaskManagers 2020-08-19 11:23:08,101 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Removing container request Capability[<memory:2048, vCores:4>]Priority[1].
> 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] -
> Accepted 1 requested containers, returned 0 excess containers, 0 pending
> container requests of resource <memory:2048, vCores:4>. 2020-08-19
> 11:23:08,102 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] -
> Processing Event EventType: START_CONTAINER for Container
> container_e07_1596440446172_0094_01_000069 2020-08-19 11:23:10,851 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under
> container_e07_1596440446172_0094_01_000068. at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498)
> ~[?:1.8.0_191] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.11.0.jar:1.11.0] at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.11.0.jar:1.11.0] 2020-08-19 11:23:10,987 INFO
> org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with
> ResourceID container_e07_1596440446172_0094_01_000069 (akka.tcp://
> [email protected]:37461/user/rpc/taskmanager_0) at ResourceManager
> 2020-08-19 11:23:11,029 WARN
> org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname
> could be resolved for the IP address 10.3.15.22, using IP address as host
> name. Local input split assignment (such as for HDFS files) may be
> impacted. 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64)
> switched from SCHEDULED to DEPLOYING. 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
> Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (1/2) (attempt #68) to
> container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
> 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963)
> switched from SCHEDULED to DEPLOYING. 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
> Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (2/2) (attempt #68) to
> container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
> 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> Window(TumblingEventTimeWindows(1000), EventTimeTrigger,
> PassThroughWindowFunction) ->
> SourceConversion(table=[default_catalog.default_database.catalog_source,
> source: [KafkaTableSource(id, name, kafka_partition, event_time,
> write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time,
> wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name,
> kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time,
> proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) ->
> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW
> event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS
> EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey,
> cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from SCHEDULED to
> DEPLOYING. 2020-08-19 11:23:11,043 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying
> Window(TumblingEventTimeWindows(1000), EventTimeTrigger,
> PassThroughWindowFunction) ->
> SourceConversion(table=[default_catalog.default_database.catalog_source,
> source: [KafkaTableSource(id, name, kafka_partition, event_time,
> write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time,
> wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name,
> kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time,
> proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) ->
> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW
> event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS
> EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey,
> cf) (1/1) (attempt #68) to container_e07_1596440446172_0094_01_000069 @
> 10.3.15.22 (dataPort=34755) 2020-08-19 11:23:11,161 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64)
> switched from DEPLOYING to RUNNING. 2020-08-19 11:23:11,161 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> Window(TumblingEventTimeWindows(1000), EventTimeTrigger,
> PassThroughWindowFunction) ->
> SourceConversion(table=[default_catalog.default_database.catalog_source,
> source: [KafkaTableSource(id, name, kafka_partition, event_time,
> write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time,
> wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name,
> kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time,
> proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) ->
> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW
> event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS
> EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey,
> cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from DEPLOYING to
> RUNNING. 2020-08-19 11:23:11,162 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963)
> switched from DEPLOYING to RUNNING. 2020-08-19 11:23:12,733 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> KafkaTableSource(id, name, kafka_partition, event_time, write_time,
> snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition,
> wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64)
> switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@71e7e541.
> java.lang.NoClassDefFoundError: Could not initialize class xxxx

回复