Flink 1.17.1: flink on yarn submission cannot find the configuration file

2023-08-01 by guanyq
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm
1024m -ytm 1024m ./xx-1.0.jar ./config.properties

The submit command above specifies ./config.properties as the configuration file. Why does the job then look for the file inside the container? The error is: file
/home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_01/./config.properties
does not exist
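
A likely explanation, not confirmed in the thread: in yarn-application mode the user main() runs inside the YARN container, so a relative path only resolves if the file has been shipped into the container's working directory first. A minimal sketch using yarn.ship-files (the client-side path is hypothetical; note that -yjm/-ytm are "flink run" shorthands, while run-application takes -D options):

/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application \
  -Dyarn.ship-files=/opt/conf/config.properties \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  ./xx-1.0.jar ./config.properties

With the file shipped, YARN localizes it into each container's working directory and the relative path ./config.properties resolves.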

Re: flink on yarn: RocksDB exceeding its memory budget

2023-06-07 by Hangxiang Yu
Hi, the memory RocksDB uses is currently not strictly capped; see this ticket:
https://issues.apache.org/jira/browse/FLINK-15532
To locate where the memory goes, start with the coarse-grained metrics:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
For a finer-grained view of the memory used inside a single RocksDB instance, you may need a malloc
profiling tool, e.g. jemalloc's jeprof:
https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
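
For reference, a sketch of enabling a few of those native metrics in flink-conf.yaml (option names from the Flink docs linked above; enable only what you need, since each metric adds some overhead):

state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.total-sst-files-size: true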

On Wed, Jun 7, 2023 at 4:58 PM crazy <2463829...@qq.com.invalid> wrote:

> Hi all,
>   We have an application on Flink 1.13.5 on
> yarn with the RocksDB state backend. After running for a while it exceeds its memory budget; enlarging the overhead seems to ease it somewhat. Is there an issue describing this class of problem, and how do we find out which part of memory overran? Thanks
>
>
> crazy
> 2463829...@qq.com
>
>
>
> 



-- 
Best,
Hangxiang.


Re: Re: Re: Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-13 by guanyq
I simulated the power cut yesterday.
The timestamps of the 10 HA files are staggered, one every 5 seconds.
The chk-xxx directories are not all corrupted either; some can be used to start the job, which I also verified. The current behaviour is:
after the YARN cluster comes back from the outage, it loops over the 10 HA files (which hold the checkpoint metadata) trying to start the application.
1. If all the HA files are corrupted, the Flink job cannot be brought up even when the checkpoints themselves are intact.

What I want now is for HDFS to always hold at least one usable pair of HA file plus its checkpoint. At one checkpoint every 5 seconds with 10 retained, corruption can still make the job unrecoverable; 5 s * 10 =
50 s. How long a retention window would guarantee that one undamaged HA file and checkpoint pair survives?
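
For reference, the retention window is simply interval times retained count, so stretching either one enlarges it. A sketch in flink-conf.yaml (option names per the current Flink docs, values illustrative):

execution.checkpointing.interval: 60s
state.checkpoints.num-retained: 10
# retention window = 60 s x 10 retained = 10 minutes of checkpoint history

Note that no window can strictly guarantee a clean pair after a power cut; it only raises the odds that at least one fully flushed checkpoint predates the outage.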

On 2023-03-14 10:16:48, "Guojun Li" wrote:
>Hi
>
>Are the last modification times of these HA files identical or staggered?
>
>Also, have you tried recovering from a specific chk-? Did that work?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:
>
>> The flink HA path is /tmp/flink/ha/
>> The flink chk path is /tmp/flink/checkpoint
>>
>>
>> I am not yet sure whether the HA files are corrupted or all the checkpoints are; I still need to simulate and verify.
>>
>>
>>
>>
>> It does try to recover from the 10 checkpoints; the log shows:
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7086.
>>
>>
>>
>> The detailed log follows; I omitted the repeated tail, where the only difference is trying a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx to start from.
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
>> - Starting the SlotManager.
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster -
>> Successfully ran initialization on master in 0 ms.
>> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils -
>> Initialized ZooKeeperCompletedCheckpointStore in
>> '/checkpoints/3844b96b002601d932e66233dd46899c'.
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
>> Using application-defined state backend: File State Backend (checkpoints:
>> 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
>> fileStateThreshold: -1)
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
>> Configuring application-defined state backend with job/cluster config
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
>> - Trying to retrieve checkpoint 7086.
>> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O
>> error constructing remote block reader.
>> java.io.IOException: Got error, status message opReadBlock
>> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> received exception
>> 

Re: Re: Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-13 by Guojun Li
Hi

Are the last modification times of these HA files identical or staggered?

Also, have you tried recovering from a specific chk-? Did that work?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq  wrote:

> The flink HA path is /tmp/flink/ha/
> The flink chk path is /tmp/flink/checkpoint
>
>
> I am not yet sure whether the HA files are corrupted or all the checkpoints are; I still need to simulate and verify.
>
>
>
>
> It does try to recover from the 10 checkpoints; the log shows:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
>
>
>
> The detailed log follows; I omitted the repeated tail, where the only difference is trying a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx to start from.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
> - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils -
> Initialized ZooKeeperCompletedCheckpointStore in
> '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Using application-defined state backend: File State Backend (checkpoints:
> 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
> fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
> Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
> - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O
> error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> 

Re: Re: Re: flink on yarn: question about YARN restarting the flink job

2023-03-13 by guanyq
Understood. Thanks a lot.

On 2023-03-13 16:57:18, "Weihua Hu" wrote:
>The image does not come through; you could upload it to an image host and paste the link on the list.
>
>Whether YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
>the application only exits once the configured number of failures happens within that interval.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:
>
>> The image is in the attachment
>> But in practice it exceeded 10 attempts...
>>
>>
>>
>>
>>
>>
>> On 2023-03-13 15:39:39, "Weihua Hu" wrote:
>> >Hi,
>> >
>> >The image does not show up
>> >
>> >With this configuration, YARN should launch the JobManager only 10 times.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>> >
>> >> Flink 1.10, configured as follows
>> >> yarn.application-attempts = 10  (the number of times YARN tries to launch the flink job)
>> >> My understanding is that YARN tries 10 times and the job fails if it cannot come up, but the YARN application page shows an 11th attempt, see the image below
>> >> For appattempt_1678102326043_0006_000409
>> >> 
>> >> doesn't each sequence number stand for one attempt?
>> >>
>>
>>


Re: Re: flink on yarn: question about YARN restarting the flink job

2023-03-13 by Weihua Hu
The image does not come through; you could upload it to an image host and paste the link on the list.

Whether YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
the application only exits once the configured number of failures happens within that interval.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
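
For reference, the two options side by side in flink-conf.yaml (a sketch; the interval is in milliseconds, value illustrative):

yarn.application-attempts: 10
yarn.application-attempt-failures-validity-interval: 600000

With this pair, only AM failures inside a rolling 10-minute window count toward the 10-attempt limit, which is why a long-running application can accumulate more than 10 attempts over its lifetime.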

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq  wrote:

> The image is in the attachment
> But in practice it exceeded 10 attempts...
>
>
>
>
>
>
> On 2023-03-13 15:39:39, "Weihua Hu" wrote:
> >Hi,
> >
> >The image does not show up
> >
> >With this configuration, YARN should launch the JobManager only 10 times.
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
> >
> >> Flink 1.10, configured as follows
> >> yarn.application-attempts = 10  (the number of times YARN tries to launch the flink job)
> >> My understanding is that YARN tries 10 times and the job fails if it cannot come up, but the YARN application page shows an 11th attempt, see the image below
> >> For appattempt_1678102326043_0006_000409
> >> 
> >> doesn't each sequence number stand for one attempt?
> >>
>
>


Re: Re: flink on yarn: question about YARN restarting the flink job

2023-03-13 by guanyq
The image is in the attachment
But in practice it exceeded 10 attempts...

On 2023-03-13 15:39:39, "Weihua Hu" wrote:
>Hi,
>
>The image does not show up
>
>With this configuration, YARN should launch the JobManager only 10 times.
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:
>
>> Flink 1.10, configured as follows
>> yarn.application-attempts = 10  (the number of times YARN tries to launch the flink job)
>> My understanding is that YARN tries 10 times and the job fails if it cannot come up, but the YARN application page shows an 11th attempt, see the image below
>> For appattempt_1678102326043_0006_000409
>> 
>> doesn't each sequence number stand for one attempt?
>>


Re: flink on yarn: question about YARN restarting the flink job

2023-03-13 by Weihua Hu
Hi,

The image does not show up

With this configuration, YARN should launch the JobManager only 10 times.

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq  wrote:

> Flink 1.10, configured as follows
> yarn.application-attempts = 10  (the number of times YARN tries to launch the flink job)
> My understanding is that YARN tries 10 times and the job fails if it cannot come up, but the YARN application page shows an 11th attempt, see the image below
> For appattempt_1678102326043_0006_000409
> 
> doesn't each sequence number stand for one attempt?
>


flink on yarn: question about YARN restarting the flink job

2023-03-13 by guanyq
Flink 1.10, configured as follows
yarn.application-attempts = 10  (the number of times YARN tries to launch the flink job)
My understanding is that YARN tries 10 times and the job fails if it cannot come up, but the YARN application page shows an 11th attempt, see the image below
For appattempt_1678102326043_0006_000409: doesn't each sequence number stand for one attempt?


Re: Re: Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by guanyq
The flink HA path is /tmp/flink/ha/
The flink chk path is /tmp/flink/checkpoint


I am not yet sure whether the HA files are corrupted or all the checkpoints are; I still need to simulate and verify.




It does try to recover from the 10 checkpoints; the log shows:
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7086.


The detailed log follows; I omitted the repeated tail, where the only difference is trying a different /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxx to start from.
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl
 - Starting the SlotManager.
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster -
Successfully ran initialization on master in 0 ms.
2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils -
Initialized ZooKeeperCompletedCheckpointStore in
'/checkpoints/3844b96b002601d932e66233dd46899c'.
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using
application-defined state backend: File State Backend (checkpoints:
'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED,
fileStateThreshold: -1)
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster -
Configuring application-defined state backend with job/cluster config
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore
 - Trying to retrieve checkpoint 7086.
2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error
constructing remote block reader.
java.io.IOException: Got error, status message opReadBlock 
BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received 
exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, 
self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
for file 
/tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, 
for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
at 

Re: Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by Weihua Hu
Hi

Generally, a power outage of the YARN cluster by itself does not affect already-completed historical checkpoints (the last checkpoint may fail while being written to hdfs)

Do you have more detailed JobManager logs? First confirm how many completedCheckpoints Flink
retrieved during recovery, and which cp it ultimately tried to recover from.

You can also try what Yanfei suggested and restore from a historical cp as a savepoint
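
For reference, one way to pull those recovery lines out of the JobManager log (a sketch):

grep -E "Found .* checkpoints in ZooKeeper|Trying to retrieve checkpoint" jobmanager.log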


Best,
Weihua


On Fri, Mar 10, 2023 at 10:38 AM guanyq  wrote:

> Incremental chk is not enabled
> The file corruption was seen in the startup log: it tried to start from the 10 chks, but all failed because of the corrupt block below
> The error log is:
>
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-10 10:26:11, "Yanfei Lei" wrote:
> >Hi, you can look under the checkpoint dir configured for Flink for historical chk-x directories; if you find one, you can try restarting from a manually specified
> chk [1].
> >
> >> With 10 checkpoints taken 5 seconds apart, why would a sudden power cut corrupt all of them?
>
> >Is incremental checkpointing enabled for this job? With incremental enabled, if a file of an earlier checkpoint is corrupted, it also affects the later checkpoints built incrementally on top of it.
> >
> >> As for the flush mechanics: this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
> >Do you mean you observe no files under the checkpoint dir?
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >
> >guanyq wrote on Fri, Mar 10, 2023:
> >>
> >> I am also considering savepoints to handle the abnormal power loss
> >> But one question remains:
> >> the job keeps 10 checkpoints, one every 5 seconds; after a sudden power cut, why were all of them corrupted?
> >> I wonder whether none of the 10 checkpoints had actually been flushed to disk.
> >> I would like to ask:
> >> how checkpoint data gets flushed to disk; this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2023-03-10 08:47:11, "Shammon FY" wrote:
> >> >Hi
> >> >
> >> >When a Flink
> >>
> >job fails to recover, the job itself can hardly tell that the failure comes from corrupted checkpoint file blocks or the like. If the job has taken a savepoint, you can try restoring from that savepoint
> >> >
> >> >Best,
> >> >Shammon
> >> >
> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >> >
> >> >> Setup
> >> >> 1. flink is configured for high availability
> >> >> 2. flink retains 10 checkpoints
> >> >> 3. the yarn cluster is configured for job recovery
> >> >> Question
> >> >> after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
> >> >>
> >> >>
> >> >>
> >> >>
> >
> >
> >
> >--
> >Best,
> >Yanfei
>


Re: Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by guanyq
Incremental chk is not enabled
The file corruption was seen in the startup log: it tried to start from the 10 chks, but all failed because of the corrupt block below
The error log is:

java.io.IOException: Got error, status message opReadBlock 
BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received 
exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, 
self=/ip:45534, remote=/ip:9866,
for file 
/tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, 
for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at 
org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)


On 2023-03-10 10:26:11, "Yanfei Lei" wrote:
>Hi, you can look under the checkpoint dir configured for Flink for historical chk-x directories; if you find one, you can try restarting from a manually specified chk [1].
>
>> With 10 checkpoints taken 5 seconds apart, why would a sudden power cut corrupt all of them?
>Is incremental checkpointing enabled for this job? With incremental enabled, if a file of an earlier checkpoint is corrupted, it also affects the later checkpoints built incrementally on top of it.
>
>> As for the flush mechanics: this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
>Do you mean you observe no files under the checkpoint dir?
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
>guanyq wrote on Fri, Mar 10, 2023:
>>
>> I am also considering savepoints to handle the abnormal power loss
>> But one question remains:
>> the job keeps 10 checkpoints, one every 5 seconds; after a sudden power cut, why were all of them corrupted?
>> I wonder whether none of the 10 checkpoints had actually been flushed to disk.
>> I would like to ask:
>> how checkpoint data gets flushed to disk; this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2023-03-10 08:47:11, "Shammon FY" wrote:
>> >Hi
>> >
>> >When a Flink
>> >job fails to recover, the job itself can hardly tell that the failure comes from corrupted checkpoint file blocks or the like. If the job has taken a savepoint, you can try restoring from that savepoint
>> >
>> >Best,
>> >Shammon
>> >
>> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
>> >
>> >> Setup
>> >> 1. flink is configured for high availability
>> >> 2. flink retains 10 checkpoints
>> >> 3. the yarn cluster is configured for job recovery
>> >> Question
>> >> after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
>> >>
>> >>
>> >>
>> >>
>
>
>
>-- 
>Best,
>Yanfei


Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by Yanfei Lei
Hi, you can look under the checkpoint dir configured for Flink for historical chk-x directories; if you find one, you can try restarting from a manually specified chk [1].

> With 10 checkpoints taken 5 seconds apart, why would a sudden power cut corrupt all of them?
Is incremental checkpointing enabled for this job? With incremental enabled, if a file of an earlier checkpoint is corrupted, it also affects the later checkpoints built incrementally on top of it.

> As for the flush mechanics: this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
Do you mean you observe no files under the checkpoint dir?

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
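
For reference, restoring from a retained checkpoint uses the same -s flag as restoring from a savepoint (a sketch; the job id and chk number are placeholders, the base path comes from this thread):

bin/flink run -s hdfs:///tmp/flink/checkpoint/<job-id>/chk-7086 ./your-job.jar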

guanyq wrote on Fri, Mar 10, 2023:
>
> I am also considering savepoints to handle the abnormal power loss
> But one question remains:
> the job keeps 10 checkpoints, one every 5 seconds; after a sudden power cut, why were all of them corrupted?
> I wonder whether none of the 10 checkpoints had actually been flushed to disk.
> I would like to ask:
> how checkpoint data gets flushed to disk; this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-10 08:47:11, "Shammon FY" wrote:
> >Hi
> >
> >When a Flink
> >job fails to recover, the job itself can hardly tell that the failure comes from corrupted checkpoint file blocks or the like. If the job has taken a savepoint, you can try restoring from that savepoint
> >
> >Best,
> >Shammon
> >
> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >
> >> Setup
> >> 1. flink is configured for high availability
> >> 2. flink retains 10 checkpoints
> >> 3. the yarn cluster is configured for job recovery
> >> Question
> >> after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
> >>
> >>
> >>
> >>



-- 
Best,
Yanfei


Re: Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by guanyq
I am also considering savepoints to handle the abnormal power loss
But one question remains:
the job keeps 10 checkpoints, one every 5 seconds; after a sudden power cut, why were all of them corrupted?
I wonder whether none of the 10 checkpoints had actually been flushed to disk.
I would like to ask:
how checkpoint data gets flushed to disk; this should relate to HDFS writes; the Flink checkpoint succeeded, yet HDFS did not persist it.

On 2023-03-10 08:47:11, "Shammon FY" wrote:
>Hi
>
>When a Flink
>job fails to recover, the job itself can hardly tell that the failure comes from corrupted checkpoint file blocks or the like. If the job has taken a savepoint, you can try restoring from that savepoint
>
>Best,
>Shammon
>
>On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
>
>> Setup
>> 1. flink is configured for high availability
>> 2. flink retains 10 checkpoints
>> 3. the yarn cluster is configured for job recovery
>> Question
>> after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
>>
>>
>>
>>


Re: flink on yarn: recovery after an abnormal power outage

2023-03-09 by Shammon FY
Hi

When a Flink
job fails to recover, the job itself can hardly tell that the failure comes from corrupted checkpoint file blocks or the like. If the job has taken a savepoint, you can try restoring from that savepoint

Best,
Shammon

On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:

> Setup
> 1. flink is configured for high availability
> 2. flink retains 10 checkpoints
> 3. the yarn cluster is configured for job recovery
> Question
> after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
>
>
>
>


flink on yarn: recovery after an abnormal power outage

2023-03-09 by guanyq
Setup
1. flink is configured for high availability
2. flink retains 10 checkpoints
3. the yarn cluster is configured for job recovery
Question
after the yarn cluster restarts from a power outage, while recovering the flink job, if the most recent checkpoint has blocks corrupted by the outage, will other checkpoints be tried?
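
For reference, one way to check after the outage whether HDFS blocks under the checkpoint and HA paths are corrupted (a sketch; the paths are taken from the later replies in this thread):

hdfs fsck /tmp/flink/ha -files -blocks -locations
hdfs fsck /tmp/flink/checkpoint -files -blocks -locations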





Re: Flink on yarn: after running for a while, TaskManager with id ... is no longer reachable

2023-02-16 by Shammon FY
Hi

The heartbeat reporting the TM as unreachable above usually means the TM exited; check why it exited
For the checkpoint timeout below, check for backpressure and similar issues, look at actual checkpoint durations, and consider raising the checkpoint timeout
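
For reference, two sketches matching that advice (the timeout value and the pid are placeholders): raising the checkpoint timeout in flink-conf.yaml, and, as lxk suggests below, dumping a TaskManager heap for analysis:

execution.checkpointing.timeout: 30min

jmap -dump:live,format=b,file=/tmp/tm-heap.hprof <taskmanager-pid>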

Best,
Shammon


On Thu, Feb 16, 2023 at 10:34 AM lxk  wrote:

> Hi, you could take a heap dump and analyze the memory
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-02-16 10:05:19, "Fei Han" wrote:
> >@all
> >Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on yarn for a while, the following error appears:
> >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with
> id container_e506_1673750933366_49579_01_02(
> hdp-server-010.yigongpin.com:8041) is no longer reachable. at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
> ~[flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> ~[?:1.8.0_181] at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> ~[?:1.8.0_181] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
>After the error above, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing. at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
> [flink-dist_2.12-1.14.5.jar:1.14.5] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_181] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_181] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]。
>Could anyone advise how these two problems can be mitigated? Is there a good approach?
>


Flink on yarn: after running for a while, TaskManager with id ... is no longer reachable

2023-02-15 by Fei Han
@all
Hi all! My Flink version is 1.14.5 and the CDC version is 2.2.1. After running on yarn for a while, the following error appears:
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id 
container_e506_1673750933366_49579_01_02(hdp-server-010.yigongpin.com:8041) 
is no longer reachable. at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
 ~[flink-dist_2.12-1.14.5.jar:1.14.5] at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_181] at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_181] at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 ~[?:1.8.0_181] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5]
After the error above, the following checkpoint error also appears: org.apache.flink.runtime.checkpoint.CheckpointException:
 Checkpoint expired before completing. at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
 [flink-dist_2.12-1.14.5.jar:1.14.5] at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_181] at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_181] at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_181] at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_181] at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]。
Could anyone advise how these two problems can be mitigated? Is there a good approach?


Re: flink on yarn: job dies and keeps restarting

2022-07-25 by Weihua Hu
Check whether the JobManager ran out of memory and was OOM-killed; if you have more logs, please paste them too
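
For reference, two quick checks (a sketch; the application id is a placeholder):

yarn logs -applicationId application_xxx | grep -iE "OutOfMemory|Killing container"
dmesg | grep -i "killed process"    # run on the NodeManager host to spot OS-level OOM kills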

Best,
Weihua


On Mon, Jul 18, 2022 at 8:41 PM SmileSmile  wrote:

> hi,all
> We hit this scenario: flink on yarn at parallelism 3000, the job contains several agg operations, and recovery from checkpoint
> or savepoint reliably fails to complete; the job keeps restarting
> The JM error is org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] -
> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> Any suggestions on how to troubleshoot this?
>
>
>
>
>


flink on yarn: job dies and keeps restarting

2022-07-18 by SmileSmile
hi,all
We hit this scenario: flink on yarn at parallelism 3000, the job contains several agg operations, and recovery from checkpoint
or savepoint reliably fails to complete; the job keeps restarting
The JM error is org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.

Any suggestions on how to troubleshoot this?






Re: Flink on yarn: with parallelism > 1, how to obtain Spring Boot beans?

2022-04-22 by tison
@duwenwen I am curious what your operator actually does, because if you just need a single global initialization, doing it in an operator with parallelism=1 is enough (see also the sketch after my signature).

With parallelism=n, guaranteeing once-only initialization would have to rely on an external system.

Best,
tison.
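
A common workaround for the multiple-initialization symptom described further down (a sketch, not from the thread; AppConfig is a hypothetical Spring @Configuration class): build the application context behind a JVM-wide holder, so parallel subtasks calling open() in the same TaskManager initialize it at most once.

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public final class SpringContextHolder {

    private static volatile ApplicationContext context;

    private SpringContextHolder() {}

    // Double-checked locking: the first open() call builds the context,
    // later subtasks in the same TaskManager JVM reuse it.
    public static ApplicationContext get() {
        if (context == null) {
            synchronized (SpringContextHolder.class) {
                if (context == null) {
                    context = new AnnotationConfigApplicationContext(AppConfig.class);
                }
            }
        }
        return context;
    }
}

Each RichFunction can then call SpringContextHolder.get().getBean(...) in open(). Note this yields one context per TaskManager JVM, not one per job; cross-JVM uniqueness still needs an external system, as tison says.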


Paul Lam wrote on Fri, Apr 22, 2022 at 18:16:

> Sounds like you are starting Spring Boot inside Flink? A very interesting architecture, somewhat similar to statefun. Could you tell us the background?
>
> Also, please include flink's deployment mode and version, so everyone can better judge where the problem lies.
>
> Best,
> Paul Lam
>
> > 2022年4月22日 16:30,duwenwen  写道:
> >
> > 您好:
> >首先很感谢您能在百忙之中看到我的邮件。我是一个写代码的新手,在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。
> 由于需求要求,我需要将springboot和flink结合起来使用,我在open方法中获取springboot的上下文来获取bean。当设置parallelism为1时,可以发布到集群正常运行,但是当parallelism>1时,springboot的环境被多次初始化,运行就开始报错,,请问当parallelism>1
> 或者说当taskmanager>1时,应该怎么处理才能顺利获取到springboot的bean?针对上述问题希望能从您那里获得相应解决方案,十分感谢。
>
>


Re: Flink on yarn: with parallelism > 1, how to obtain Spring Boot beans?

2022-04-22 by Paul Lam
Sounds like you are starting Spring Boot inside Flink? A very interesting architecture, somewhat similar to statefun. Could you tell us the background?

Also, please include flink's deployment mode and version, so everyone can better judge where the problem lies.

Best,
Paul Lam

> On Apr 22, 2022, at 16:30, duwenwen wrote:
> 
> Hello:
>    First, thank you for taking time out of your busy schedule to read this. I am new to writing code and ran into some problems while using the flink framework; I hope you can help.
> Due to requirements, I need to combine springboot and flink: I obtain the springboot application context in the open method to fetch beans. With parallelism set to 1 the job deploys to the cluster and runs normally, but with parallelism>1 the springboot environment is initialized multiple times and the run starts failing. With parallelism>1,
>  or with taskmanager>1, what should be done to obtain springboot beans correctly? I would greatly appreciate a solution to the above.



Flink on yarn: with parallelism > 1, how to obtain Spring Boot beans?

2022-04-22 by duwenwen
Hello:
First, thank you for taking time out of your busy schedule to read this. I am new to writing code and ran into some problems while using the flink framework; I hope you can help.
Due to requirements, I need to combine springboot and flink: I obtain the springboot application context in the open method to fetch beans. With parallelism set to 1 the job deploys to the cluster and runs normally, but with parallelism>1 the springboot environment is initialized multiple times and the run starts failing. With parallelism>1,
or with taskmanager>1, what should be done to obtain springboot beans correctly? I would greatly appreciate a solution to the above.

Re: flink on yarn: exception while stopping a job

2022-03-08 by Jiangang Liu
The exception message is quite clear: some task was not in RUNNING state while the savepoint was being taken. Check whether your job went through a failover.

QiZhu Chan wrote on Tue, Mar 8, 2022 at 17:37:

> Hi,
>
> Could the community experts help me understand what causes the error below? Normally the client log should return a savepoint path, but the following exception appeared instead; meanwhile the job was stopped, and the savepoint files produced by this job do exist on HDFS
>
>
>
>
>


Re: flink on yarn: after the HDFS_DELEGATION_TOKEN is purged, the AM attempt fails

2022-02-10 by xieyi
One more piece of information:
in this failure case,
security.kerberos.login.keytab was correctly configured in the client's flink-conf.yaml.



On 2022-02-11 15:18, xieyi wrote:


Hello:
A question, please.
Since a hadoop Delegation token expires and is purged once it exceeds its Max
Lifetime (7 days by default), for long-running jobs YARN documents three strategies to address this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


How does flink on yarn deal with hadoop Delegation token expiry? The official docs do not seem to spell it out.


We hit the following failure in production:
flink 1.12 on yarn, with the yarn nodemanagers deployed in containers; a nodemanager occasionally dies and restarts. Once a flink
job has been running for more than 7 days, if the nodemanager hosting that job's JM (am) restarts, the am goes through an attempt (the attempt picks up token 1377, obtained at submission time, but that token has already been purged from the namenode), and the attempt fails with:


Failing this attempt. Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377 for user***) can't be found in cache


Questions: after the HADOOP Delegation token is purged, how does flink on yarn renew it? Is a new token generated?
  If a new token is generated, why does the am attempt still pick up the purged token (1377)?
Could this failure be related to the containerized nodemanager deployment, i.e. the files holding the keytab were wiped when the nodemanager restarted?
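
For reference, a sketch of the keytab-based settings in flink-conf.yaml (the path and principal are hypothetical); with a keytab shipped this way, the JobManager and TaskManagers can re-login from the keytab instead of depending on the submit-time delegation token:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/flinkuser.keytab
security.kerberos.login.principal: flinkuser@EXAMPLE.COM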



flink on yarn: after the HDFS_DELEGATION_TOKEN is purged, the AM attempt fails

2022-02-10 by xieyi


Hello:
A question, please.
Since a hadoop Delegation token expires and is purged once it exceeds its Max
Lifetime (7 days by default), for long-running jobs YARN documents three strategies to address this: https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#securing-long-lived-yarn-services


How does flink on yarn deal with hadoop Delegation token expiry? The official docs do not seem to spell it out.


We hit the following failure in production:
flink 1.12 on yarn, with the yarn nodemanagers deployed in containers; a nodemanager occasionally dies and restarts. Once a flink
job has been running for more than 7 days, if the nodemanager hosting that job's JM (am) restarts, the am goes through an attempt (the attempt picks up token 1377, obtained at submission time, but that token has already been purged from the namenode), and the attempt fails with:


Failing this attempt. Diagnostics: token (HDFS_DELEGATION_TOKEN token 1377
for user***) can't be found in cache
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 1377 for user***) can't be found in cache


Questions: after the HADOOP Delegation token is purged, how does flink on yarn renew it? Is a new token generated?
  If a new token is generated, why does the am attempt still pick up the purged token (1377)?
Could this failure be related to the containerized nodemanager deployment, i.e. the files holding the keytab were wiped when the nodemanager restarted?




Re: Re: flink on yarn: accessing multiple HDFS clusters

2021-12-06 by casel.chen
What if it is two OSS or S3
buckets instead (each bucket with its own accessKey/secret)? How would that be configured? For example, write data to bucketA but keep checkpoints in bucketB
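
This sub-question is not answered in the thread. A sketch of one possibility, assuming the Hadoop S3A filesystem is in use: S3A supports per-bucket settings, so each bucket can carry its own credentials (key values illustrative):

flink.hadoop.fs.s3a.bucket.bucketA.access.key: AK_FOR_A
flink.hadoop.fs.s3a.bucket.bucketA.secret.key: SK_FOR_A
flink.hadoop.fs.s3a.bucket.bucketB.access.key: AK_FOR_B
flink.hadoop.fs.s3a.bucket.bucketB.secret.key: SK_FOR_B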

On 2021-12-06 18:59:46, "Yang Wang" wrote:
>You could try shipping a local hadoop conf and setting the HADOOP_CONF_DIR environment variable:
>
>-yt /path/of/my-hadoop-conf
>-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
>
>
>Best,
>Yang
>
>chenqizhu wrote on Tue, Nov 30, 2021 at 10:00 AM:
>
>> Hello all:
>>
>>  Flink 1.13 supports setting hadoop properties in flink-conf.yaml via the flink.hadoop.* prefix.
>> We need to write checkpoints to an HDFS with SSDs (call it cluster B) to speed up checkpoint writes, but that cluster is not the flink client's local default HDFS (call the default cluster A). The idea is to configure the nameservices of both clusters in flink-conf.yaml, similar to HDFS federation, so that both clusters are reachable. The configuration:
>>
>>
>>
>>
>> flink.hadoop.dfs.nameservices: ACluster,BCluster
>>
>> flink.hadoop.fs.defaultFS: hdfs://BCluster
>>
>>
>>
>>
>> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>>
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>>
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>>
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>>
>> flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>>
>>
>>
>> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>>
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>>
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>>
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>>
>> flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>>
>>
>>
>> But the job fails at startup; it seems the nameservice HA configuration of cluster B is not recognized and is treated as a hostname instead. The error is below:
>>
>> (If the option is switched back to the client's local default HDFS cluster A, the job starts normally: flink.hadoop.fs.defaultFS:
>> hdfs://ACluster)
>>
>>
>>
>>
>> Caused by: BCluster
>>
>> java.net.UnknownHostException: BCluster
>>
>> at
>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>>
>> at
>> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>>
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
>>
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
>>
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>>
>> at
>> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>>
>> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>>
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>>
>> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>>
>> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>>
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>>
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> Is there a solution to the problem above? The pain point is flink accessing two HDFS clusters, ideally configured purely through flink-conf.yaml.
>>
>>
>>
>>
>> My component versions:
>>
>> flink : 1.13.3
>>
>> hadoop : 3.3.0
>>
>>
>>
>>
>> Looking forward to a reply. Thanks!


Re: flink on yarn: accessing multiple HDFS clusters

2021-12-06 by Yang Wang
You could try shipping a local hadoop conf and setting the HADOOP_CONF_DIR environment variable:

-yt /path/of/my-hadoop-conf
-yD containerized.master.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'
-yD containerized.taskmanager.env.HADOOP_CONF_DIR='$PWD/my-hadoop-conf'


Best,
Yang
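
For reference, a sketch of what the shipped directory could contain (file names are the standard Hadoop ones, contents illustrative):

my-hadoop-conf/
  core-site.xml    fs.defaultFS: hdfs://BCluster
  hdfs-site.xml    dfs.nameservices: ACluster,BCluster plus the HA namenode entries for both

Since the whole client configuration is shipped and picked up via HADOOP_CONF_DIR, both nameservices should resolve inside the containers as well.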

chenqizhu wrote on Tue, Nov 30, 2021 at 10:00 AM:

> Hello all:
>
>  Flink 1.13 supports setting hadoop properties in flink-conf.yaml via the flink.hadoop.* prefix.
> We need to write checkpoints to an HDFS with SSDs (call it cluster B) to speed up checkpoint writes, but that cluster is not the flink client's local default HDFS (call the default cluster A). The idea is to configure the nameservices of both clusters in flink-conf.yaml, similar to HDFS federation, so that both clusters are reachable. The configuration:
>
>
>
>
> flink.hadoop.dfs.nameservices: ACluster,BCluster
>
> flink.hadoop.fs.defaultFS: hdfs://BCluster
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>
> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.ACluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>
> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>
> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>
> flink.hadoop.dfs.client.failover.proxy.provider.BCluster:
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>
>
>
>
> But the job fails at startup; it seems the nameservice HA configuration of cluster B is not recognized and is treated as a hostname instead. The error is below:
>
> (If the option is switched back to the client's local default HDFS cluster A, the job starts normally: flink.hadoop.fs.defaultFS:
> hdfs://ACluster)
>
>
>
>
> Caused by: BCluster
>
> java.net.UnknownHostException: BCluster
>
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>
> at
> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
>
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>
> at
> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>
> at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>
> at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
>
> at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> Is there a solution to the problem above? The pain point is flink accessing two HDFS clusters, ideally configured purely through flink-conf.yaml.
>
>
>
>
> My component versions:
>
> flink : 1.13.3
>
> hadoop : 3.3.0
>
>
>
>
> Looking forward to a reply. Thanks!


flink on yarn: accessing multiple HDFS clusters

2021-11-29 by chenqizhu
Hello all:

 Flink 1.13 supports setting hadoop properties in flink-conf.yaml via the flink.hadoop.* prefix.
We need to write checkpoints to an HDFS with SSDs (call it cluster B) to speed up checkpoint writes, but that cluster is not the flink client's local default HDFS (call the default cluster A). The idea is to configure the nameservices of both clusters in flink-conf.yaml, similar to HDFS federation, so that both clusters are reachable. The configuration:




flink.hadoop.dfs.nameservices: ACluster,BCluster

flink.hadoop.fs.defaultFS: hdfs://BCluster




flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2

flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000

flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070

flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000

flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070

flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider




flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2

flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000

flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070

flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000

flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070

flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider




But the job fails at startup; it seems the nameservice HA configuration of cluster B is not recognized and is treated as a hostname instead. The error is below:

(If the option is switched back to the client's local default HDFS cluster A, the job starts normally: flink.hadoop.fs.defaultFS:
hdfs://ACluster)




Caused by: BCluster

java.net.UnknownHostException: BCluster

at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)

at 
org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)

at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)

at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)

at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)

at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)

at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)

at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)

at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)




Is there a solution to the problem above? The pain point is flink accessing two HDFS clusters, ideally configured purely through flink-conf.yaml.




My component versions:

flink : 1.13.3

hadoop : 3.3.0




Looking forward to a reply. Thanks!

关于flink on yarn 跨多hdfs集群访问的问题

2021-11-29 文章 chenqizhu
all,您好:
 flink 1.13 版本支持了在flink-conf.yaml通过flink.hadoop.* 的方式 
配置hadoop属性。有个需求是将checkpoint写到装有ssd的hdfs(称之为集群B)以加速checkpoint写入速度,但这个hdfs集群不是flink客户端本地的默认hdfs(默认hdfs称为集群A),于是想通过在flink-conf.yaml里配置A、B两个集群的nameservices,类似与hdfs联邦模式,访问到两个hdfs集群,具体配置如下:


flink.hadoop.dfs.nameservices: ACluster,BCluster
flink.hadoop.fs.defaultFS: hdfs://BCluster


flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider


但在作业启动时候出错了,似乎是无法识别集群B的nameservices高可用配置,转而当成域名识别,具体报错如下:
(将配置项改成flink客户端本地的默认hdfs集群A,则作业可正常启动 :flink.hadoop.fs.defaultFS: 
hdfs://ACluster)


Caused by: java.net.UnknownHostException: BCluster
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:374)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:308)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:412)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:412)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:247)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:240)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:228)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Is there a solution to the above? The pain point is that Flink has to access two HDFS clusters, ideally configured purely through flink-conf.yaml.


My component versions:
flink: 1.13.3
hadoop: 3.3.0











Reply: flink on yarn pre_job submission fails, but session mode works

2021-11-04 post by JasonLee
hi

Check whether your job jar has packaged in Flink's own dependencies.

Best
JasonLee
On 2021-11-04 18:41, <2572805...@qq.com.INVALID> wrote:
YARN error log:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
 Caused by: java.lang.VerifyError: class 
org.apache.flink.yarn.YarnResourceManager overrides final method 
onStop.()Ljava/util/concurrent/CompletableFuture;   at 
java.lang.ClassLoader.defineClass1(Native Method)   at 
java.lang.ClassLoader.defineClass(ClassLoader.java:763)   at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)   at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:467)   at 
java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at 
java.security.AccessController.doPrivileged(Native Method)   at 
java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424)   at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357)   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
   at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
   at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
   ... 2 common frames omitted

Diagnostics:
  Application application_1635998548270_0028 failed 1  times (global limit 
=2; local limit is =1) due to AM Container for  
appattempt_1635998548270_0028_01 exited with  exitCode: 1   
For more detailed output, check the application  tracking page:  
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028  Then 
click on links to logs of each attempt.   
Diagnostics: Exception from container-launch.   
Container id: container_e391_1635998548270_0028_01_01   
Exit code: 1   
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:944)   
at org.apache.hadoop.util.Shell.run(Shell.java:848)   
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)   

at  
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
   
at java.util.concurrent.FutureTask.run(FutureTask.java:266)   
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
  
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
  
at java.lang.Thread.run(Thread.java:748)   


Container exited with a non-zero exit code 1   
Failing this attempt. Failing the application.   

陈




Re: flink on yarn pre_job submission fails, but session mode works

2021-11-04 post by 刘建刚
The information above is not enough to tell; open the link inside it for the detailed logs:
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028

陈卓宇 <2572805...@qq.com.invalid> wrote on Thu, Nov 4, 2021 at 6:29 PM:

> YARN error log:
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.   at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
>  at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
> Caused by: java.lang.VerifyError: class
> org.apache.flink.yarn.YarnResourceManager overrides final method
> onStop.()Ljava/util/concurrent/CompletableFuture;  at
> java.lang.ClassLoader.defineClass1(Native Method)at
> java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at
> java.security.AccessController.doPrivileged(Native Method)   at
> java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at
> java.lang.ClassLoader.loadClass(ClassLoader.java:424)at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at
> java.lang.ClassLoader.loadClass(ClassLoader.java:357)at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
>at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> at java.security.AccessController.doPrivileged(Native Method)   at
> javax.security.auth.Subject.doAs(Subject.java:422)   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
>at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
>  ... 2 common frames omitted
>
> Diagnostics:
> Application application_1635998548270_0028 failed 1  times
> (global limit =2; local limit is =1) due to AM Container for
> appattempt_1635998548270_0028_01 exited with  exitCode: 1
>
> For more detailed output, check
> the application  tracking page:
> http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028
> Then click on links to logs of each attempt.
> Diagnostics: Exception from
> container-launch.
> Container id:
> container_e391_1635998548270_0028_01_01
> Exit code: 1
> Stack trace: ExitCodeException
> exitCode=1:
> at
> org.apache.hadoop.util.Shell.runCommand(Shell.java:944)
> at
> org.apache.hadoop.util.Shell.run(Shell.java:848)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
>
> at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at
> java.lang.Thread.run(Thread.java:748)
>
>
> Container exited with a non-zero
> exit code 1
> Failing this attempt. Failing the
> application.
>
> 陈
>
>
> 
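
The VerifyError in this thread ("class org.apache.flink.yarn.YarnResourceManager overrides final method onStop") usually means the class was loaded from a different Flink version than the flink-runtime it is verified against, i.e. the job jar ships its own copy of Flink. A minimal pom.xml sketch of the usual remedy (Maven assumed; the version property is a placeholder that must match the cluster's Flink):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${cluster.flink.version}</version>
    <!-- provided keeps Flink's own classes out of the fat jar -->
    <scope>provided</scope>
</dependency>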


flink on yarn pre_job submission fails, but session mode works

2021-11-04 post by 陈卓宇
YARN error log:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
 Caused by: java.lang.VerifyError: class 
org.apache.flink.yarn.YarnResourceManager overrides final method 
onStop.()Ljava/util/concurrent/CompletableFuture;  at 
java.lang.ClassLoader.defineClass1(Native Method)at 
java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)  at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at 
java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at 
java.security.AccessController.doPrivileged(Native Method)   at 
java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424)at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357)at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
  at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
  at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
 at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
  at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
   ... 2 common frames omitted

Diagnostics:
  Application application_1635998548270_0028 failed 1  times (global limit 
=2; local limit is =1) due to AM Container for  
appattempt_1635998548270_0028_01 exited with  exitCode: 1   
For more detailed output, check the 
application  tracking page:  
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028  Then 
click on links to logs of each attempt.   
Diagnostics: Exception from 
container-launch.   
Container id: 
container_e391_1635998548270_0028_01_01   
Exit code: 1   
Stack trace: ExitCodeException 
exitCode=1:
at 
org.apache.hadoop.util.Shell.runCommand(Shell.java:944)   
at 
org.apache.hadoop.util.Shell.run(Shell.java:848)   
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)  
 
at  
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
   
at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)   
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
  
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
  
at 
java.lang.Thread.run(Thread.java:748)   
   
   

flink 1.13.1, yarn-application mode: Flink SQL job syncing MySQL to Hive pushes TaskManager memory past 16G

2021-11-04 post by Asahi Lee
hi!
Running a Flink SQL job that syncs MySQL table data into Hive in yarn-application mode, the TaskManager's memory usage keeps growing past 16G. (The rest of this message is garbled in the archive.)

Reply: How are task log monitoring and checkpoint monitoring handled in production for Flink on YARN?

2021-08-31 post by JasonLee
Hi


You can refer to these two articles:
https://mp.weixin.qq.com/s/2S4M8p-rBRinIRxmZrZq5Q 
https://mp.weixin.qq.com/s/44SXmCAUOqSWhQrNiZftoQ


Best
JasonLee


On 2021-08-31 13:23, guanyq wrote:
Flink on YARN starts many tasks across the cluster; in production, how do you monitor the task logs and the checkpoints?


Any guidance appreciated.

How are task log monitoring and checkpoint monitoring handled in production for Flink on YARN?

2021-08-30 post by guanyq
Flink on YARN starts many tasks across the cluster; in production, how do you monitor the task logs and the checkpoints?


Any guidance appreciated.

Re: Flink program fails to start with Flink on YARN in HA mode

2021-08-17 post by 周瑞
Hello, my version is 1.13.1.


------ Original ------
From: "Yang Wang"
https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞

Re: Flink program fails to start with Flink on YARN in HA mode

2021-08-17 post by Yang Wang
From the error, this looks like a known issue [1] that was already fixed in 1.11.2.

[1]. https://issues.apache.org/jira/browse/FLINK-19212

Best,
Yang

周瑞 wrote on Tue, Aug 17, 2021 at 11:04 AM:

> Hello: the Flink program is deployed on YARN and launched in Application Mode. Without HA
> it starts normally; after configuring HA, startup fails. Could you help find out what causes this?
>
>
> HA configuration:
> high-availability: zookeeper high-availability.storageDir:
> hdfs://mycluster/flink/ha high-availability.zookeeper.quorum:
> zk-1:2181,zk-2:2181,zk-3:2181 high-availability.zookeeper.path.root: /flink
> high-availability.cluster-id: /flink_cluster
>
>
> The exception:
> 2021-08-17 10:24:18,938 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
> 2021-08-17 10:25:09,706 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> [] - Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerDetailsInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
> ~[?:1.8.0_292]
> at
> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
> ~[?:1.8.0_292]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at 
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.13.1.jar:1.13.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_292]
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_292]
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> 

Flink program fails to start with Flink on YARN in HA mode

2021-08-16 post by 周瑞
Hello: the Flink program is deployed on YARN and launched in Application Mode. Without HA it starts normally; after configuring HA, startup fails. Could you help find out what causes this?


HA configuration:
high-availability: zookeeper high-availability.storageDir: 
hdfs://mycluster/flink/ha high-availability.zookeeper.quorum: 
zk-1:2181,zk-2:2181,zk-3:2181 high-availability.zookeeper.path.root: /flink 
high-availability.cluster-id: /flink_cluster


The exception:
2021-08-17 10:24:18,938 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Starting DefaultLeaderElectionService with 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}.
2021-08-17 10:25:09,706 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] 
- Unhandled exception.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to 
serialize the result for RPC call : requestTaskManagerDetailsInfo.
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
 ~[?:1.8.0_292]
at 
java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168) 
~[?:1.8.0_292]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.13.1.jar:1.13.1]
Caused by: java.io.NotSerializableException: 
org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
~[?:1.8.0_292]
at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
~[?:1.8.0_292]
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 29 more

Flink on YARN error

2021-07-30 post by wangjingen
Could someone help look at this problem?
The RMClient's and YarnResourceManager's internal state about the
number of pending container requests for resource has
diverged. Number of client's pending container requests 1 != Number of RM's pending
container requests 0;

Flink on YARN: how to configure log4j

2021-07-22 post by comsir
hi all
How should log4j be configured for a Flink on YARN job?
(The rest of this message is garbled in the archive.)

Flink on YARN: submitting a job in yarn-cluster mode fails

2021-06-08 post by maker_d...@foxmail.com
On a CDH cluster I submit a job with Flink on YARN in yarn-cluster mode, and it fails to deploy because a jar cannot be found.
I do not use this jar myself, but it does exist in Flink's lib, and I have already added the lib directory to the environment variable:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/FLINK/lib/flink/lib


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not deploy Yarn job cluster.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1053)
at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:129)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:417)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1748)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1623148752688_0041 failed 1 
times (global limit =2; local limit is =1) due to AM Container for 
appattempt_1623148752688_0041_01 exited with  exitCode: -1000
Failing this attempt.Diagnostics: [2021-06-08 18:56:14.062]File 
file:/root/.flink/application_1623148752688_0041/lib/flink-table-blink_2.12-1.12.4.jar
 does not exist
java.io.FileNotFoundException: File 
file:/root/.flink/application_1623148752688_0041/lib/flink-table-blink_2.12-1.12.4.jar
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:867)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
at 
org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:269)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call
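
One plausible lead, not a confirmed fix: the localizer is resolving .flink/... against file:/root/..., which normally means the Hadoop configuration (core-site.xml etc.) was not on the client's classpath, so paths fell back to the local filesystem. Pointing HADOOP_CLASSPATH at Flink's lib directory does not provide that; the documented preparation is roughly:

# export the full Hadoop classpath (jars plus conf dir) before running bin/flink
export HADOOP_CLASSPATH=`hadoop classpath`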

Reply: Flink on YARN log cleanup

2021-06-07 post by 王刚
You can configure the cleanup policy in the client-side log4j.properties or logback.xml.

First confirm which slf4j implementation you are actually using.
Original mail
From: zjfpla...@hotmail.com
To: user-zh
Sent: Mon, Jun 7, 2021 12:17
Subject: Flink on YARN log cleanup


Hi all,
Some questions:
In Flink on YARN mode, is there any log cleanup mechanism?
Does it follow the cleanup policies of log4j/logback/log4j2, or is it configured on the YARN side?
These are long-running real-time streaming jobs, not one-shot offline jobs.



zjfpla...@hotmail.com
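
To make the advice above concrete, a minimal size-based rollover sketch for Flink's log4j.properties (log4j2 syntax as shipped since Flink 1.11; sizes and counts are arbitrary examples):

# roll the ${sys:log.file} log at 100MB and keep at most 10 rolled files
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = 10
rootLogger.level = INFO
rootLogger.appenderRef.main.ref = MainAppender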



Flink on YARN log cleanup

2021-06-06 post by zjfpla...@hotmail.com
Hi all,
Some questions:
In Flink on YARN mode, is there any log cleanup mechanism?
Does it follow the cleanup policies of log4j/logback/log4j2, or is it configured on the YARN side?
These are long-running real-time streaming jobs, not one-shot offline jobs.



zjfpla...@hotmail.com


Re: Re: Flink on YARN: a YARN ResourceManager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-31 post by Yang Wang
With HA, ZooKeeper records the counter and address of the last successful checkpoint; without HA, the job restores from the savepoint you specified.


Best,
Yang

刘建刚 wrote on Fri, May 28, 2021 at 6:51 PM:

> Then the snapshot info was probably lost in the master failover; HA should solve this.
>
> 董建 <62...@163.com> wrote on Fri, May 28, 2021 at 6:24 PM:
>
> > Reliably reproducible.
> > Checkpoints are generated normally; I can confirm them both in the web UI and in the HDFS directory.
> > Our jobmanager has no HA; could that be the cause?
> > The logs show it restores from the path given with -s; without -s, a restart does not use the latest checkpoint file either.
> > This has bothered me for a long time with no good lead; next step is to set up HA and try again.
> > >> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > >> configuration property: execution.savepoint.path,
> > >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >
> > On 2021-05-28 18:15:38, "刘建刚" wrote:
> > >This behavior is not expected. Can it be reliably reproduced with these steps?
> > >1. restore from a savepoint;
> > >2. the job starts taking periodic savepoints;
> > >3. the job fails over.
> > >If so, check whether the checkpoint files exist and whether ZooKeeper has been updated.
> > >If the problem remains, the logs are needed to investigate further.
> > >
> > >董建 <62...@163.com> wrote on Fri, May 28, 2021 at 5:37 PM:
> > >
> > >> The symptoms I am seeing:
> > >>
> > >> 1. Flink version flink-1.12.2; launch command below. -s is given because the job had been cancelled and is being restarted here.
> > >>
> > >> flink run -d -s
> > >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> > >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> > >>
> > >> 2. flink-conf.yaml
> > >>
> > >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> > >>
> > >> 3. Checkpoint setup in code
> > >>
> > >> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
> > >> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > >> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >> env.enableCheckpointing(1 * 60 * 1000);
> > >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >> checkpointConfig.setTolerableCheckpointFailureNumber(100);
> > >> checkpointConfig.setCheckpointTimeout(60 * 1000);
> > >> checkpointConfig.setMaxConcurrentCheckpoints(1);
> > >>
> > >> 4. Symptoms
> > >>
> > >> a) When ops switches the YARN resource-manager, my Flink job restarts as well (application-id and job-id unchanged after the restart), but the jobmanager and taskmanager move to other machines
> > >>
> > >> b) My job checkpoints every minute; suppose it has run for a while since the last restart and checkpoints have reached chk-200
> > >>
> > >> c) The cluster team switches the resource-manager again; the expectation is that the job automatically restores from chk-200, yet the logs show it still restores from chk-100.
> > >>
> > >> d) The job roughly consumes binlog via flink-mysql-cdc, DebeziumSourceFunction
> > >> sourceMilApplysLogStream = MySQLSource.builder()
> > >>
> > >>   The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice
> > >>
> > >> e) The log prints the following; is it because I launched with -s that it will not look up the latest checkpoint on restart?
> > >>
> > >> 2021-05-24 16:49:50,398 INFO
> > >> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> > >> configuration property: execution.savepoint.path,
> > >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> > >>
> > >> The expectation is that after a restart the job resumes from the latest checkpoint, i.e. from chk-200
> > >>
> > >> The problem now is that after a restart the job always restores from the checkpoint specified via flink -s.
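
For reference, a minimal flink-conf.yaml sketch of the ZooKeeper HA Yang Wang describes (quorum and paths are placeholders); with this in place, a failover looks up the latest completed checkpoint in ZooKeeper instead of re-reading execution.savepoint.path:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha
# in per-job mode keep one cluster-id per job
high-availability.cluster-id: /my_job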


Re: Re: Flink on YARN: a YARN ResourceManager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 post by 刘建刚
Then the snapshot info was probably lost in the master failover; HA should solve this.

董建 <62...@163.com> wrote on Fri, May 28, 2021 at 6:24 PM:

> Reliably reproducible.
> Checkpoints are generated normally; I can confirm them both in the web UI and in the HDFS directory.
> Our jobmanager has no HA; could that be the cause?
> The logs show it restores from the path given with -s; without -s, a restart does not use the latest checkpoint file either.
> This has bothered me for a long time with no good lead; next step is to set up HA and try again.
> >> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> >> configuration property: execution.savepoint.path,
> >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
> On 2021-05-28 18:15:38, "刘建刚" wrote:
> >This behavior is not expected. Can it be reliably reproduced with these steps?
> >1. restore from a savepoint;
> >2. the job starts taking periodic savepoints;
> >3. the job fails over.
> >If so, check whether the checkpoint files exist and whether ZooKeeper has been updated.
> >If the problem remains, the logs are needed to investigate further.
> >
> >董建 <62...@163.com> wrote on Fri, May 28, 2021 at 5:37 PM:
> >
> >> The symptoms I am seeing:
> >>
> >> 1. Flink version flink-1.12.2; launch command below. -s is given because the job had been cancelled and is being restarted here.
> >>
> >> flink run -d -s
> >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> >>
> >> 2. flink-conf.yaml
> >>
> >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> >>
> >> 3. Checkpoint setup in code
> >>
> >> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
> >> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> >> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> env.enableCheckpointing(1 * 60 * 1000);
> >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> checkpointConfig.setTolerableCheckpointFailureNumber(100);
> >> checkpointConfig.setCheckpointTimeout(60 * 1000);
> >> checkpointConfig.setMaxConcurrentCheckpoints(1);
> >>
> >> 4. Symptoms
> >>
> >> a) When ops switches the YARN resource-manager, my Flink job restarts as well (application-id and job-id unchanged after the restart), but the jobmanager and taskmanager move to other machines
> >>
> >> b) My job checkpoints every minute; suppose it has run for a while since the last restart and checkpoints have reached chk-200
> >>
> >> c) The cluster team switches the resource-manager again; the expectation is that the job automatically restores from chk-200, yet the logs show it still restores from chk-100.
> >>
> >> d) The job roughly consumes binlog via flink-mysql-cdc, DebeziumSourceFunction
> >> sourceMilApplysLogStream = MySQLSource.builder()
> >>
> >>   The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice
> >>
> >> e) The log prints the following; is it because I launched with -s that it will not look up the latest checkpoint on restart?
> >>
> >> 2021-05-24 16:49:50,398 INFO
> >> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> >> configuration property: execution.savepoint.path,
> >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >>
> >> The expectation is that after a restart the job resumes from the latest checkpoint, i.e. from chk-200
> >>
> >> The problem now is that after a restart the job always restores from the checkpoint specified via flink -s.


Re: Re: Flink on YARN: a YARN ResourceManager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 post by 董建
Reliably reproducible.
Checkpoints are generated normally; I can confirm them both in the web UI and in the HDFS directory.
Our jobmanager has no HA; could that be the cause?
The logs show it restores from the path given with -s; without -s, a restart does not use the latest checkpoint file either.
This has bothered me for a long time with no good lead; next step is to set up HA and try again.
>> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100

On 2021-05-28 18:15:38, "刘建刚" wrote:
>This behavior is not expected. Can it be reliably reproduced with these steps?
>1. restore from a savepoint;
>2. the job starts taking periodic savepoints;
>3. the job fails over.
>If so, check whether the checkpoint files exist and whether ZooKeeper has been updated.
>If the problem remains, the logs are needed to investigate further.
>
>董建 <62...@163.com> wrote on Fri, May 28, 2021 at 5:37 PM:
>
>> The symptoms I am seeing:
>>
>> 1. Flink version flink-1.12.2; launch command below. -s is given because the job had been cancelled and is being restarted here.
>>
>> flink run -d -s
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
>> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>>
>> 2. flink-conf.yaml
>>
>> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>>
>> 3. Checkpoint setup in code
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
>> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.enableCheckpointing(1 * 60 * 1000);
>> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> checkpointConfig.setTolerableCheckpointFailureNumber(100);
>> checkpointConfig.setCheckpointTimeout(60 * 1000);
>> checkpointConfig.setMaxConcurrentCheckpoints(1);
>>
>> 4. Symptoms
>>
>> a) When ops switches the YARN resource-manager, my Flink job restarts as well (application-id and job-id unchanged after the restart), but the jobmanager and taskmanager move to other machines
>>
>> b) My job checkpoints every minute; suppose it has run for a while since the last restart and checkpoints have reached chk-200
>>
>> c) The cluster team switches the resource-manager again; the expectation is that the job automatically restores from chk-200, yet the logs show it still restores from chk-100.
>>
>> d) The job roughly consumes binlog via flink-mysql-cdc, DebeziumSourceFunction
>> sourceMilApplysLogStream = MySQLSource.builder()
>>
>>   The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice
>>
>> e) The log prints the following; is it because I launched with -s that it will not look up the latest checkpoint on restart?
>>
>> 2021-05-24 16:49:50,398 INFO
>> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>>
>> The expectation is that after a restart the job resumes from the latest checkpoint, i.e. from chk-200
>>
>> The problem now is that after a restart the job always restores from the checkpoint specified via flink -s.


Re: Flink on YARN: a YARN ResourceManager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 post by 刘建刚
This behavior is not expected. Can it be reliably reproduced with these steps?
1. restore from a savepoint;
2. the job starts taking periodic savepoints;
3. the job fails over.
If so, check whether the checkpoint files exist and whether ZooKeeper has been updated.
If the problem remains, the logs are needed to investigate further.

董建 <62...@163.com> wrote on Fri, May 28, 2021 at 5:37 PM:

> The symptoms I am seeing:
>
> 1. Flink version flink-1.12.2; launch command below. -s is given because the job had been cancelled and is being restarted here.
>
> flink run -d -s
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>
> 2. flink-conf.yaml
>
> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>
> 3. Checkpoint setup in code
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.enableCheckpointing(1 * 60 * 1000);
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> checkpointConfig.setTolerableCheckpointFailureNumber(100);
> checkpointConfig.setCheckpointTimeout(60 * 1000);
> checkpointConfig.setMaxConcurrentCheckpoints(1);
>
> 4. Symptoms
>
> a) When ops switches the YARN resource-manager, my Flink job restarts as well (application-id and job-id unchanged after the restart), but the jobmanager and taskmanager move to other machines
>
> b) My job checkpoints every minute; suppose it has run for a while since the last restart and checkpoints have reached chk-200
>
> c) The cluster team switches the resource-manager again; the expectation is that the job automatically restores from chk-200, yet the logs show it still restores from chk-100.
>
> d) The job roughly consumes binlog via flink-mysql-cdc, DebeziumSourceFunction
> sourceMilApplysLogStream = MySQLSource.builder()
>
>   The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice
>
> e) The log prints the following; is it because I launched with -s that it will not look up the latest checkpoint on restart?
>
> 2021-05-24 16:49:50,398 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.savepoint.path,
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
> The expectation is that after a restart the job resumes from the latest checkpoint, i.e. from chk-200
>
> The problem now is that after a restart the job always restores from the checkpoint specified via flink -s.


Flink on YARN: a YARN ResourceManager failover restarts the Flink job, which does not recover from the latest checkpoint

2021-05-28 post by 董建
The symptoms I am seeing:

1. Flink version flink-1.12.2; launch command below. -s is given because the job had been cancelled and is being restarted here.

flink run -d -s 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 
-t yarn-per-job -m yarn-cluser -D yarn.application.name= 
/tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod

2. flink-conf.yaml

state.checkpoints.dir: hdfs:///user/flink/checkpoints/default

3. Checkpoint setup in code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// restart up to 100 times with a 10 ms delay
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// keep externalized checkpoints when the job is cancelled
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint every minute, exactly-once, one checkpoint at a time
env.enableCheckpointing(1 * 60 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.setTolerableCheckpointFailureNumber(100);
checkpointConfig.setCheckpointTimeout(60 * 1000);
checkpointConfig.setMaxConcurrentCheckpoints(1);

4. Symptoms

a) When ops switches the YARN resource-manager, my Flink job restarts as well (application-id and job-id unchanged after the restart), but the jobmanager and taskmanager move to other machines

b) My job checkpoints every minute; suppose it has run for a while since the last restart and checkpoints have reached chk-200

c) The cluster team switches the resource-manager again; the expectation is that the job automatically restores from chk-200, yet the logs show it still restores from chk-100.

d) The job roughly consumes binlog via flink-mysql-cdc, DebeziumSourceFunction 
sourceMilApplysLogStream = MySQLSource.builder()

  The restart made it consume again from chk-100's binlog position, i.e. the binlog between chk-100 and chk-200 was consumed twice

e) The log prints the following; is it because I launched with -s that it will not look up the latest checkpoint on restart?

2021-05-24 16:49:50,398 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.savepoint.path, 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100

The expectation is that after a restart the job resumes from the latest checkpoint, i.e. from chk-200

The problem now is that after a restart the job always restores from the checkpoint specified via flink -s.
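
Short of HA, one way to at least restart from the newest retained checkpoint instead of a pinned chk-100 is to resolve it before submitting. A shell sketch (path layout follows the post; relies on the chk-N naming of retained checkpoints):

# pick the newest retained chk-* directory of the job and resume from it
JOB_CKPT_DIR=hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2
LATEST=$(hdfs dfs -ls "$JOB_CKPT_DIR" | awk '{print $NF}' | grep '/chk-' | sort -V | tail -1)
flink run -d -s "$LATEST" -t yarn-per-job /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod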

Flink on YARN: restart fails after updating a file

2021-04-30 post by zjfpla...@hotmail.com
After stopping the Flink job, I updated a related configuration file (the keytab) and then got this error:
Resource hdfs://nameservice1/user/hbase/.flink/${appid}/hbase.keytab changed on 
src filesystem (expected , was )



zjfpla...@hotmail.com


Flink on YARN Kerberos authentication problem

2021-04-30 post by zjfpla...@hotmail.com
Hi all,
 The issues:
   1. With Kerberos already managed by Cloudera Manager on CDH, editing the Kerberos configuration in CM leaves /var/kerberos/krb5kdc/kdc.conf and /etc/krb5.conf unchanged; it seems to be stored elsewhere. Does anyone know where?
2. Running Flink 1.8 on CDH 5.14 YARN, after about a day it reports GSS initiate failed {caused by 
GSSException: No valid credentials 
provided} and the program fails and terminates. I suspect the Kerberos ticket renewal stopped working. Did I misconfigure something, or does Flink 1.8 simply not support renewal yet? Previously spark-submit
 --keytab would renew the TGT automatically.

   Relevant server-side configuration:
1. flink-conf.yaml:
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/zjf/zjf.keytab
security.kerberos.login.principal: z...@test.com
security.kerberos.login.contexts: Client,KafkaClient
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client
2. /var/kerberos/krb5kdc/kdc.conf:
(screenshot not preserved in the archive)
3. /etc/krb5.conf:
(screenshot not preserved in the archive)
The max_renewable_life in it was added by hand afterwards; the KDC has been restarted but it still has no effect.

Then I checked the Kerberos debug log and found this segment:
Forwardable Ticket true
Forwarded Ticket false
Proxiable Ticket false
Proxy Ticket false 
Postdated Ticket false
Renewable Ticket false
Initial Ticket false
Auth Time =Fri Apr 30 14:38:36 CST 2021
Start Time =Fri Apr 30 14:38:36 CST 2021
End Time =Sat May 01 14:38:36 CST 2021  
Renew Till = null   
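
The "Renewable Ticket false / Renew Till = null" above says the TGT was issued non-renewable in the first place, so nothing on the Flink side can renew it. A sketch of the usual KDC-side fix (realm and lifetimes are placeholder examples; existing principals keep their old maxrenewlife and must be updated too):

# /var/kerberos/krb5kdc/kdc.conf
[realms]
 <REALM> = {
  max_life = 24h 0m 0s
  max_renewable_life = 7d 0h 0m 0s
 }

# raise the limit on the already-created principals as well:
kadmin.local -q "modprinc -maxrenewlife 7days <principal>"
kadmin.local -q "modprinc -maxrenewlife 7days krbtgt/<REALM>"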




zjfpla...@hotmail.com
 


Problems starting Flink on a YARN cluster

2021-04-21 post by tanggen...@163.com
Hello, I have hit some problems submitting Flink jobs to a YARN cluster and hope someone can help.
I deployed a three-node Hadoop cluster whose two worker nodes are 4c/24G; yarn-site configures 8 vcores and 20G usable memory each, so 16 vcores / 40G in total. I submitted two jobs to YARN, each allocated 3 vcores and 6G of memory, consuming 6 vcores / 12G in total, which the Hadoop web 
UI also reflects (screenshot not preserved in the archive).
But when I submit a third job it cannot be submitted, with no obvious error log, even though the cluster clearly has enough resources. I cannot tell where the problem lies; any advice is much appreciated.
Appendix 1 (console output):
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Could not deploy Yarn job cluster.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at 
com.hfhj.akb.report.order.PlatformOrderStream.main(PlatformOrderStream.java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1618931441017_0004 failed 2 
times in previous 1 milliseconds (global limit =4; local limit is =2) due 
to AM Container for appattempt_1618931441017_0004_03 exited with  exitCode: 
1
Failing this attempt.Diagnostics: [2021-04-20 23:34:16.067]Exception from 
container-launch.
Container id: container_1618931441017_0004_03_01
Exit code: 1

[2021-04-20 23:34:16.069]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

[2021-04-20 23:34:16.069]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: 
http://master107:8088/cluster/app/application_1618931441017_0004 Then click on 
links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1618931441017_0004
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:424)
... 22 more
Appendix 2 (Hadoop logs):
2021-04-20 23:31:40,293 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Stopping resource-monitoring for container_1618931441017_0004_01_01
2021-04-20 23:31:40,293 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got 
event CONTAINER_STOP for appId application_1618931441017_0004
2021-04-20 23:31:41,297 INFO 
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed 
completed containers from NM context: [container_1618931441017_0004_01_01]
2021-04-20 23:34:12,558 INFO SecurityLogger.org.apache.hadoop.ipc.Server: Auth 
successful for appattempt_1618931441017_0004_03 (auth:SIMPLE)
2021-04-20 23:34:12,565 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 

Flink on YARN startup error

2021-04-18 post by Bruce Zhang
Submission in Flink on YARN per-job mode fails; the command is bin/flink run -m yarn-cluster -d -yjm 1024 
-ytm 4096 /home/XX.jar

 

 YARN has enough resources, and other programs submit fine; only this one fails on submission. But changing the command to bin/flink run -m yarn-cluster -yjm 1024 
-ytm 4096 /home/testjar/XX.jar succeeds, i.e. removing the -d parameter. That is session mode, though, and it also interferes with other running programs.

 

Error messages:

2021-04-19 10:08:13,116 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-04-19 10:08:13,116 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-04-19 10:08:13,541 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
specification: ClusterSpecification{masterMemoryMB=1024, 
taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=1}

2021-04-19 10:08:13,843 WARN  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
configuration directory ('/home/software/flink-1.7.0/conf') contains both LOG4J 
and Logback configuration files. Please delete or rename one of them.

2021-04-19 10:08:14,769 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Submitting 
application master application_1618796268543_0019

2021-04-19 10:08:14,789 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1618796268543_0019

2021-04-19 10:08:14,789 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for the 
cluster to be allocated

2021-04-19 10:08:14,791 INFO  
org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
cluster, current state ACCEPTED






 The program finished with the following exception:




org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.

at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)

at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

Caused by: 
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment.

Diagnostics from YARN: Application application_1618796268543_0019 failed 1 
times due to AM Container for appattempt_1618796268543_0019_01 exited with  
exitCode: 1

For more detailed output, check application tracking 
page:http://siact-11:8088/cluster/app/application_1618796268543_0019Then, click 
on links to logs of each attempt.

Diagnostics: Exception from container-launch.

Container id: container_e24_1618796268543_0019_01_01

Exit code: 1

Stack trace: ExitCodeException exitCode=1:

at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)

at org.apache.hadoop.util.Shell.run(Shell.java:482)

at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)

at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)

at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)







Container exited with a non-zero exit code 1

Failing this attempt. Failing the application.

If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:

yarn logs -applicationId application_1618796268543_0019

at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)

at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545

Re: Flink on YARN: connection refused with multiple TaskManagers

2021-04-12 post by haihua
Hi, did you ever solve this problem? Any ideas you could share?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Flink on YARN session mode fails to communicate with YARN (job mode succeeds)

2021-03-22 post by 刘乘九
Thanks a lot! I tried it, but it did not solve the problem. Both parameters are configured, and the values shown at startup match the configuration. The comments above them say they seem to take effect only in standalone mode; oddly, per-job goes through smoothly while session cannot connect. By the way, my version is 1.11.2; please take another look when you have time.




On 2021-03-23 09:28:20, "wxpcc" wrote:
>For the first question, try setting jobmanager.rpc.address and jobmanager.rpc.port in flink conf
>Not sure about the second question
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on YARN session mode fails to communicate with YARN (job mode succeeds)

2021-03-22 post by wxpcc
For the first question, try setting jobmanager.rpc.address and jobmanager.rpc.port in flink conf
Not sure about the second question



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink on YARN session mode fails to communicate with YARN (job mode succeeds)

2021-03-22 post by 刘乘九
A question for the experts:
I had been submitting jobs in job mode and they went through fine. Recently a requirement suits session mode better, but following the forum tutorials, submission keeps failing because it cannot connect to the resource manager. The startup logs show the two kinds of jobs connect to different resource managers: one uses the correct port, the other keeps requesting a local port.

session mode startup log:
(log screenshot not preserved in the archive)

job mode startup log:
(log screenshot not preserved in the archive)

My questions:
 1. How do I configure the resource manager port in session mode?

 2. In job mode, suppose I have an 8-core taskmanager server A configured with 16 slots. A job with parallelism 5 submitted in job mode lands on server A; after that, further job-mode submissions can no longer be assigned to server A. Is there a way to use the remaining idle slots? Or is the design, for small jobs, to reduce message passing and so shorten completion time?






 

Flink on YARN session mode fails to communicate with YARN (job mode succeeds)

2021-03-22 post by 刘乘九
A question for the experts:
I had been submitting jobs in job mode and they went through fine. Recently a requirement suits session mode better, but following the forum tutorials, submission keeps failing because it cannot connect to the resource manager. The startup logs show the two kinds of jobs connect to different resource managers: one uses the correct port, the other keeps requesting a local port.

session mode startup log:
(log screenshot not preserved in the archive)

job mode startup log:
(log screenshot not preserved in the archive)

My questions:
 1. How do I configure the resource manager port in session mode?

 2. In job mode, suppose I have an 8-core taskmanager server A configured with 16 slots. A job with parallelism 5 submitted in job mode lands on server A; after that, further job-mode submissions can no longer be assigned to server A. Is there a way to use the remaining idle slots? Or is the design, for small jobs, to reduce message passing and so shorten completion time?
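
On question 2: in job mode every submission brings up its own cluster, so idle slots are never shared across jobs; a long-lived session cluster is the usual way to pack several small jobs into the same slots. A sketch (flags and sizes illustrative; exact options vary by Flink version):

# start a detached session with 16 slots per TaskManager
./bin/yarn-session.sh -d -jm 1024m -tm 4096m -s 16
# submit jobs into that session; they reuse its free slots
./bin/flink run -t yarn-session -Dyarn.application.id=<session-app-id> ./my-job.jar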



Flink on yarn per-job HA

2021-03-19 post by Ink????
Hi, for Flink 1.12 on YARN in per-job mode, how should HA be configured? (The rest of this message is garbled in the archive.)

Re: Flink on YARN per-job submission failure

2021-02-24 post by Robin Zhang
Hi 凌战,
Check whether the Hadoop environment variables are set correctly; see the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation

Best,
Robin


凌战 wrote
> hi community,
> On the API side the user is set to the hdfs user. After the job is scheduled, the relevant packages exist under the /user/hdfs/.flink/application-id directory, e.g.
> -rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02
> /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
> -rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
> -rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09 
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
> -rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar
> 
> 
> But it fails with Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
> The uploaded files have permission -rw-r--r--; could a permissions problem be the cause?
> 
> 
> Hoping someone can clarify!
> 凌战
> m18340872285@
> (signature generated by NetEase Mail Master)





--
Sent from: http://apache-flink.147419.n8.nabble.com/
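
上面文档 preparation 一节的关键是让提交端能拿到 Hadoop 的配置和类路径,最小示意如下(路径按集群实际情况调整):

export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`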


Flink On Yarn Per Job 作业提交失败问题

2021-02-23 文章 凌战
hi,社区
在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如
-rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02 
/user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
-rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09 
/user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
-rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09  
/user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
-rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09 
/user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar


但是报错 Could not find or load main class 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
发现上传文件目录的权限是  -rw-r--r-- ,不知道是不是因为权限问题导致


希望有人解惑!
| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制

Re: 本地api提交jar包到Flink on Yarn集群,报错 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 文章 Smile
你好,
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 这个类应该是在 flink-yarn
这个 module 里面,打 lib 包的时候作为依赖被打进 flink-dist 里面。
为什么你同时添加了 flink-dist_2.11-1.10.1.jar 和 flink-yarn_2.11-1.11.1.jar 这两个 jar
呀,不会冲突吗?

Smile



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:回复:本地api提交jar包到Flink on Yarn集群,报错 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 文章 Smile@LETTers
你好,
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 这个类应该是在 flink-yarn
这个 module 里面,打 lib 包的时候作为依赖被打进 flink-dist 里面。
为什么你同时添加了 flink-dist_2.11-1.10.1.jar 和 flink-yarn_2.11-1.11.1.jar 这两个 jar
呀,不会冲突吗?
Smile

回复:本地api提交jar包到Flink on Yarn集群,报错 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 文章 凌战
上面添加的jar包没有显示,补充一下:目前除了用户jar包,添加的依赖jar包就是
flink-dist_2.11-1.10.1.jar
flink-queryable-state-runtime_2.11-1.10.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-table-blink_2.11-1.10.1.jar
flink-table_2.11-1.10.1.jar
flink-yarn_2.11-1.11.1.jar


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制

回复:本地api提交jar包到Flink on Yarn集群,报错 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 文章 凌战
上面添加的jar包没有显示,补充一下:目前除了用户jar包,添加的依赖jar包就是
flink-dist_2.11-1.10.1.jar
flink-queryable-state-runtime_2.11-1.10.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
flink-table-blink_2.11-1.10.1.jar
flink-table_2.11-1.10.1.jar
flink-yarn_2.11-1.11.1.jar


但是提交到flink on yarn那边,仍然报错


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制

本地api提交jar包到Flink on Yarn集群,报错 Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

2021-02-23 文章 凌战


List<URL> userClassPaths = new ArrayList<>();
// 取 classpath 下 lib 目录里的所有 jar 作为 user classpath
File file = ResourceUtils.getFile(new
URL(Objects.requireNonNull(this.getClass().getClassLoader().getResource("")).toString() + "lib"));
if (file.isDirectory() && file.listFiles() != null) {
    for (File ele : Objects.requireNonNull(file.listFiles())) {
        userClassPaths.add(ele.toURI().toURL());
    }
}


// 构建PackagedProgram
PackagedProgram packagedProgram =
PackagedProgram.newBuilder()
.setJarFile(jar)
.setUserClassPaths(userClassPaths)
.build();


// 获取Configuration
String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();


// 2. load the global configuration
// 加载 flink-conf.yaml构成 Configuration
Configuration configuration = 
GlobalConfiguration.loadConfiguration(configurationDirectory);




// 3. 加载jar包
ConfigUtils.encodeCollectionToConfig(
configuration,
PipelineOptions.JARS,
packagedProgram.getJobJarAndDependencies(),
URL::toString
);


ConfigUtils.encodeCollectionToConfig(
configuration,
PipelineOptions.CLASSPATHS,
packagedProgram.getClasspaths(),
URL::toString
);




Pipeline pipeline = 
this.wrapClassLoader(packagedProgram.getUserCodeClassLoader(),() -> {
try {
return PackagedProgramUtils.
getPipelineFromProgram(packagedProgram,
configuration,
10,
false);
} catch (ProgramInvocationException e) {
e.printStackTrace();
return null;
}
});




// yarn-per-job模式
return new PlatformAbstractJobClusterExecutor<>(new 
YarnClusterClientFactory()).

execute(pipeline,configuration,packagedProgram.getUserCodeClassLoader()).get().getJobID().toString();






这里添加的依赖jar包如下




但是出现报错:


2021-02-23 18:49:23.404  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.api.java.utils.PlanGenerator   : The job has 0 registered types and 0 
default Kryo serializers
2021-02-23 18:50:38.746  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : No path for the flink jar passed. 
Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to 
locate the jar
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:38.747  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:41.159  INFO 26116 --- [nio-8080-exec-4] 
.h.y.c.ConfiguredRMFailoverProxyProvider : Failing over to rm80
2021-02-23 18:50:41.848  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Cluster specification: 
ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, 
slotsPerTaskManager=1}
2021-02-23 18:50:41.849  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:42.555  WARN 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Environment variable 'FLINK_LIB_DIR' 
not set and ship files have not been provided manually. Not shipping any 
library files.
2021-02-23 18:50:42.556  WARN 26116 --- [nio-8080-exec-4] 
o.apache.flink.core.plugin.PluginConfig  : The plugins directory [plugins] does 
not exist.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
.u.c.m.MemoryBackwardsCompatibilityUtils : 'jobmanager.memory.process.size' is 
not specified, use the configured deprecated task manager heap value 
(1024.000mb (1073741824 bytes)) for it.
2021-02-23 18:50:48.552  INFO 26116 --- [nio-8080-exec-4] 
o.a.f.r.u.c.memory.ProcessMemoryUtils: The derived from fraction jvm 
overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
2021-02-23 18:50:48.554  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Submitting application master 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 
o.a.h.y.client.api.impl.YarnClientImpl   : Submitted application 
application_1610671284452_0243
2021-02-23 18:50:49.133  INFO 26116 --- [nio-8080-exec-4] 
o.a.flink.yarn.YarnClusterDescriptor : Waiting for the cluster to be 
allocated
2021-02-23 18:50:49.281  INFO 26116 --- [nio-8080-exec-4] 
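
顺着上面日志里的两条 WARN("No path for the flink jar passed"和"Environment variable 'FLINK_LIB_DIR' not set … Not shipping any library files")看,AM 的 classpath 里很可能根本没有 flink-dist,所以才找不到 YarnJobClusterEntrypoint。一个可以尝试的最小示意(configuration 即上文代码里的 Configuration 对象,jar 路径为假设值;另外注意 flink-yarn 和 flink-dist 的版本要一致,不要混用 1.10.1 和 1.11.1):

// 显式告诉部署器 flink-dist 在哪(路径为假设值)
configuration.setString("yarn.flink-dist-jar",
        "/opt/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar");

// 或者在提交进程的环境变量里导出 FLINK_LIB_DIR,让 YarnClusterDescriptor 自动 ship lib 下的 jar:
// export FLINK_LIB_DIR=/opt/flink-1.10.1/lib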


flink on yarn任务的唯一性id问题

2021-02-18 文章 datayangl
目前使用 flink1.11 进行数据的 etl 工作,使用 snowflake 算法生成唯一性 id。一个 taskmanager 有 4 个 slot,etl 任务并行度设为 16,假设都落在单机节点上,那么实际会运行 4 个 yarn
container;由于同一台机器上的雪花算法有相同的时钟和机器 id,因此有概率出现重复 id。请问:
1.雪花算法怎么应用到单节点多 container 的场景且不重复?
2.除了 UUID,还有什么生成唯一性 id 的算法?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
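
针对问题 1,一个可行的思路是把雪花算法的 workerId 换成 Flink 的 subtask index:同一个 job 里每个并行子任务的 index(0 到并行度-1)全局唯一,跟任务落在哪台机器、哪个 container 无关。下面是一个最小示意(SnowflakeIdMap 为假设的类名,位分配仅作演示,未处理时钟回拨,且假设并行度不超过 32):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class SnowflakeIdMap extends RichMapFunction<String, String> {

    private transient long workerId;
    private transient long lastTs = -1L;
    private transient long seq = 0L;

    @Override
    public void open(Configuration parameters) {
        // subtask index 在整个 job 内唯一,直接当 workerId 用
        workerId = getRuntimeContext().getIndexOfThisSubtask();
    }

    private long nextId() {
        long ts = System.currentTimeMillis();
        if (ts == lastTs) {
            seq = (seq + 1) & 0xFFF;                 // 12 位序列号
            if (seq == 0) {                          // 本毫秒用尽,等到下一毫秒
                while ((ts = System.currentTimeMillis()) <= lastTs) { }
            }
        } else {
            seq = 0;
        }
        lastTs = ts;
        return (ts << 17) | (workerId << 12) | seq;  // 5 位 workerId + 12 位序列号
    }

    @Override
    public String map(String value) {
        return nextId() + "," + value;
    }
}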

Re: flink on yarn 多TaskManager 拒绝连接问题

2021-02-07 文章 Yang Wang
那你可能需要把你的JobManager和TaskManager的日志发一下,才能进一步分析

主要需要确认的是连的端口是正确的,如果网络层面没有问题,那就有可能是哪个配置项使用了某个特定端口导致的

Best,
Yang

Junpb  于2021年2月8日周一 上午9:30写道:

> 你好,
> 我的测试环境yarn有三个节点,当TM启动只有一个时,JM和Tm随机启动在任何节点上都很正常,只有TM变为两个时,会出现报错。
> 每次启动JM和TM端口都是随机的,以上配置是确保2个TM启动,我现在怀疑是我其他配置导致的错误,谢谢
>
> Best,
> Bi
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink on yarn 多TaskManager 拒绝连接问题

2021-02-07 文章 Junpb
你好,
我的测试环境 yarn 有三个节点,当 TM 只启动一个时,JM 和 TM 随机启动在任何节点上都很正常,只有 TM 变为两个时,会出现报错。
每次启动 JM 和 TM 端口都是随机的,以上配置是确保 2 个 TM 启动,我现在怀疑是我其他配置导致的错误,谢谢。

Best,
Bi



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink on yarn 多TaskManager 拒绝连接问题

2021-02-06 文章 Yang Wang
建议你使用telnet检查一下JM和有问题TM之间的网络连通性,Flink在这个地方没有已知的bug


Best,
Yang

Junpb  于2021年2月5日周五 下午8:09写道:

> nohup bin/flink run -m yarn-cluster \
> -c main \
> -ynm ${FLINK_NAME} \
> -ys 3 \
> -p 4 \
> -yjm 2048m \
> -ytm 2048m \
>
> 在flink on yarn 的情况下,使用以上flink run 参数,确保TaskManager 为 2
>
> 奇怪的是 JobManager 里面报如下错误,但TaskManager的确启动2个,只是报错的那个TaskManager无法正常工作
>
> 谢谢解答
>
> 错误:
> Caused by:
>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: ip:port
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
> at
>
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
> at
>
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink on yarn 多TaskManager 拒绝连接问题

2021-02-05 文章 Junpb
nohup bin/flink run -m yarn-cluster \
-c main \
-ynm ${FLINK_NAME} \
-ys 3 \
-p 4 \
-yjm 2048m \
-ytm 2048m \

在flink on yarn 的情况下,使用以上flink run 参数,确保TaskManager 为 2

奇怪的是 JobManager 里面报如下错误,但TaskManager的确启动2个,只是报错的那个TaskManager无法正常工作

谢谢解答

错误:
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: ip:port
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)




--
Sent from: http://apache-flink.147419.n8.nabble.com/
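
除了用 telnet 排查连通性,还可以把随机端口固定成一段可被防火墙放行的范围,减少"某个 TM 恰好分到被挡端口"的情况。flink-conf.yaml 最小示意(端口段均为假设值):

taskmanager.rpc.port: 50100-50200
blob.server.port: 6130-6140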


Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
谢谢!
我摘录的是flink1.11.2版本文档最后那部分:Background / Internals,介绍flink 如何在yarn上运行的
的内容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html
 
。感觉版本比较新了,应该没有问题吧,也是我们生产上在用的版本。1.12版本中没有找到相关内容。
仔细看了下文档,可能是我对flink on yarn的理解不太清楚,还是有几个问题请教下:
①flink on yarn 模式下,JobManager 和
ApplicationMaster 是两个独立的线程的概念,运行在同一个 container 这个 JVM 里面,对么?
②flink on yarn per-job mode 提交作业后,节点上执行 jps,
有 YarnJobClusterEntrypoint 和 YarnTaskExecutorRunner 这两个进程,这两个进程是什么?
③1.12 版本有最新的关于 flink on yarn 运行原理的介绍吗?我在官网没有看到这部分内容。





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 Xintong Song
你之前的理解是正确的。Yarn 的 AM 就是 Flink 的 JM。

你看到的文档描述是有问题的。我查了一下 git history,你所摘录的内容是 2014 年撰写的,描述的应该是项目初期的 on yarn
部署方式,早已经过时了。这部分内容在最新的 1.12 版本文档中已经被移除了。

Thank you~

Xintong Song



On Tue, Feb 2, 2021 at 6:43 PM lp <973182...@qq.com> wrote:

> 或者说,我知道,对于MapReduce任务,ApplicationMaster的实现是MRAppMaster,那flink on yarn
> ,ApplicationMaster对应的实现是啥?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
或者说,我知道,对于MapReduce任务,ApplicationMaster的实现是MRAppMaster,那flink on yarn
,ApplicationMaster对应的实现是啥?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink on yarn , JobManager和ApplicationMaster的关系

2021-02-02 文章 lp
flink on yarn中,yarn的applicationMaster和flink
JobManager的关系是啥,我对yarn不是很熟悉,之前的理解是
JobManager就是yarn中的applicationMaster的角色。但我在官网中看到如下摘录:...Once that has
finished, the ApplicationMaster (AM) is started.The JobManager and AM are
running in the same container. Once they successfully started, the AM knows
the address of the JobManager (its own host).  说明 AM和JM是两个进程,可是我将flink
job提交到yarn集群,只看到有jobManager进程(YarnJobClusterEntrypoint),并没有看到过ApplicationMaster进程,请帮忙解释他们之间的联系和区别?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on yarn JDK 版本支持问题

2021-01-24 文章 Yun Tang
Hi,

MaxMetaspaceSize 是在JDK8中新增的,用以取代以前的PermGen[1],JDK7中自然不支持。可以在hadoop集群中再安装JDK8,将 
env.java.home 指向新的JDK


[1] https://www.baeldung.com/java-permgen-metaspace#metaspace

祝好
唐云

From: Jacob <17691150...@163.com>
Sent: Saturday, January 23, 2021 16:17
To: user-zh@flink.apache.org 
Subject: Flink on yarn JDK 版本支持问题

使用Flink1.11.2客户端 往hadoop集群提交job,报错如下:

LogType:jobmanager.err
Log Upload Time:Sat Jan 23 00:06:47 -0800 2021
LogLength:160
Log Contents:
Unrecognized VM option 'MaxMetaspaceSize=268435456'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

请问是因为hadoop集群jdk版本低的问题吗?


现在已知的是hadoop集群jdk版本为1.7


之前一直以为在flink配置文件中配置的*env.java.home*就应该是hadoop集群的java home,通过测试,发现并不是,这个java
home就是客户端(本地)所在机器的java home。这个java版本已经是1.8+,但提交job时,仍然报错,如上。



是因为hadoop集群的jdk低吗?如果升级了hadoop集群的jdk,那么在flink配置文件中的env.java.home
需要改成hadoop集群的java home吗?还是不用改变,依旧使用本地的java home路径?

这两个jdk对于启动一个flink job的作用是什么呀?( 配置的env.java.home和hadoop集群的java home)







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
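
按上面的思路在所有 NodeManager 机器上装好 JDK8 之后,可以在 flink-conf.yaml 里这样指定(路径为假设值,需要在每台 YARN 节点上都存在):

env.java.home: /usr/java/jdk1.8.0_202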


Flink On Yarn部署模式下,yarn能否实现自定义日志聚合策略而不是作业结束后才聚合。

2021-01-23 文章 Bobby
在 Flink On
Yarn 部署模式下,发现只有当作业终止后,yarn 才会对各个 TM 和 JM 的日志进行聚合放到 hdfs 里。这对实际生产中查日志定位问题非常不方便。
有没有可以自定义日志聚合策略的办法,比如每隔多久就聚合一次放到 hdfs 里这种实现方式?
或者各位大佬在日常使用中是如何做到实时查看 Flink 日志的?
感谢。





--
Sent from: http://apache-flink.147419.n8.nabble.com/
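
YARN 本身提供滚动日志聚合(Hadoop 2.6+),可以让长跑作业的日志按固定间隔聚合到 HDFS,而不用等作业结束。yarn-site.xml 最小示意(间隔值为假设值,YARN 默认对该值有下限约束):

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>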

Flink on yarn JDK 版本支持问题

2021-01-23 文章 Jacob
使用Flink1.11.2客户端 往hadoop集群提交job,报错如下:

LogType:jobmanager.err
Log Upload Time:Sat Jan 23 00:06:47 -0800 2021
LogLength:160
Log Contents:
Unrecognized VM option 'MaxMetaspaceSize=268435456'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

请问是因为hadoop集群jdk版本低的问题吗?


现在已知的是hadoop集群jdk版本为1.7


之前一直以为在flink配置文件中配置的*env.java.home*就应该是hadoop集群的java home,通过测试,发现并不是,这个java
home就是客户端(本地)所在机器的java home。这个java版本已经是1.8+,但提交job时,仍然报错,如上。



是因为hadoop集群的jdk低吗?如果升级了hadoop集群的jdk,那么在flink配置文件中的env.java.home
需要改成hadoop集群的java home吗?还是不用改变,依旧使用本地的java home路径?

这两个jdk对于启动一个flink job的作用是什么呀?( 配置的env.java.home和hadoop集群的java home)







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink On Yarn部署模式下,提交Flink作业 如何指定自定义log4j 配置

2021-01-18 文章 Yang Wang
具体你可以看一下YarnClusterDescriptor和YarnLogConfigUtil这两个类的代码
里面包含了如何来发现log4j的配置文件,以及如何来注册LocalResource,让Yarn来进行配置分发

Best,
Yang

Bobby <1010445...@qq.com> 于2021年1月18日周一 下午11:17写道:

> 首先感谢提供解决方案。我回头就去试试。
>
>
> 关于提到的“在Yarn部署的时候是依赖log4j.properties这个文件名来ship资源的,所以不能手动指定一个其他文件”,怎么理解,可以提供相关资料吗,我去了解具体flink
> on yarn 部署逻辑。
>
> thx.
>
>
> Yang Wang wrote
> > 在Yarn部署的时候是依赖log4j.properties这个文件名来ship资源的,所以不能手动指定一个其他文件
> >
> > 但是你可以export一个FLINK_CONF_DIR=/path/of/your/flink-conf环境变量
> > 在相应的目录下放自己的flink-conf.yaml和log4j.properties
> >
> > Best,
> > Yang
> >
> > Bobby <
>
> > 1010445050@
>
> >> 于2021年1月18日周一 下午7:18写道:
> >
> >> Flink On Yarn 日志配置log4j.properties 文件默认读取flink/conf中的log4j.properties。
> >> 有没有方法可以在提交flink 作业时指定自己编写的log4.properties。
> >> thx。
> >>
> >>
> >> Flink版本:1.9.1
> >> 部署方式:Flink on Yarn
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
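
FLINK_CONF_DIR 的用法示意如下(目录路径为假设值,目录里放自己的 flink-conf.yaml 和 log4j.properties):

export FLINK_CONF_DIR=/opt/my-flink-conf
./bin/flink run -m yarn-cluster ./your-job.jar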


Re: Flink On Yarn部署模式下,提交Flink作业 如何指定自定义log4j 配置

2021-01-18 文章 Bobby
首先感谢提供解决方案。我回头就去试试。

关于提到的“在Yarn部署的时候是依赖log4j.properties这个文件名来ship资源的,所以不能手动指定一个其他文件”,怎么理解,可以提供相关资料吗,我去了解具体flink
on yarn 部署逻辑。

thx.


Yang Wang wrote
> 在Yarn部署的时候是依赖log4j.properties这个文件名来ship资源的,所以不能手动指定一个其他文件
> 
> 但是你可以export一个FLINK_CONF_DIR=/path/of/your/flink-conf环境变量
> 在相应的目录下放自己的flink-conf.yaml和log4j.properties
> 
> Best,
> Yang
> 
> Bobby <

> 1010445050@

>> 于2021年1月18日周一 下午7:18写道:
> 
>> Flink On Yarn 日志配置log4j.properties 文件默认读取flink/conf中的log4j.properties。
>> 有没有方法可以在提交flink 作业时指定自己编写的log4.properties。
>> thx。
>>
>>
>> Flink版本:1.9.1
>> 部署方式:Flink on Yarn
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: Flink On Yarn部署模式下,提交Flink作业 如何指定自定义log4j 配置

2021-01-18 文章 Yang Wang
在Yarn部署的时候是依赖log4j.properties这个文件名来ship资源的,所以不能手动指定一个其他文件

但是你可以export一个FLINK_CONF_DIR=/path/of/your/flink-conf环境变量
在相应的目录下放自己的flink-conf.yaml和log4j.properties

Best,
Yang

Bobby <1010445...@qq.com> 于2021年1月18日周一 下午7:18写道:

> Flink On Yarn 日志配置log4j.properties 文件默认读取flink/conf中的log4j.properties。
> 有没有方法可以在提交flink 作业时指定自己编写的log4.properties。
> thx。
>
>
> Flink版本:1.9.1
> 部署方式:Flink on Yarn
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink On Yarn部署模式下,提交Flink作业 如何指定自定义log4j 配置

2021-01-18 文章 Bobby
Flink On Yarn 日志配置 log4j.properties 文件默认读取 flink/conf 中的 log4j.properties。
有没有方法可以在提交 flink 作业时指定自己编写的 log4j.properties?
thx。


Flink版本:1.9.1
部署方式:Flink on Yarn



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink on yarn 部署失败,应用在部署过程中被 KILLED

2020-12-30 文章 ??????
请教一下,提交命令如下:
bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c 
com.dwd.lunch.dwd_lunch.Dim_Cc_Media -ys 1 xjia_shuyun-6.0.jar

nodemanager??


Deployment took more than 60 seconds. Please check if the requested resources 
are available in the YARN cluster
客户端随后报错:
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state KILLED during 
deployment.
Diagnostics from YARN: Application application_1607073161732_0137 was killed by 
user xjia at 192.168.100.31

Re: Re: flink on yarn启动失败

2020-12-23 文章 magichuang
感谢感谢感谢!!!

原来是这样,以为solt 缩写就是-s了,,,感谢这位朋友的解答,已经可以提交了~


> -- 原始邮件 --
> 发 件 人:"Yang Wang" 
> 发送时间:2020-12-24 11:01:46
> 收 件 人:user-zh 
> 抄 送:
> 主 题:Re: flink on yarn启动失败
>
> 你这个命令写的有点问题,flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> 应该是-ys,而不是-s
> -s是从savepoints恢复,所以报错里面会有找不到savepoints目录
>
>
> Best,
> Yang
>

Re: flink on yarn启动失败

2020-12-23 文章 Yang Wang
你这个命令写的有点问题,flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
traffic.py

应该是-ys,而不是-s
-s是从savepoints恢复,所以报错里面会有找不到savepoints目录


Best,
Yang
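
按这个说法把 -s 换成 -ys 之后,原命令应写成(仅作对照示意):

flink run -m yarn-cluster -ynm traffic -ys 2 -p 2 -ytm 1024 -py traffic.py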

magichuang  于2020年12月23日周三 下午8:29写道:

> 机器参数:三台  32C64G  centos  7.8,cdh集群在这上面先部署
> flink版本:1.11.2,在三台集群上搭建的集群
>
> hadoop集群是用cdh搭建的
>
>
> 启动命令:flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py
> traffic.py
>
> 程序使用pyflink开发的,从kafka读取数据,然后用滚动窗口聚合每分钟的数据在写入kafka
>
>
>
>
> 这个程序在local模式下是正常运行的,但是用per-job模式提交总是失败
>
> 测试官方例子  flink run -m yarn-cluster examples/batch/WordCount.jar
>  是可以输出结果的,所以想请教一下这个是yarn的问题还是程序的问题啊?
>
>
>
>
> 下面是主要报错信息
>
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not instantiate JobManager.
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> ... 4 more
>
> Caused by: java.io.FileNotFoundException: Cannot find checkpoint or
> savepoint file/directory '2' on file system 'file'.
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:229)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:119)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:272)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:140)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ~[?:1.8.0_202]
>
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
>
> at
> akka.di

flink on yarn启动失败

2020-12-23 文章 magichuang
机器参数:三台  32C64G  centos  7.8,cdh集群在这上面先部署
flink版本:1.11.2,在三台集群上搭建的集群

hadoop集群是用cdh搭建的


启动命令:flink run -m yarn-cluster -ynm traffic -s 2 -p 2 -ytm 1024 -py traffic.py

程序使用pyflink开发的,从kafka读取数据,然后用滚动窗口聚合每分钟的数据在写入kafka




这个程序在local模式下是正常运行的,但是用per-job模式提交总是失败

测试官方例子  flink run -m yarn-cluster examples/batch/WordCount.jar   
是可以输出结果的,所以想请教一下这个是yarn的问题还是程序的问题啊?




下面是主要报错信息

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate 
JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint 
file/directory '2' on file system 'file'.

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:243)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1394)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ~[?:1.8.0_202]

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
~[flink-dist_2.12-1.11.2.jar:1.11.2]

at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]

... 4 more

2020-12-23 20:12:46,459 INFO org.apache.flink.runtime.blob.BlobServer [] - 
Stopped BLOB server at 0.0.0.0:16109







全部日志可以打开下面的链接:https://note.youdao.com/ynoteshare1/index.html?id=25f1af945e277057c2251e8f60d90f8a=note
加载可能慢一些,请稍等一会就出来了~













Best,

MagicHuang







Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-22 文章 Yang Wang
在Yarn上将需要给Flink使用的机器单独划分到一个partition里面,使用node label可以实现

然后在提交Flink任务的时候使用yarn.application.node-label来指定就可以了。partition是可以保证排他的,不带这个label的调度不上来

Best,
Yang

r pp  于2020年12月23日周三 上午11:18写道:

> flink 提交到特定的node ,可以保证 其它的任务 不能提交到flink特定的node 上么?
>
> xiao cai  于2020年12月22日周二 上午10:28写道:
>
> > Hi
> > 可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上
> >
> >
> >  Original Message
> > Sender: r pp
> > Recipient: user-zh
> > Date: Monday, Dec 21, 2020 21:25
> > Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
> >
> >
>


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-22 文章 r pp
flink 提交到特定的node ,可以保证 其它的任务 不能提交到flink特定的node 上么?

xiao cai  于2020年12月22日周二 上午10:28写道:

> Hi
> 可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上
>
>
>  Original Message
> Sender: r pp
> Recipient: user-zh
> Date: Monday, Dec 21, 2020 21:25
> Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
>
>


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 xiao cai
Hi
可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上


 Original Message 
Sender: r pp
Recipient: user-zh
Date: Monday, Dec 21, 2020 21:25
Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点



Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 r pp
嗯,指定几台机子用于运行 flink。为什么不在 yarn 上为 flink 专门指定一个队列呢?是需要网络隔离吗?内网速度多大?

 于2020年12月21日周一 下午5:48写道:

> 通过yarn label可以实现
>
> -邮件原件-
> 发件人: user-zh-return-10095-afweijian=163@flink.apache.org
>  代表 yujianbo
> 发送时间: 2020年12月21日 16:44
> 收件人: user-zh@flink.apache.org
> 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
>
> 各位大佬好:
>  请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


答复: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 afweijian
通过yarn label可以实现

-邮件原件-
发件人: user-zh-return-10095-afweijian=163@flink.apache.org 
 代表 yujianbo
发送时间: 2020年12月21日 16:44
收件人: user-zh@flink.apache.org
主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 amen...@163.com
这个问题应该问yarn吧。。。



 
发件人: yujianbo
发送时间: 2020-12-21 16:43
收件人: user-zh
主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 yujianbo
各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
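
结合上面 Yang Wang 提到的 node label 与 yarn.application.node-label,一个端到端的最小示意如下(label 名 flink、主机名 host-01 均为假设值,需要 Flink 支持该选项的版本,队列的 accessible-node-labels 也要相应配置):

yarn rmadmin -addToClusterNodeLabels "flink(exclusive=true)"
yarn rmadmin -replaceLabelsOnNode "host-01=flink"
flink run -m yarn-cluster -yD yarn.application.node-label=flink ./your-job.jar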


Re:Re: flink on yarn 任务FAILED后 没有错误日志 输出到yarn log

2020-11-20 文章 air23
但是在 yarn 上跑的 spark 任务都是可以看到错误日志的。flink 这边配置的是 log4j 的日志文件,本地运行时控制台是可以看到错误原因和日志的。
在 2020-11-20 17:53:03,"caozhen"  写道:
>
>1、jobmanager的日志有没有错误呢?
>2、或者通过yarn history查下日志  yarn logs -applicationId xxx
>3、如果是flink client 提交作业可以看下客户端日志
>
>
>
>air23 wrote
>> 你好 
>> flink on yarn 任务FAILED后 没有错误日志 输出到yarn log 
>> 这样定位不到 具体是什么问题导致任务 失败了,请问怎么配置把log输出到yarn的log里面
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: flink on yarn 任务FAILED后 没有错误日志 输出到yarn log

2020-11-20 文章 air23
yarn logs -applicationId xxx 和 yarn 历史log 都查看不到FAILED 错误日志。
在 2020-11-20 17:53:03,"caozhen"  写道:
>
>1、jobmanager的日志有没有错误呢?
>2、或者通过yarn history查下日志  yarn logs -applicationId xxx
>3、如果是flink client 提交作业可以看下客户端日志
>
>
>
>air23 wrote
>> 你好 
>> flink on yarn 任务FAILED后 没有错误日志 输出到yarn log 
>> 这样定位不到 具体是什么问题导致任务 失败了,请问怎么配置把log输出到yarn的log里面
>
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
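
一个可以先检查的点:YARN 只聚合写进 container 日志目录的文件,Flink 是靠 log4j 配置里的 ${log.file} 把日志写到那里的。以 log4j1(Flink 1.10 及更早的默认日志框架)为例,最小示意如下(仅示意,实际以发行版 conf/log4j.properties 为准):

log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n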

