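As in the subject: I run a job with Flink on YARN, and after the job restarts, the TaskManager appears to lose the jar files from lib.
My job is a Kafka source -> HBase sink pipeline.
After the job obtains a new container, classes that were previously available can no longer be found when the tasks start. I suspect the newly requested container did not receive the resources in lib. Should the lib resources be put on HDFS instead, and if so, how should that be configured?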
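One way to test the missing-jar hypothesis directly is to probe for the class from inside the new container without running its static initializer. The sketch below is plain Java and assumes nothing Flink-specific; the class names it checks are placeholders (`org.apache.hadoop.hbase.client.Connection` is only an example, `com.example.DoesNotExist` is hypothetical), and the real class name would be the elided `xxxx` from the log. Inside a Flink job it would have to run in user code (for example a rich function's `open()`) with the user-code classloader, e.g. `getClass().getClassLoader()`, since that loader may see jars the JVM classpath does not.

```java
public class ClasspathProbe {

    /** Outcome of looking up one class name. */
    enum Status { PRESENT, MISSING, LINKAGE_ERROR }

    /**
     * Loads className through the given loader with initialize=false, so the
     * class's static initializer is NOT run; a failing initializer therefore
     * cannot be confused with a missing jar.
     */
    static Status probe(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return Status.PRESENT;
        } catch (ClassNotFoundException e) {
            // The class itself is not visible to this loader (e.g. its jar was not shipped).
            return Status.MISSING;
        } catch (LinkageError e) {
            // The class was found, but a class it depends on (e.g. its superclass) could not be loaded.
            return Status.LINKAGE_ERROR;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = ClasspathProbe.class.getClassLoader();
        // Placeholder names; pass the real class name(s) as program arguments instead.
        String[] names = args.length > 0 ? args : new String[] {
            "java.lang.String",
            "org.apache.hadoop.hbase.client.Connection",
            "com.example.DoesNotExist"
        };
        for (String name : names) {
            System.out.println(name + " -> " + probe(name, loader));
        }
        // JVM-level classpath only; Flink's user-code classloader may add more jars.
        System.out.println("java.class.path=" + System.getProperty("java.class.path"));
    }
}
```

A `MISSING` result on the restarted TaskManager would support the "lib not shipped" theory; `PRESENT` would point away from it.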
Best
xiao cai
Error log and stack trace:
2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource <memory:2048, vCores:4>, 1 pending container requests.
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor container_e07_1596440446172_0094_01_000069 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Creating container launch context for TaskManagers
2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[<memory:2048, vCores:4>]Priority[1].
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:2048, vCores:4>.
2020-08-19 11:23:08,102 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_000069
2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_000068.
	at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
2020-08-19 11:23:10,987 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e07_1596440446172_0094_01_000069 (akka.tcp://flink@10.3.15.22:37461/user/rpc/taskmanager_0) at ResourceManager
2020-08-19 11:23:11,029 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.3.15.22, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,161 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,161 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,162 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:12,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@71e7e541.
java.lang.NoClassDefFoundError: Could not initialize class xxxx
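The wording of this last line may matter. In the JVM, `NoClassDefFoundError: Could not initialize class X` is thrown when class X was found, but an earlier attempt to run its static initializer failed; that first attempt throws `ExceptionInInitializerError` carrying the real cause. A class that is genuinely absent from the classpath usually surfaces instead as `ClassNotFoundException`, or as a `NoClassDefFoundError` whose message is just the class name. The minimal sketch below reproduces the pattern; `Fragile` and its failing initializer are hypothetical stand-ins for the elided `xxxx` class, not anything from the job itself.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticInitFailureDemo {

    /** Hypothetical stand-in for the elided class "xxxx" from the log. */
    static class Fragile {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final int VALUE = init();

        private static int init() {
            // Simulates a static initializer that fails, e.g. because some
            // resource or configuration it reads is unavailable on this host.
            throw new IllegalStateException("simulated static-initializer failure");
        }
    }

    /** Touches Fragile `attempts` times and records what each attempt threw. */
    static List<String> touchFragile(int attempts) {
        List<String> outcomes = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            try {
                outcomes.add("ok: " + Fragile.VALUE);
            } catch (Throwable t) {
                // Throwable.toString() gives "ClassName" or "ClassName: message".
                outcomes.add(t.toString());
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // 1st attempt: java.lang.ExceptionInInitializerError (cause: the IllegalStateException)
        // 2nd attempt: java.lang.NoClassDefFoundError: Could not initialize class ...
        for (String outcome : touchFragile(2)) {
            System.out.println(outcome);
        }
    }
}
```

If the same pattern applies here, the first `ExceptionInInitializerError` for `xxxx`, earlier in the TaskManager log of container `..._000069`, would show the underlying cause.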