Re: How can flink-sql join a MySQL dimension table? Looking for DDL and DML examples

2019-10-11 Post
How is o.proctime defined in the SQL DDL?


 
------
From: "Jark Wu"
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table




On Fri, 27 Sep 2019 at 14:12, yelun <986463...@qq.com> wrote:

 Hi

 
How can flink-sql join a MySQL dimension table? Could you share a DDL and DML demo?
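
Following the docs Jark linked, a minimal sketch of what such a DDL plus lookup-join DML can look like (Java Table API with the Blink planner; all table names, fields, and connector properties are illustrative assumptions, and declaring a PROCTIME() computed column in the DDL requires Flink 1.10 or later):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MySqlDimJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // DDL for the fact stream; the proctime attribute needed by the join
        // is declared as a computed column (supported from Flink 1.10 on).
        tEnv.sqlUpdate(
                "CREATE TABLE orders (" +
                "  order_id BIGINT," +
                "  currency STRING," +
                "  amount DOUBLE," +
                "  proctime AS PROCTIME()" +
                ") WITH (" +
                "  'connector.type' = 'kafka'," +
                "  'connector.version' = 'universal'," +
                "  'connector.topic' = 'orders'," +
                "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
                "  'format.type' = 'json'" +
                ")");

        // DDL for the MySQL dimension table, served through the JDBC connector.
        tEnv.sqlUpdate(
                "CREATE TABLE rates (" +
                "  currency STRING," +
                "  rate DOUBLE" +
                ") WITH (" +
                "  'connector.type' = 'jdbc'," +
                "  'connector.url' = 'jdbc:mysql://localhost:3306/dim'," +
                "  'connector.table' = 'rates'," +
                "  'connector.username' = 'flink'," +
                "  'connector.password' = 'secret'" +
                ")");

        // DML: processing-time temporal (lookup) join against the dimension table.
        Table result = tEnv.sqlQuery(
                "SELECT o.order_id, o.amount * r.rate AS amount_converted " +
                "FROM orders AS o " +
                "JOIN rates FOR SYSTEM_TIME AS OF o.proctime AS r " +
                "ON o.currency = r.currency");

        tEnv.toAppendStream(result, Row.class).print();
        env.execute("mysql-dim-join-sketch");
    }
}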

Re: Question about enabling the state.backend.rocksdb.ttl.compaction.filter.enabled setting with RocksDBStateBackend

2019-10-11 Post by Yun Tang
Hi

I think your concern is that the data fills the disk before the TTL period has expired; that is not something TTL itself can solve. Try to limit the write volume at the job level, or increase the parallelism so that each RocksDB instance holds less data (provided the total disk space across all your machines exceeds your data volume). If you are still worried, switching to a compression algorithm with a higher compression ratio also helps (at the cost of more time and CPU; RocksDB officially recommends ZSTD or Zlib) [1]. To set the compression type, see the setCompressionType method of RocksDB's
ColumnFamilyOptions [2].

[1] https://github.com/facebook/rocksdb/wiki/Compression#configuration
[2] 
https://github.com/facebook/rocksdb/blob/bc8b05cb779a578b5f5acf8d9390af1d17e65ff5/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java#L282
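
For example, a minimal sketch of plugging this into the Flink 1.9 RocksDB backend through an OptionsFactory (the checkpoint URI below is a placeholder):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

public class ZstdRocksDBBackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        backend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions; // keep Flink's DB-level defaults
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                // Trade CPU for smaller SST files, as recommended in [1][2].
                return currentOptions.setCompressionType(CompressionType.ZSTD_COMPRESSION);
            }
        });
        env.setStateBackend(backend);
    }
}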

Best,
Yun Tang


From: claylin <1012539...@qq.com>
Sent: Friday, October 11, 2019 16:16
To: user-zh 
Subject: Question about enabling the state.backend.rocksdb.ttl.compaction.filter.enabled setting with RocksDBStateBackend

When using RocksDBStateBackend, to keep state from growing so large that it exhausts disk resources, I enabled state.backend.rocksdb.ttl.compaction.filter.enabled so that every RocksDB compaction checks each state entry's TTL and deletes expired state; this is also documented at https://github.com/facebook/rocksdb/wiki/Time-to-Live. But could it happen that some state is never touched by a compaction, so state that has already expired is never deleted? Also, Flink's TTL refresh strategies are only OnCreateAndWrite and OnReadAndWrite; there is no mode with a fixed lifetime that is never refreshed. For example, with a TTL of one day, the state should be guaranteed to expire after one day; otherwise the TTL may keep being refreshed so the state never expires, eventually filling the disk. I have seen a workaround that deletes state with a custom timer, but that badly hurts performance. Does anyone have another solution?
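
For reference, a minimal sketch of the TTL setup under discussion (Flink 1.9 StateTtlConfig API; the state name, type, and one-day TTL are illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlConfigSketch {
    public static void main(String[] args) {
        // One-day TTL, refreshed only on create/write. Expired entries are
        // dropped when the RocksDB compaction filter sees them, which in
        // Flink 1.9 also requires enabling
        // state.backend.rocksdb.ttl.compaction.filter.enabled.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // refresh the timestamp passed to the native filter every 1000 entries
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("lastSeen", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}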


Re: Re: Question about Flink local cache files being deleted

2019-10-11 Post by 戴嘉诚
Hi Yun Tang,
OK, I will start a new yarn session to track down the cause. For what it's worth, after I switched the backend from RocksDBStateBackend to FsStateBackend this morning, no further exceptions have occurred.
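
For reference, the switch described above amounts to something like this (the checkpoint URI is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints still go to HDFS, but working state now lives on the
        // TaskManager heap instead of in local RocksDB instances.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}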

From: Yun Tang
Sent: October 11, 2019 15:27
To: user-zh
Subject: Re: Re: Question about Flink local cache files being deleted

hi 嘉诚

This exception occurred because your task was cancelled, which cleaned up the related directories, so the source files could no longer be found when the hard links were created.
In other words, the task "累积数据 -> Sink: 写入到HBase (12/12)(55962df9fd694ed1f82b8f3ec2aaf6c4)" is a victim: some other exception caused the whole job to fail over, which then cancelled this task. You should look in the job
manager log for the first task that failed; the exception there is the root cause.

Best,
Yun Tang

From: 戴嘉诚 
Sent: Friday, October 11, 2019 15:17
To: user-zh 
Subject: Re: Re: Question about Flink local cache files being deleted

Hi
   Here is the log I downloaded after the exception occurred this morning; please take a look.
 taskmanager.log


Yun Tang wrote on Friday, October 11, 2019 at 2:56 PM:

> Hi 戴嘉诚
>
> Your exception happens during failover, while RocksDB restores: files are downloaded from HDFS into a temporary restore directory and then hard-linked into the DB directory. While the hard links were being created, files already downloaded into the temporary restore directory suddenly could not be found
> [1]. This exception is rather odd; could you first share the complete logs of the taskmanager that hit the problem?
>
>
> [1]
> https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
> Best,
> Yun Tang
> 
> From: 戴嘉诚 
> Sent: Friday, October 11, 2019 14:54
> To: user-zh@flink.apache.org 
> Subject: Re: Question about Flink local cache files being deleted
>
> Hi,
> I have already explicitly set a uid on every operator in my code.
>
> From: Qi Kang
> Sent: October 11, 2019 14:48
> To: user-zh@flink.apache.org
> Subject: Re: Question about Flink local cache files being deleted
>
> Hi,
>
> From the error, the automatically generated operator IDs may have changed, so the state stored in the state backend cannot be retrieved. A checkpoint maintains a mapping of [operator ID ->
> state], which is why Flink recommends assigning explicit IDs to operators with the uid() method, so that state can be restored correctly. See the documentation [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids
>
>
> > On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
> >
> > Hi all:
> >    I recently migrated my jobs to Flink 1.9 on a YARN
> session. One job that reads Kafka messages, keys the stream, aggregates in a one-second window, and then writes batches to HBase asynchronously keeps hitting this exception:
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> > at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6)
> from any of the 1 provided restore options.
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > ... 6 more
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught unexpected exception.
> > at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> > at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > ... 8 more
> > Caused by: java.nio.file.NoSuchFileException:
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
> > at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> > at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> > at
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> > at 

Re: Re: flink1.9 webui exception log display issue

2019-10-11 Post by 戴嘉诚
Hi

No historical exceptions are visible at all.

From: Yun Tang
Sent: October 11, 2019 14:43
To: user-zh@flink.apache.org
Subject: Re: Re: flink1.9 webui exception log display issue

hi

The Web UI keeps only a limited number of historical exceptions, 20 at most
[1]. If older exceptions have been pushed out, search the jobmanager log for the exception directly. Also, is your problem that you cannot see any historical exception at all, or just not the oldest ones?


[1] 
https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L54

Best,
Yun Tang


From: 戴嘉诚 
Sent: Thursday, October 10, 2019 16:26
To: user-zh@flink.apache.org 
Subject: Re: flink1.9 webui exception log display issue

+1, I ran into this too. The main cause: after an exception the region restarts, and on restart everything is reloaded, which automatically clears the exception log info... so the webui can no longer be used to troubleshoot exceptions.

From: 李杰
Sent: October 10, 2019 14:41
To: user-zh@flink.apache.org
Subject: flink1.9 webui exception log display issue

log4j.properties is the official default.
Exceptions flash by in the web ui; historical exception info cannot be seen in the UI.






Re: Question about Flink local cache files being deleted

2019-10-11 Post by 戴嘉诚
Hi,
I have already explicitly set a uid on every operator in my code.

From: Qi Kang
Sent: October 11, 2019 14:48
To: user-zh@flink.apache.org
Subject: Re: Question about Flink local cache files being deleted

Hi,

From the error, the automatically generated operator IDs may have changed, so the state stored in the state backend cannot be retrieved. A checkpoint maintains a mapping of [operator ID ->
state], which is why Flink recommends assigning explicit IDs to operators with the uid() method, so that state can be restored correctly. See the documentation [1] and the sketch below.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids
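
To illustrate the recommendation, a minimal sketch of explicit ID assignment (the pipeline, operator names, and uids are made up for the example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A stable uid() per stateful operator pins the [operator ID -> state]
        // mapping, so restores keep working when the job graph changes.
        env.socketTextStream("localhost", 9999)
                .filter(line -> !line.isEmpty()).uid("drop-empty-lines")
                .map(String::toUpperCase).uid("to-upper")
                .print();

        env.execute("uid-sketch");
    }
}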


> On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
> 
> Hi all:
>   I recently migrated my jobs to Flink 1.9 on a YARN
> session. One job that reads Kafka messages, keys the stream, aggregates in a one-second window, and then writes batches to HBase asynchronously keeps hitting this exception:
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from 
> any of the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 6 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 8 more
> Caused by: java.nio.file.NoSuchFileException: 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
>  -> 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> at java.nio.file.Files.createLink(Files.java:1086)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 12 more
> 
> 
> There are multiple jobs running in this session, but they all write to HBase.
> It looks like a directory file on the local machine was deleted? But I never deleted that file myself…
> 
>