Re: When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?
Hi casel, triggering a savepoint is currently not supported from the web UI. If you want stop-with-savepoint, you can use bin/flink [1] or the REST API [2].
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop
李晋忠

casel.chen wrote on Fri, Oct 28, 2022 at 09:41:
> When cancelling a job from the Flink web UI, can you choose whether to take a savepoint? Right now it simply cancels the job without a savepoint.
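For reference, both approaches look roughly like this (the job ID, JobManager host, and savepoint directory below are placeholders):

    # CLI: stop the job and take a savepoint in one step
    bin/flink stop --savepointPath hdfs:///flink/savepoints <jobId>

    # REST API: trigger stop-with-savepoint, then poll the returned triggerid
    curl -X POST http://<jobmanager>:8081/jobs/<jobId>/stop \
      -H 'Content-Type: application/json' \
      -d '{"targetDirectory": "hdfs:///flink/savepoints", "drain": false}'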
When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?
When cancelling a job from the Flink web UI, can you choose whether to take a savepoint? Right now it simply cancels the job without a savepoint.
Re: flink-operator created the savepoint successfully, but the job was not cancelled
The savepoint workflow:

1. Trigger the savepoint and suspend the job:
kubectl patch flinkdeployment/savepoint-job --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "savepoint"}}}'

2. Delete the job:
kubectl delete flinkdeployment/savepoint-job

3. Start the job from the savepoint: edit the flinkdeployment YAML, add the following, then run kubectl apply -f xxx
spec:
  ...
  job:
    initialSavepointPath: <savepoint path>

> On Oct 19, 2022, at 15:53, Liting Liu (litiliu) wrote:
>
> hi:
> I am using flink-operator 1.2.0 & flink 1.14.3. I successfully triggered a savepoint manually through flink-operator, but after the savepoint was created the job was not cancelled automatically. I would like the job to be cancelled automatically once the savepoint succeeds. Did I do something wrong, or is this a known issue?
> jobStatus:
>   jobId: 9de925e9d4a67e04ef6279925450907c
>   jobName: sql-te-lab-s334c9
>   savepointInfo:
>     lastPeriodicSavepointTimestamp: 0
>     lastSavepoint:
>       location: >-
>         hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
>       timeStamp: 1666163606426
>       triggerType: MANUAL
>     savepointHistory:
>       - location: >-
>           hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
>         timeStamp: 1666163606426
>         triggerType: MANUAL
>     triggerId: ''
>     triggerTimestamp: 0
>     triggerType: MANUAL
>   startTime: '1666161791058'
>   state: RUNNING
>   updateTime: '1666161828364'
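For step 3, a hedged sketch of the relevant job fragment of the FlinkDeployment (field names follow the flink-kubernetes-operator CRD; the jarURI and savepoint path are placeholders, and the rest of the spec is omitted):

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: savepoint-job
    spec:
      job:
        jarURI: local:///opt/flink/usrlib/job.jar   # placeholder
        upgradeMode: savepoint
        state: running
        initialSavepointPath: hdfs://nameservice1/user/pda/savepoint/.../savepoint-9de925-b9ead1c58e7b   # placeholder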
flink-operator created the savepoint successfully, but the job was not cancelled
hi:
I am using flink-operator 1.2.0 & flink 1.14.3. I successfully triggered a savepoint manually through flink-operator, but after the savepoint was created the job was not cancelled automatically. I would like the job to be cancelled automatically once the savepoint succeeds. Did I do something wrong, or is this a known issue?
jobStatus:
  jobId: 9de925e9d4a67e04ef6279925450907c
  jobName: sql-te-lab-s334c9
  savepointInfo:
    lastPeriodicSavepointTimestamp: 0
    lastSavepoint:
      location: >-
        hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
      timeStamp: 1666163606426
      triggerType: MANUAL
    savepointHistory:
      - location: >-
          hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
        timeStamp: 1666163606426
        triggerType: MANUAL
    triggerId: ''
    triggerTimestamp: 0
    triggerType: MANUAL
  startTime: '1666161791058'
  state: RUNNING
  updateTime: '1666161828364'
Re: After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?
Hi, in my understanding a savepoint serves a purpose similar to a checkpoint; it is just that before Flink 1.16 only full (non-incremental) savepoints were supported, and both are implemented with the barrier mechanism underneath. In the 1.16 planning document (https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints), savepoints will also support an incremental mode. When a savepoint is triggered, the source saves its state, and it does stop consuming while doing so.

郑 致远 wrote on Thu, Sep 8, 2022 at 19:39:
> Hi all,
> Is a savepoint also implemented with the barrier mechanism?
> When a savepoint is triggered, does the source operator stop consuming from Kafka?
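For context, a savepoint can be triggered manually while the job keeps running; the job ID and target directory below are placeholders:

    bin/flink savepoint <jobId> hdfs:///flink/savepoints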
After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?
Hi all,
Two questions:
Is a savepoint also implemented with the barrier mechanism?
When a savepoint is triggered, does the source operator stop consuming from Kafka?
Re: Restarting a job from a savepoint cannot guarantee end-to-end consistency
By design this is supported. I suggest posting your code so that people can better locate the problem.

On Fri, Aug 26, 2022 at 4:08 PM 杨扬 wrote:
> Hi all,
> We currently have a Flink job whose source and sink are both Kafka.
> When upgrading (without changing any code), we followed the commands in the official docs to create a savepoint and stop the job, then started the job from that savepoint.
> We now find that this does not give a seamless hand-over: a certain amount of data is duplicated across the stop and restart.
>
> Is this something that savepoints by design cannot guarantee end to end, or did we operate them incorrectly?
Re: Restarting a job from a savepoint cannot guarantee end-to-end consistency
We did specify it, and it still cannot be guaranteed.

> On Aug 26, 2022, at 17:28, gulugulucxg wrote:
>
> Did you set EXACTLY_ONCE semantics on the flinkKafkaProducer?
>
> On 2022-08-26 16:50:33, "杨扬" wrote:
>> kafka-2.4.1
>> flink-1.14.2
>> [...]
Re:Re: Restarting a job from a savepoint cannot guarantee end-to-end consistency
Did you set EXACTLY_ONCE semantics on the flinkKafkaProducer?

On 2022-08-26 16:50:33, "杨扬" wrote:
> kafka-2.4.1
> flink-1.14.2
>
>> On Aug 26, 2022, at 16:42, Hangxiang Yu wrote:
>>
>> Flink guarantees its own exactly-once semantics; end-to-end exactly-once semantics additionally require the source and sink to guarantee idempotence.
>> Which Kafka version are you using?
>> [...]
Re: Restarting a job from a savepoint cannot guarantee end-to-end consistency
kafka-2.4.1
flink-1.14.2

> On Aug 26, 2022, at 16:42, Hangxiang Yu wrote:
>
> Flink guarantees its own exactly-once semantics; end-to-end exactly-once semantics additionally require the source and sink to guarantee idempotence.
> Which Kafka version are you using?
>
> On Fri, Aug 26, 2022 at 4:08 PM 杨扬 wrote:
>> Hi all,
>> [...]
>
> --
> Best,
> Hangxiang.
Re: Restarting a job from a savepoint cannot guarantee end-to-end consistency
Flink guarantees its own exactly-once semantics; end-to-end exactly-once semantics additionally require the source and sink to guarantee idempotence.
Which Kafka version are you using?

On Fri, Aug 26, 2022 at 4:08 PM 杨扬 wrote:
> Hi all,
> We currently have a Flink job whose source and sink are both Kafka.
> When upgrading (without changing any code), we followed the commands in the official docs to create a savepoint and stop the job, then started the job from that savepoint.
> We now find that this does not give a seamless hand-over: a certain amount of data is duplicated across the stop and restart.
>
> Is this something that savepoints by design cannot guarantee end to end, or did we operate them incorrectly?

--
Best,
Hangxiang.
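To illustrate the sink side, below is a minimal hedged sketch of a transactional Kafka sink with EXACTLY_ONCE semantics using the FlinkKafkaProducer shipped with Flink 1.14 (the topic, bootstrap servers, and timeout value are assumptions). Consumers downstream of such a sink must additionally read with isolation.level=read_committed, otherwise they will still see uncommitted data:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "broker:9092");   // placeholder
    // Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
    producerProps.put("transaction.timeout.ms", "900000");   // assumed value

    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
            "output-topic",                                  // placeholder
            new SimpleStringSchema(),
            producerProps,
            null,                                            // default partitioning
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE,        // transactional writes
            FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);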
Restarting a job from a savepoint cannot guarantee end-to-end consistency
Hi all!
We currently have a Flink job whose source and sink are both Kafka.
When upgrading (without changing any code), we followed the commands in the official docs to create a savepoint and stop the job, then started the job from that savepoint.
We now find that this does not give a seamless hand-over: a certain amount of data is duplicated across the stop and restart.

Is this something that savepoints by design cannot guarantee end to end, or did we operate them incorrectly?
Re: Manually stopping the job with a savepoint takes a long time every time
1. Before Flink 1.13, the savepoint data format should be the same as the checkpoint format. If your job has incremental checkpoints enabled, checkpoints will be fast (only the increment is uploaded) while savepoints will be slow (the full state has to be uploaded).
2. Flink 1.13 unified the savepoint format across all state backends. Because a savepoint has to iterate over every key-value entry in the state, it is also much slower than a checkpoint.
3. Flink 1.15 introduced the native savepoint format [1], which lifts the format restriction; its speed should be similar to a full checkpoint.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints_vs_savepoints/

Best,
Jinzhong Li

Howie Yang wrote on Wed, Aug 3, 2022 at 17:54:
> hello,
>
> Version: flink 1.9
> Problem: every manual stop with savepoint takes about 5 minutes, while automatic checkpoints finish in seconds.
> Questions:
> 1. Does a savepoint have to store more content than a checkpoint?
> 2. Why is the savepoint so time-consuming? (When stopping without a savepoint, the job also stops in seconds.)
>
> --
> Best,
> Howie
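Since Flink 1.15 the savepoint format can be chosen on the command line [1]; a quick sketch (job ID and directory are placeholders):

    # Native format: state-backend specific, typically close to a full checkpoint in speed
    bin/flink savepoint --type native <jobId> hdfs:///flink/savepoints

    # Canonical format: portable across state backends (the default)
    bin/flink savepoint --type canonical <jobId> hdfs:///flink/savepoints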
Manually stopping the job with a savepoint takes a long time every time
hello,

Version: flink 1.9
Problem: every manual stop with savepoint takes about 5 minutes, while automatic checkpoints finish in seconds.
Questions:
1. Does a savepoint have to store more content than a checkpoint?
2. Why is the savepoint so time-consuming? (When stopping without a savepoint, the job also stops in seconds.)

--
Best,
Howie
After upgrading Flink 1.12.2 to 1.13.5, a Flink SQL job fails to restore from its savepoint
After upgrading from Flink 1.12.2 to 1.13.5, a Flink SQL job fails to restore from its savepoint. The task switched from INITIALIZING to FAILED with failure cause:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedMapBundleOperator_ebe385cc1e0bce62752f87eccdb2e8c6_(6/6) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:571)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:119)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
Caused by: java.io.InvalidClassException: org.apache.flink.table.types.logical.LogicalType; local class incompatible: stream classdesc serialVersionUID = -7381419642101800907, local class serialVersionUID = 1
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:593)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer$RowDataSerializerSnapshot.readSnapshot(RowDataSerializer.java:318)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore
Re: Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
As far as I can see, when Flink SQL writes to Kafka it does not enable transactions either. So this is exactly the point I cannot figure out.

东东 wrote on Mon, Aug 9, 2021 at 17:56:
> Duplicates only appear when there are uncommitted transactions. Maybe when you restarted, all transactions happened to have been committed already, for example because no new data had arrived in the stream for a while.
>
> On 2021-08-09 17:44:57, "Jim Chen" wrote:
>> Here is an odd thing: implementing the above logic with Flink SQL, *without* setting isolation.level to read_committed, and restarting with stop with savepoint, there were no duplicates.
>>
>> My understanding is that without the isolation level set, Flink SQL should produce duplicates too. Tell me why..
>> [...]
Re:Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
Duplicates only appear when there are uncommitted transactions. Maybe when you restarted, all transactions happened to have been committed already, for example because no new data had arrived in the stream for a while.

On 2021-08-09 17:44:57, "Jim Chen" wrote:
> Here is an odd thing: implementing the above logic with Flink SQL, *without* setting isolation.level to read_committed, and restarting with stop with savepoint, there were no duplicates.
>
> My understanding is that without the isolation level set, Flink SQL should produce duplicates too. Tell me why..
>
> 东东 wrote on Mon, Aug 2, 2021 at 19:13:
>> The job that streams topic B into Hive needs isolation.level set to read_committed; otherwise it will read messages from transactions that have not been committed, or were even aborted, and then it is very hard to avoid duplicates.
>> [...]
Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
Here is an odd thing: implementing the above logic with Flink SQL, *without* setting isolation.level to read_committed, and restarting with stop with savepoint, there were no duplicates.

My understanding is that without the isolation level set, Flink SQL should produce duplicates too. Tell me why..

东东 wrote on Mon, Aug 2, 2021 at 19:13:
> The job that streams topic B into Hive needs isolation.level set to read_committed; otherwise it will read messages from transactions that have not been committed, or were even aborted, and then it is very hard to avoid duplicates.
>
> On 2021-08-02 19:00:13, "Jim Chen" wrote:
>> I don't quite understand what "is the downstream's isolation.level read_committed" means.
>> I write topic A's partitionId and offset into the message body, and the Flink job writes the messages to the downstream topic B. Topic B is streamed into Hive in real time, and in the Hive table I deduplicate by partitionId and offset, which is how I found the duplicate consumption.
>> [...]
Re:Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
The job that streams topic B into Hive needs isolation.level set to read_committed; otherwise it will read messages from transactions that have not been committed, or were even aborted, and then it is very hard to avoid duplicates.

On 2021-08-02 19:00:13, "Jim Chen" wrote:
> I don't quite understand what "is the downstream's isolation.level read_committed" means.
> I write topic A's partitionId and offset into the message body, and the Flink job writes the messages to the downstream topic B. Topic B is streamed into Hive in real time, and in the Hive table I deduplicate by partitionId and offset, which is how I found the duplicate consumption.
>
> 东东 wrote on Mon, Aug 2, 2021 at 18:20:
>> How does the downstream detect the duplicates, and is the downstream's isolation.level read_committed?
>> [...]
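Concretely, a hedged sketch of the topic-B consumer side (bootstrap servers, group id, and topic name are placeholders; FlinkKafkaConsumer matches the connector generation used elsewhere in this thread):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "broker:9092");   // placeholder
    consumerProps.put("group.id", "hive-ingest");            // placeholder
    // Read only messages from committed transactions, so in-flight or aborted
    // writes from the EXACTLY_ONCE producer are never visible downstream:
    consumerProps.put("isolation.level", "read_committed");

    FlinkKafkaConsumer<String> topicB =
            new FlinkKafkaConsumer<>("topic_B", new SimpleStringSchema(), consumerProps);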
Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
I don't quite understand what "is the downstream's isolation.level read_committed" means.
I write topic A's partitionId and offset into the message body, and the Flink job writes the messages to the downstream topic B. Topic B is streamed into Hive in real time, and in the Hive table I deduplicate by partitionId and offset, which is how I found the duplicate consumption.

东东 wrote on Mon, Aug 2, 2021 at 18:20:
> How does the downstream detect the duplicates, and is the downstream's isolation.level read_committed?
>
> On 2021-08-02 18:14:27, "Jim Chen" wrote:
>> Hi 刘建刚,
>> I used stop with savepoint, but I still see duplicate data downstream.
>> [...]
Re:Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
How does the downstream detect the duplicates, and is the downstream's isolation.level read_committed?

On 2021-08-02 18:14:27, "Jim Chen" wrote:
> Hi 刘建刚,
> I used stop with savepoint, but I still see duplicate data downstream.
> Stop command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> -yid application_1625497885855_703064 \
> -p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> -d 55e7ebb6fa38faaba61b4b9a7cd89827
>
> Restart command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> [...]
Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
Hi 刘建刚,
I used stop with savepoint, but I still see duplicate data downstream.
Stop command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
-yid application_1625497885855_703064 \
-p hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
-d 55e7ebb6fa38faaba61b4b9a7cd89827

Restart command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5 \
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

刘建刚 wrote on Mon, Aug 2, 2021 at 15:49:
> cancel with savepoint is the older interface and can cause duplicate Kafka data. The newer stop with savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates; for details see
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>
> Jim Chen wrote on Mon, Aug 2, 2021 at 14:33:
>> I restarted via a savepoint; the commands are as follows:
>> [...]
Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
cancel with savepoint is the older interface and can cause duplicate Kafka data. The newer stop with savepoint stops emitting data while the savepoint is being taken, which avoids the duplicates; for details see
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

Jim Chen wrote on Mon, Aug 2, 2021 at 14:33:
> I restarted via a savepoint; the commands are as follows:
>
> cancel command:
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> print savepoint:
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
> restart command:
> [...]
>
> Jim Chen wrote on Mon, Aug 2, 2021 at 14:01:
>> Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
>> After restarting the job from a savepoint, I found duplicated data in topic B. Could anyone help explain this? Thanks!
>> [...]
Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
I restarted via a savepoint; the commands are as follows:

cancel command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
59cf6ccc83aa163bd1e0cd3304dfe06a

print savepoint:
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494

restart command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen wrote on Mon, Aug 2, 2021 at 14:01:
> Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
> After restarting the job from a savepoint, I found duplicated data in topic B. Could anyone help explain this? Thanks!
> [...]
Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages
Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B. After restarting the job from a savepoint, I found duplicated data in topic B. Could anyone help explain this? Thanks!

My Versions
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:

env.enableCheckpointing(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table", dataDS);
String sql = "select * from data_table a inner join hive_catalog.dim.dim.project FOR SYSTEM_TIME AS OF a.proctime as b on a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameters
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

resultDS.addSink(new FlinkKafkaProducer<>(sinkTopic, new JSONSchema(), producerProps, new FlinkFixedPartitioner<>(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
        .setParallelism(sinkParallelism);
Flink SQL restarted from a savepoint hits a new/old state serializer incompatibility
Hi all, I am using Flink SQL on Flink 1.13. After a code change, restarting the job from the savepoint fails with the following error:

For heap backends, the new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a5b17bdb) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@e5a9c6d8).

In the SQL I changed one time field from CURRENT_TIMESTAMP to the event-time field ts, where ts is derived as:

ts AS TO_TIMESTAMP(FROM_UNIXTIME(tts, 'yyyy-MM-dd HH:mm:ss'))

On the code side the changes were:

Increased the checkpoint interval:
env.getCheckpointConfig().setCheckpointInterval(60L);

Allowed keeping more checkpoints than before:
env.getCheckpointConfig().setMaxConcurrentCheckpoints(20);

Added an idle state retention time:
Configuration configuration = tableEnv.getConfig().getConfiguration();
tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));

Why does this kind of change make the new and old state incompatible, and how can it be avoided?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Question about restoring from a savepoint
Check whether your flink command is correct, then go to the Checkpoint page of the Flink Web UI and see whether the job was restored from the savepoint (there is a restore path at the bottom). After that, check which time characteristic your window uses.

Best,
LakeShen

王春浩 wrote on Thu, May 27, 2021 at 09:26:
> hi, community
>
> version flink 1.7
>
> I am trying to restore a Flink job from a savepoint (or checkpoint). The job reads from Kafka -> performs a 30-minute window aggregation (just an AggregationFunction, acting like a counter) -> sinks to Kafka.
>
> I use RocksDB and have checkpointing enabled.
>
> Now I trigger a savepoint manually. The expected value of each aggregate is 30 (1 record per minute). However, when I restore from the savepoint (flink run -d -s {savepoint's url}), the aggregated value is not 30 (less than 30, depending on when I cancelled the Flink job and restored). When the job runs normally, it reaches 30.
>
> I don't understand why some data appears to be lost.
>
> The log shows "No restore state for FlinkKafkaConsumer".
> [...]
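One possible cause, offered here as an assumption rather than something established in this thread, is that the operators have no stable UIDs, so on restore the savepoint state is not matched back to the Kafka source; "No restore state for FlinkKafkaConsumer" is what the consumer logs when it starts without restored state. A hedged sketch of pinning UIDs in the DataStream API (kafkaConsumer, kafkaProducer, and CountAggregate are illustrative names):

    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<String> events = env
            .addSource(kafkaConsumer)
            .uid("kafka-source");              // stable UID so source state (offsets) can be restored

    events.keyBy(value -> value)
            .timeWindow(Time.minutes(30))
            .aggregate(new CountAggregate())   // illustrative AggregateFunction
            .uid("window-agg")
            .addSink(kafkaProducer)
            .uid("kafka-sink");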
Question about restoring from a savepoint
hi, community

version flink 1.7

I am trying to restore a Flink job from a savepoint (or checkpoint). The job reads from Kafka -> performs a 30-minute window aggregation (just an AggregationFunction, acting like a counter) -> sinks to Kafka.

I use RocksDB and have checkpointing enabled.

Now I trigger a savepoint manually. The expected value of each aggregate is 30 (1 record per minute). However, when I restore from the savepoint (flink run -d -s {savepoint's url}), the aggregated value is not 30 (less than 30, depending on when I cancel the Flink job and restore). When the job runs normally, it reaches 30.

I don't understand why some data appears to be lost.

The log shows "No restore state for FlinkKafkaConsumer".

-Original Message-
From: "王春浩"
To: ;
Cc:
Sent: Wed, May 26, 2021, 17:03 (GMT+08:00)
Subject: inquire about restore from savepoint

Hi Community,

version flink 1.7
I'm trying to make a flink job restore from a savepoint (or checkpoint). What the job does is read from kafka -> do a 30-minute-window aggregation (just AggregationFunction, acts like a counter) -> sink to kafka.
I use rocksdb and enabled checkpointing. Now I try to trigger a savepoint manually. The expected value of each aggregated one is 30 (1 data per minute). But when I restore from a savepoint (flink run -d -s {savepoint's url}), the aggregated value is not 30 (less than 30, depending on the time I cancel the flink job and restore). But when the job runs normally, it gets 30.
I don't know why some data seems to be lost.
And a log shows "No restore state for FlinkKafkaConsumer".
Re:Re: Flink upgraded to version 1.12.0 and starting from a savepoint reports an error
Hi,

Thank you very much for the reply. I also tried the state-processor-api, but the job graph generated by SQL gives no way to obtain each operator's UID, so the state-processor-api cannot get at the original state information either, and I cannot operate on the state. If there is a better solution, please reply to this thread.

Thanks

On 2021-05-20 10:46:22, "Yun Tang" wrote:
> Hi
>
> BaseRowSerializer was renamed to RowDataSerializer in Flink 1.11, so even the state-processor-API cannot handle a class that no longer exists. One rather involved workaround might be to copy the BaseRowSerializer class back in without changing its package, and then use the state-processor-API to force-cast the relevant classes to RowDataSerializer. However, considering that your job graph is generated from SQL while the state-processor-API is oriented more towards the DataStream API, this is probably still quite difficult; I cannot think of a particularly good solution.
>
> Best,
> Yun Tang
> [...]
Re: Flink upgraded to version 1.12.0 and starting from a savepoint reports an error
Hi BaseRowSerializer 已经在Flink-1.11 时候改名成 RowDataSerializer了,即使用 state-processor-API 也没办法处理当前不存在的类,可能有种比较复杂的办法是自己把 BaseRowSerializer 的类不改变package的情况下拷贝出来,然后用 state-processor-API 将相关类强制转换成 RowDataSerializer,不过考虑到你的job graph都是SQL生成的,state-processor-API面向地更多的是data stream API,这一块估计还挺难弄的,确实没有想到特别好的办法。 祝好 唐云 From: 王炳焱 <15307491...@163.com> Sent: Tuesday, May 18, 2021 20:02 To: user-zh@flink.apache.org Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report an error 我从flink1.10升级到flink1.12时,flink1.10的SQLapi的任务无法从savePoint中进行恢复,报错如下: 2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath. 
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
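To make the workaround above a bit more concrete, here is a minimal, hypothetical sketch of the state-processor-API flow being suggested: load the old savepoint (with the copied BaseRowSerializer class still loadable from its original package), read the keyed state, and bootstrap a new savepoint so it gets written with the new version's serializers. The uid, the paths, the max parallelism, and the shape of the state (a ValueState of Long named "count", keyed by String) are all invented for illustration and would have to match the real job; only one operator is shown, whereas a real job needs every stateful operator read and rewritten.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class SavepointSerializerMigration {

    /** A (key, value) pair pulled out of the old savepoint. */
    public static class Entry {
        public String key;
        public Long count;
        public Entry() {}
        public Entry(String key, Long count) { this.key = key; this.count = count; }
    }

    /** Reads a hypothetical ValueState<Long> named "count" from the old operator. */
    static class Reader extends KeyedStateReaderFunction<String, Entry> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Entry> out) throws Exception {
            out.collect(new Entry(key, count.value()));
        }
    }

    /** Writes the same state back, so the new savepoint uses the new serializers. */
    static class Writer extends KeyedStateBootstrapFunction<String, Entry> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(Entry value, Context ctx) throws Exception {
            count.update(value.count);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Reading only works if the old serializer classes can still be loaded,
        // e.g. a copied BaseRowSerializer kept in its original package.
        ExistingSavepoint old = Savepoint.load(env, "hdfs:///savepoints/old", new MemoryStateBackend());
        DataSet<Entry> entries = old.readKeyedState("my-operator-uid", new Reader());

        BootstrapTransformation<Entry> rewritten = OperatorTransformation
                .bootstrapWith(entries)
                .keyBy(new KeySelector<Entry, String>() {
                    @Override
                    public String getKey(Entry e) { return e.key; }
                })
                .transform(new Writer());

        Savepoint.create(new MemoryStateBackend(), 128)
                .withOperator("my-operator-uid", rewritten)
                .write("hdfs:///savepoints/new");

        env.execute("savepoint-serializer-migration");
    }
}

One caveat with SQL jobs, as noted above: the operator uids are generated by the planner, so even locating the right uid to pass to readKeyedState is part of the difficulty.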
Flink upgraded to version 1.12.0 and starting from savepoint reports an error
When I upgraded from flink 1.10 to flink 1.12, the flink 1.10 SQL API job could not be restored from its savepoint. The error is as follows:

2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.str
Flink upgraded to version 1.12.0 and starting from savepoint reports an error
When I upgraded from Flink 1.10.0 to Flink 1.12.0, the job was unable to restore from the savepoint and reported the following error:

2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(Backe
Re: flink 1.12.1 savepoint restart issue
I ran into this too and have no idea what causes it; it also only happens occasionally, which makes the problem really hard to pin down.

田向阳 <lucas_...@163.com>

On April 28, 2021 at 17:00, chaos wrote:
Hi, because of a business-logic change I need to restart a flink job that is running in production, using the savepoint approach. The steps are as follows:

# Look up the yarn application id
yarn application -list
# Look up the flink job id
flink list -t yarn-per-job -Dyarn.application.id=application_1617064018715_0097
# Stop the job with a savepoint
flink stop -t yarn-per-job -p hdfs:///user/chaos/flink/0097_savepoint/ -Dyarn.application.id=application_1617064018715_0097 d7c1e0231f0e72cbd709baa9a0ba6415
Savepoint completed. Path: hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
# Restart the job from the savepoint
flink run -t yarn-per-job -s hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be -d -c xxx xxx.jar

The problem I hit:
Service temporarily unavailable due to an ongoing leader election. Please refresh

Any insight would be appreciated; thanks in advance!
flink 1.12.1 savepoint restart issue
Hi, because of a business-logic change I need to restart a flink job that is running in production, using the savepoint approach. The steps are as follows:

# Look up the yarn application id
yarn application -list
# Look up the flink job id
flink list -t yarn-per-job -Dyarn.application.id=application_1617064018715_0097
# Stop the job with a savepoint
flink stop -t yarn-per-job -p hdfs:///user/chaos/flink/0097_savepoint/ -Dyarn.application.id=application_1617064018715_0097 d7c1e0231f0e72cbd709baa9a0ba6415
Savepoint completed. Path: hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
# Restart the job from the savepoint
flink run -t yarn-per-job -s hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be -d -c xxx xxx.jar

The problem I hit:
Service temporarily unavailable due to an ongoing leader election. Please refresh

Any insight would be appreciated; thanks in advance!
recovery from savepoint throws java.lang.NullPointerException
2021-04-27 16:19:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
 at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for EvictingWindowOperator_cc588d0c8a499f01bdca964487231660_(1/1) from any of the 1 provided restore options.
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
 ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
 at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:357)
 at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:104)
 at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ... 11 more
Caused by: java.lang.NullPointerException
 at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:307)
 at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
 at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
 at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
 ... 17 more
Re: Flink savepoint migration issue
Confirmed: the internal fields of Pulsar's MessageId implementation class were extended, which made Flink's deserialization fail. The concrete issue: https://github.com/streamnative/pulsar-flink/issues/256. I will update the checkpoint handling of the flink 1.9 pulsar connector so that MessageId is serialized with a serializer based on `MessageId.toByteArray`. Thank you very much for your help~.

Jianyun8023
2021-03-12

On March 11, 2021 at 22:43, Kezhu Wang <kez...@gmail.com> wrote:
> Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process?
That is indeed the case: the checkpoint snapshots the serializer as well. Looking at the stack again, the failure should happen while deserializing `MessageId`. Could you check whether the pulsar version changed as well? If it did, did some `MessageId` implementation change its fields between the two versions? I feel you should be using `MessageId.toByteArray`.

On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process? Is there any way to help pin this down? Thanks~

On March 11, 2021 at 11:36, Kezhu Wang <kez...@gmail.com> wrote:
Is the new cluster using the updated pulsar connector? I took a look at the pulsar-flink code; this update is breaking for state.
+unionOffsetStates = stateStore.getUnionListState( +new ListStateDescriptor<>( +OFFSETS_STATE_NAME, +TypeInformation.of(new TypeHint>() { +})));
Possible fixes:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on flink 1.11? It looks like there are further incompatible changes later on:
new ListStateDescriptor<>( OFFSETS_STATE_NAME, -TypeInformation.of(new TypeHint>() { +TypeInformation.of(new TypeHint>() { })));
For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but write only the new state. It may be worth waiting for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:
oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:
2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException
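For reference, the appeal of `MessageId.toByteArray` mentioned above is that it serializes through Pulsar's own protocol-defined encoding rather than the Java field layout that Kryo had captured, so fields added in a newer client no longer break deserialization. A tiny illustrative round-trip (not the connector's actual code):

import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;

public class MessageIdRoundTrip {
    public static void main(String[] args) throws IOException {
        MessageId id = MessageId.earliest;                    // stand-in for a real consumed MessageId
        byte[] bytes = id.toByteArray();                      // stable, protocol-defined encoding
        MessageId restored = MessageId.fromByteArray(bytes);  // readable even after client upgrades
        System.out.println(restored);
    }
}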
Re: Flink savepoint migration issue
建云,
Earlier I also ran into a failure when starting a job from a savepoint with -s, after we upgraded the pulsar client from 2.2 to 2.5.2. Since that job was not very important and I had other tasks at hand at the time, I did not follow up on the problem. Take a look at whether the pulsar source did something there.

allanqinjy <allanqi...@163.com>

On March 11, 2021 at 22:43, Kezhu Wang wrote:
> Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process?
That is indeed the case: the checkpoint snapshots the serializer as well. Looking at the stack again, the failure should happen while deserializing `MessageId`. Could you check whether the pulsar version changed as well? If it did, did some `MessageId` implementation change its fields between the two versions? I feel you should be using `MessageId.toByteArray`.

On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process? Is there any way to help pin this down? Thanks~

On March 11, 2021 at 11:36, Kezhu Wang wrote:
Is the new cluster using the updated pulsar connector? I took a look at the pulsar-flink code; this update is breaking for state.
+unionOffsetStates = stateStore.getUnionListState( +new ListStateDescriptor<>( +OFFSETS_STATE_NAME, +TypeInformation.of(new TypeHint>() { +})));
Possible fixes:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on flink 1.11? It looks like there are further incompatible changes later on:
new ListStateDescriptor<>( OFFSETS_STATE_NAME, -TypeInformation.of(new TypeHint>() { +TypeInformation.of(new TypeHint>() { })));
For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but write only the new state. It may be worth waiting for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:
oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:
2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException
Re: Flink savepoint migration issue
> Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process?
That is indeed the case: the checkpoint snapshots the serializer as well.
Looking at the stack again, the failure should happen while deserializing `MessageId`. Could you check whether the pulsar version changed as well? If it did, did some `MessageId` implementation change its fields between the two versions? I feel you should be using `MessageId.toByteArray`.

On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process? Is there any way to help pin this down? Thanks~

On March 11, 2021 at 11:36, Kezhu Wang wrote:
Is the new cluster using the updated pulsar connector? I took a look at the pulsar-flink code; this update is breaking for state.
+unionOffsetStates = stateStore.getUnionListState( +new ListStateDescriptor<>( +OFFSETS_STATE_NAME, +TypeInformation.of(new TypeHint>() { +})));
Possible fixes:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on flink 1.11? It looks like there are further incompatible changes later on:
new ListStateDescriptor<>( OFFSETS_STATE_NAME, -TypeInformation.of(new TypeHint>() { +TypeInformation.of(new TypeHint>() { })));
For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but write only the new state. It may be worth waiting for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:
oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:
2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException
Re: Flink savepoint migration issue
Hi, I rewrote the migration method following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes while reading the savepoint file, and fails during that process? Is there any way to help pin this down? Thanks~

On March 11, 2021 at 11:36, Kezhu Wang <kez...@gmail.com> wrote:
Is the new cluster using the updated pulsar connector? I took a look at the pulsar-flink code; this update is breaking for state.
+unionOffsetStates = stateStore.getUnionListState( +new ListStateDescriptor<>( +OFFSETS_STATE_NAME, +TypeInformation.of(new TypeHint>() { +})));
Possible fixes:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on flink 1.11? It looks like there are further incompatible changes later on:
new ListStateDescriptor<>( OFFSETS_STATE_NAME, -TypeInformation.of(new TypeHint>() { +TypeInformation.of(new TypeHint>() { })));
For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but write only the new state. It may be worth waiting for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:
oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:
2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException
Re: Flink savepoint migration issue
I am the one maintaining the pulsar-flink connector now, and there is indeed an incompatible upgrade; it is a really nasty change. I am currently trying the approach of migrating the old state onto the new fields, and I get this error. I also modified the code for the 1.11 support, changing the state's data structure back to the old format, and hit the same error. I will look into how to use the StatefulSinkWriterOperator you mentioned.

On March 11, 2021 at 11:36, Kezhu Wang <kez...@gmail.com> wrote:
> StatefulSinkWriterOperator
Re: Flink savepoint migration issue
Is the new cluster using the updated pulsar connector? I took a look at the pulsar-flink code; this update is breaking for state.

+unionOffsetStates = stateStore.getUnionListState( +new ListStateDescriptor<>( +OFFSETS_STATE_NAME, +TypeInformation.of(new TypeHint>() { +})));

Possible fixes:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on flink 1.11? It looks like there are further incompatible changes later on:

new ListStateDescriptor<>( OFFSETS_STATE_NAME, -TypeInformation.of(new TypeHint>() { +TypeInformation.of(new TypeHint>() { })));

For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but write only the new state. It may be worth waiting for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:
oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:
2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException: No more bytes left. at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79) at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350) at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37) at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializ
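A minimal sketch of the dual-read/single-write pattern discussed in this thread, for orientation only: the state names, the Tuple2 layout, and the use of MessageId.toByteArray for the new format are assumptions, not the actual pulsar-flink code, and the pattern of course only helps when the old state can still be deserialized at all.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.pulsar.client.api.MessageId;

/** Sketch of dual-read/single-write state migration; all names are hypothetical. */
public abstract class MigratingSourceState implements CheckpointedFunction {

    private transient ListState<Tuple2<String, byte[]>> newOffsetStates;
    private final Map<String, MessageId> offsets = new HashMap<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // New-format state: MessageId stored as its toByteArray() encoding.
        newOffsetStates = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>(
                        "topic-partition-offset-states-v2",
                        TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() {})));

        // Old-format state: restored from once, never written to again, so the
        // next savepoint contains only the new format.
        ListState<Tuple2<String, MessageId>> oldOffsetStates =
                context.getOperatorStateStore().getUnionListState(
                        new ListStateDescriptor<>(
                                "topic-partition-offset-states",
                                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));

        if (context.isRestored()) {
            for (Tuple2<String, byte[]> e : newOffsetStates.get()) {
                offsets.put(e.f0, MessageId.fromByteArray(e.f1));
            }
            // Migrate entries that only exist in the old-format state.
            for (Tuple2<String, MessageId> e : oldOffsetStates.get()) {
                offsets.putIfAbsent(e.f0, e.f1);
            }
            oldOffsetStates.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Only the new-format state is ever written.
        newOffsetStates.clear();
        for (Map.Entry<String, MessageId> e : offsets.entrySet()) {
            newOffsetStates.add(Tuple2.of(e.getKey(), e.getValue().toByteArray()));
        }
    }
}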
Flink savepoint migration issue
Hi all, after a Flink version upgrade I ran into a problem where the savepoint is not restored correctly. The version went from 1.9.3 to 1.11.0 or 1.11.3, and the job uses the pulsar connector. The checkpointed data structure in the source is as follows:

oldUnionOffsetStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint>() { }))); oldUnionSubscriptionNameStates = stateStore.getUnionListState( new ListStateDescriptor<>( OFFSETS_STATE_NAME + "_subName", TypeInformation.of(new TypeHint() { })));

I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the flink 1.9.3 cluster, started a flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:

2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.io.EOFException: No more bytes left. at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79) at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355) at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350) at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37) at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191) at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165) at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ... 15 more

Could the experts here suggest ways to troubleshoot this, or possible solutions?

Jianyun8023
2021-3-11
Re: flink-savepoint issue
For keyed state, the same key must always fall into the same key group. If a particular key is a hotspot, you can do a map before the keyBy (appending a suffix to the key), then keyBy, and finally, once processing is done, aggregate the partial results.

Best,
Congxian

guomuhua <663021...@qq.com> wrote on Thursday, March 4, 2021 at 12:49:
> I ran into a similar situation: to spread the data out, I added a random number in keyBy. How do I spread the data out correctly?
> nobleyd wrote:
> > Did you use a random key?
> > guaishushu1...@163.com wrote on Wednesday, March 3, 2021 at 18:53:
> > > Checkpoints save successfully, but the savepoint fails with this error:
> > > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) ... 3 more Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142) at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447) ... 5 more
> > > guaishushu1...@163.com
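A sketch of the pattern described above, with invented names and numbers: the salt is materialized into the record by a map() before the keyBy, so the key extracted afterwards is deterministic and every key stays inside its owning KeyGroupRange; a second keyBy then strips the salt and merges the partial aggregates.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SaltedTwoPhaseCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("hot-key", 1L), Tuple2.of("hot-key", 1L), Tuple2.of("cold-key", 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Phase 0: attach the salt TO the record; after this point the
                // key is a plain field and is extracted deterministically.
                .map(e -> Tuple3.of(e.f0, ThreadLocalRandom.current().nextInt(16), e.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG))
                // Phase 1: pre-aggregate per (key, salt) to spread the hot key.
                .keyBy(t -> t.f0 + "#" + t.f1)
                .sum(2)
                // Phase 2: drop the salt and merge the partial sums per real key.
                .map(t -> Tuple2.of(t.f0, t.f2))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print(); // rolling partial results, for illustration only

        env.execute("salted-two-phase-count");
    }
}

The key point, relative to the failing jobs in this thread, is that the salt is never generated inside the KeySelector itself; a KeySelector must return the same key every time it sees the same record.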
Re: flink-savepoint issue
I ran into a similar situation: to spread the data out, I added a random number in keyBy. How do I spread the data out correctly?

nobleyd wrote:
> Did you use a random key?
> guaishushu1...@163.com wrote on Wednesday, March 3, 2021 at 18:53:
> > Checkpoints save successfully, but the savepoint fails with this error:
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) ... 3 more Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142) at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447) ... 5 more
> > guaishushu1...@163.com
Re: flink-savepoint issue
I hit the same problem. To spread the data out, I appended a random number in keyBy. If I remove the random number, the savepoint works; with the random number, the savepoint fails. So if I genuinely need to spread the data out, how should I handle it?
Re: flink-savepoint issue
Did you use a random key?

guaishushu1...@163.com wrote on Wednesday, March 3, 2021 at 18:53:
> Checkpoints save successfully, but the savepoint fails with this error:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> ... 5 more
>
> guaishushu1...@163.com
flink-savepoint issue
Checkpoints save successfully, but the savepoint fails with this error:
java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48). at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450) at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) ... 3 more Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}. at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142) at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221) at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174) at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447) ... 5 more

guaishushu1...@163.com
TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
I used the state processor api to create a new savepoint containing the kafka-related state, in order to change max parallelism. The savepoint was created successfully, but restarting the job from it threw the following exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152) ... 9 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ... 11 more Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState' at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225) at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83) at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204) at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189) at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164) at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113) at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94) at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ... 15 more

The relevant code:
@Override public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend, ParameterTool config) { // ??max parallelism??savepoint String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_O
TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
Hi all: I used the state processor api to create a new savepoint containing the Kafka-related state, in order to change the max parallelism. The savepoint was created successfully, but restarting the job from it throws the following exception:
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
 ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
 at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
 at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
 at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
 at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
 at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
 at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
 at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
 at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
 at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
 at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
 at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
 at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
 at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
 at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
 ... 15 more
{code}
My usage looks like this:

@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend, ParameterTool config) {
    // load the savepoint whose max parallelism has not yet been changed
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    // new max parallelis
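For reference, a minimal sketch of writing a brand-new savepoint with a different max parallelism using the DataSet-based state processor API of that era (roughly Flink 1.9-1.13); the uid, paths, parallelism value, and the trivial bootstrap function are placeholders, not taken from this thread:

{code}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.StateBootstrapFunction;

public class NewSavepointSketch {

    // Trivial operator-state bootstrap function, only to make the sketch complete.
    static class SimpleBootstrap extends StateBootstrapFunction<Integer> {
        private ListState<Integer> state;

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            state.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) { }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("state", Types.INT));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3); // placeholder bootstrap data

        Savepoint
            .create(new MemoryStateBackend(), 256) // 256 = the new max parallelism
            .withOperator("my-operator-uid",       // must match the uid set in the streaming job
                OperatorTransformation.bootstrapWith(data).transform(new SimpleBootstrap()))
            .write("hdfs:///flink/savepoints/new-savepoint");

        env.execute("bootstrap new savepoint");
    }
}
{code}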
Re: error when triggering a savepoint in yarn-per-job mode
Check whether the job is under backpressure. We have run into the same situation: when a job is backpressured, a savepoint is very hard to complete and frequently times out, and the community version does not yet support a separate timeout for savepoints.

刘海 wrote on Thu, Jan 21, 2021 at 10:24:
> Hi
> I am currently testing savepoints. Running the command below fails with the error that follows. From the message it looks like a timeout, but there is no further information. Any pointers on the likely cause would be much appreciated.
>
> flink1.12, yarn-per-job mode
> jobID: fea3d87f138ef4c260ffe9324acc0e51
> yarnID: application_1610788069646_0021
> The command executed:
> ./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51
>
> The error:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
>  at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
>  ...
> Caused by: java.util.concurrent.TimeoutException
>  at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
>  ...
>
> Best!
> 刘海
> liuha...@163.com
> (signature generated by NetEase Mail Master)
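A small sketch consistent with the reply above: since these versions have no separate savepoint timeout, raising the shared checkpoint timeout gives a slow savepoint more time to complete (the interval and timeout values are arbitrary examples):

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3 * 60 * 1000L); // checkpoint every 3 minutes
        // No dedicated savepoint timeout exists here; savepoints run through the
        // checkpoint coordinator, so this timeout bounds them as well.
        env.getCheckpointConfig().setCheckpointTimeout(20 * 60 * 1000L);
        // ... build and execute the job as usual ...
    }
}
{code}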
Re: error when triggering a savepoint in yarn-per-job mode
The command format is:

./bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

For example:

./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 hdfs:///flink/savepoints -yid application_1610788069646_0021

On 2021-01-21 10:24:31, "刘海" wrote:
> Hi
> I am currently testing savepoints. Running the command below fails with the error that follows. From the message it looks like a timeout, but there is no further information. Any pointers on the likely cause would be much appreciated.
>
> flink1.12, yarn-per-job mode
> jobID: fea3d87f138ef4c260ffe9324acc0e51
> yarnID: application_1610788069646_0021
> The command executed:
> ./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51
>
> The error:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
>  at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
>  ...
> Caused by: java.util.concurrent.TimeoutException
>  ...
>
> Best!
> 刘海
> liuha...@163.com
> (signature generated by NetEase Mail Master)
error when triggering a savepoint in yarn-per-job mode
Hi
I am currently testing savepoints. Running the command below fails with the error that follows. From the message it looks like a timeout, but there is no further information. Any pointers on the likely cause would be much appreciated.

flink1.12, yarn-per-job mode
jobID: fea3d87f138ef4c260ffe9324acc0e51
yarnID: application_1610788069646_0021
The command executed:
./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51

The error:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
 at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
 at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
 at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
 at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
 at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
 at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
 at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Best!
刘海
liuha...@163.com
(signature generated by NetEase Mail Master)
Re: A question about Flink savepoints
OK, thanks for your help!

> On Jan 11, 2021, at 20:23, Yun Tang wrote:
>
> Hi,
>
> There is no ready-made API exposed, but you can follow the test case and set savepointRestoreSettings on the jobGraph [1].
>
> [1] https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383
>
> Best,
> Yun Tang
>
> From: yinghua...@163.com
> Sent: Monday, January 11, 2021 19:07
> To: user-zh
> Subject: A question about Flink savepoints
>
> In our Java code we stop a job through the ClusterClient method CompletableFuture stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3); and we keep the job's savepoint information. When resuming, we could not find a corresponding method; everything we found resumes via the command line. Is there a Java API call that resumes a job directly?
>
> yinghua...@163.com
Re: A question about Flink savepoints
Hi,

There is no ready-made API exposed, but you can follow the test case and set savepointRestoreSettings on the jobGraph [1].

[1] https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383

Best,
Yun Tang

From: yinghua...@163.com
Sent: Monday, January 11, 2021 19:07
To: user-zh
Subject: A question about Flink savepoints

In our Java code we stop a job through the ClusterClient method CompletableFuture stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3); and we keep the job's savepoint information. When resuming, we could not find a corresponding method; everything we found resumes via the command line. Is there a Java API call that resumes a job directly?

yinghua...@163.com
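Pieced together from the linked test case, a minimal sketch of the client-side resume path; the client and jobGraph are assumed to be built the same way as for a normal submission:

{code}
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class ResumeFromSavepoint {
    static void resume(ClusterClient<?> client, JobGraph jobGraph, String savepointPath) throws Exception {
        // true = allowNonRestoredState: ignore state that no longer maps to any operator
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
        client.submitJob(jobGraph).get(); // the job starts from the savepoint
    }
}
{code}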
A question about Flink savepoints
In our Java code we stop a job through the ClusterClient method CompletableFuture stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3); and we keep the job's savepoint information. When resuming, we could not find a corresponding method; everything we found resumes via the command line. Is there a Java API call that resumes a job directly?

yinghua...@163.com
Re: How can Flink use OSS as the checkpoint/savepoint/statebackend?
Hi

The community documentation [1] already gives detailed steps:

1. Put the flink-oss-fs-hadoop jar under the plugins directory
2. Configure the OSS endpoint, access key id, and access key secret
3. Wherever OSS is needed, use a URI with the oss:// scheme, e.g. when creating the state backend

[1] https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html

Best,
Yun Tang

From: 陈帅
Sent: Wednesday, December 30, 2020 20:53
To: user-zh@flink.apache.org
Subject: How can Flink use OSS as the checkpoint/savepoint/statebackend?

How can Flink use OSS as the checkpoint/savepoint/statebackend? Do we need to depend on Hadoop and configure Hadoop on OSS?
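A minimal sketch of steps 2 and 3 above (the endpoint, keys, and bucket below are placeholders; the fs.oss.* entries belong in flink-conf.yaml, as described in the linked docs):

{code}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OssStateBackendSketch {
    public static void main(String[] args) throws Exception {
        // Step 2 lives in flink-conf.yaml, e.g.:
        //   fs.oss.endpoint: <your-oss-endpoint>
        //   fs.oss.accessKeyId: <your-access-key-id>
        //   fs.oss.accessKeySecret: <your-access-key-secret>
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 3: once the flink-oss-fs-hadoop plugin and credentials are in place,
        // any oss:// URI can be used, for example as the state backend location.
        env.setStateBackend(new FsStateBackend("oss://my-bucket/flink/checkpoints"));
        env.enableCheckpointing(60_000L);
        // ... define and execute the job ...
    }
}
{code}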
How can Flink use OSS as the checkpoint/savepoint/statebackend?
How can Flink use OSS as the checkpoint/savepoint/statebackend? Do we need to depend on Hadoop and configure Hadoop on OSS?
Re: Command: Flink savepoint -d reported an error.
Was this ever resolved? I am also getting:

Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>

赢峰
si_ji_f...@163.com
(signature generated by NetEase Mail Master)

On 09/30/2019 15:06, pengchengl...@163.com wrote:

Hi, thanks for the reply. My command follows the official site, https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#trigger-a-savepoint

$ bin/flink savepoint -d :savepointPath

This command deletes a savepoint; what you described is triggering a savepoint, which works fine.

pengchengl...@163.com

From: 狄玉坤
Sent: 2019-09-30 14:59
To: user-zh@flink.apache.org
Subject: Re: Re: Command: Flink savepoint -d reported an error.

The jobId argument is missing:

./bin/flink savepoint <jobId> [savepointDirectory]

This will trigger a savepoint for the job with ID jobId, and returns the path of the created savepoint. You need this path to restore and dispose savepoints.

diyukun2019
diyukun2...@163.com
(signature generated by NetEase Mail Master)

On 9/30/2019 14:56, pengchengl...@163.com wrote:

Thanks for your reply. I tried it, but it still says a required argument is missing. As follows:

$ bin/flink savepoint -d file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/

The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>
 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
 at org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
 at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
 at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)

pengchengl...@163.com

From: 星沉
Sent: 2019-09-30 14:47
To: user-zh
Subject: Re: Command: Flink savepoint -d reported an error.

The savepoint path argument looks wrong: isn't there an extra / after file?

pengchengl...@163.com wrote:

$ bin/flink savepoint -dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/

The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>
Re: flink-1.11.2 rocksdb: job fails and restarts when triggering a savepoint
Hi

Is your TM single-slot? How much managed memory does it have?

When taking a savepoint, the RocksDB state backend iterates over the data with an iterator, so there is extra memory overhead (and that part is not managed by the write buffer and block cache). That said, RocksDB's iterator is a multi-level min-heap iterator, so in theory the temporary memory it occupies should not be large. Could you distill the program into a minimal demo that reproduces the issue so we can debug it?

As for working around the problem, you can consider increasing the JVM overhead [1] to enlarge that buffer space.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12//deployment/memory/mem_setup.html

Best,
Yun Tang

From: smailxie
Sent: Thursday, December 10, 2020 17:42
To: user-zh@flink.apache.org
Subject: flink-1.11.2 rocksdb: job fails and restarts when triggering a savepoint

> I have a SQL job running a dual-stream join, with state retained for 12-24 hours. Checkpoints are fine and the state size is between 300 MB and 4 GB. When a savepoint is triggered manually, the container is killed for exceeding its memory limit (the allocated memory is 5 GB per single slot).
>
> My question: when RocksDB takes a savepoint, does it read all on-disk state into memory and then take a full snapshot? If so, will later versions optimize this? Otherwise, whenever the on-disk state exceeds managed memory, any manual savepoint kills the job.
>
> The error is below.
>
> 2020-12-10 09:18:50
> java.lang.Exception: Container [pid=33290,containerID=container_e47_1594105654926_6890682_01_02] is running beyond physical memory limits. Current usage: 5.1 GB of 5 GB physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
> ...
> Container killed on request. Exit code is 143
> Container exited with a non-zero exit code 143
>
> Sent from Mail for Windows 10
>
> --
> Name: 谢波
> Mobile: 13764228893
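For completeness, the JVM overhead options mentioned in [1] are cluster configuration, normally set in flink-conf.yaml; the sketch below only names the keys, and the values are illustrative, not recommendations from this thread:

{code}
import org.apache.flink.configuration.Configuration;

public class JvmOverheadSketch {
    public static void main(String[] args) {
        // Normally these two lines go into flink-conf.yaml; shown here to name the keys.
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.jvm-overhead.fraction", "0.2");
        conf.setString("taskmanager.memory.jvm-overhead.max", "2g");
        System.out.println(conf); // prints the two configured entries
    }
}
{code}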
flink-1.11.2 rocksdb: job fails and restarts when triggering a savepoint
I have a SQL job running a dual-stream join, with state retained for 12-24 hours. Checkpoints are fine and the state size is between 300 MB and 4 GB. When a savepoint is triggered manually, the container is killed for exceeding its memory limit (the allocated memory is 5 GB per single slot).

My question: when RocksDB takes a savepoint, does it read all on-disk state into memory and then take a full snapshot? If so, will later versions optimize this? Otherwise, whenever the on-disk state exceeds managed memory, any manual savepoint kills the job.

The error is below.

2020-12-10 09:18:50
java.lang.Exception: Container [pid=33290,containerID=container_e47_1594105654926_6890682_01_02] is running beyond physical memory limits. Current usage: 5.1 GB of 5 GB physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_e47_1594105654926_6890682_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 4 33290 33290 33290 (java) 787940 76501 7842340864 1337121 /usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 -XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=456340281b -D taskmanager.memory.network.min=456340281b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D taskmanager.memory.task.heap.size=1100585252b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=shd177.yonghui.cn -Dpipeline.classpaths= -Dweb.port=0 -Dexecution.target=embedded -Dweb.tmpdir=/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479 -Djobmanager.rpc.port=44058 -Dpipeline.jars=file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar -Drest.address=shd177.yonghui.cn -Dsecurity.kerberos.login.keytab=/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab
|- 33290 33288 33290 33290 (bash) 0 0 108679168 318 /bin/bash -c /usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 -XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=456340281b -D taskmanager.memory.network.min=456340281b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D taskmanager.memory.task.heap.size=1100585252b -D taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='shd177.yonghui.cn' -Dpipeline.classpaths='' -Dweb.port='0' -Dexecution.target='embedded' -Dweb.tmpdir='/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479' -Djobmanager.rpc.port='44058' -Dpipeline.jars='file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar' -Drest.address='shd177.yonghui.cn' -Dsecurity.kerberos.login.keytab='/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab' 1> /data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.out 2> /data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.err Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 发送自 Windows 10 版邮件应用 -- Name:谢波 Mobile:13764228893
Re: After the topic name is changed, how will a job restarted from a savepoint consume Kafka?
This is the correct answer; see a previously filed issue: https://issues.apache.org/jira/browse/FLINK-16865

Best
zhisheng

Shuai Xia wrote on Wed, Dec 2, 2020 at 14:03:
> hi, actually it does not work the way you said. When restarting from the sp, RestoreState exists, and because the topic name was changed, the new KafkaTopicPartition cannot be found in restoredState, so the consumption position for the new partitions is set to EARLIEST_OFFSET:
>
> if (restoredState != null) {
>     for (KafkaTopicPartition partition : allPartitions) {
>         if (!restoredState.containsKey(partition)) {
>             restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>         }
>     }
> }
>
> --
> From: 熊云昆
> Sent: Tuesday, December 1, 2020 22:57
> To: user-zh; Shuai Xia
> Subject: Re: After the topic name is changed, how will a job restarted from a savepoint consume Kafka?
>
> It can be controlled manually. If you don't set it, by default it starts consuming from the latest position; otherwise it follows your startup-mode.
>
> On 2020-12-01 20:59:48, "Shuai Xia" wrote:
> > Hi all,
> > A sudden question: a Flink job consuming Kafka is stopped after saving a savepoint, and the topic name in the code is changed before restarting.
> > From where will it consume the new topic? Does it follow startup-mode, or start from the latest offset?
> > Can it be controlled manually?
Re: After the topic name is changed, how will a job restarted from a savepoint consume Kafka?
hi, actually it does not work the way you said. When restarting from the sp, RestoreState exists, and because the topic name was changed, the new KafkaTopicPartition cannot be found in restoredState, so the consumption position for the new partitions is set to EARLIEST_OFFSET:

if (restoredState != null) {
    for (KafkaTopicPartition partition : allPartitions) {
        if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }
    }
}

--
From: 熊云昆
Sent: Tuesday, December 1, 2020 22:57
To: user-zh; Shuai Xia
Subject: Re: After the topic name is changed, how will a job restarted from a savepoint consume Kafka?

It can be controlled manually. If you don't set it, by default it starts consuming from the latest position; otherwise it follows your startup-mode.

On 2020-12-01 20:59:48, "Shuai Xia" wrote:
>
> Hi all,
> A sudden question: a Flink job consuming Kafka is stopped after saving a savepoint, and the topic name in the code is changed before restarting.
> From where will it consume the new topic? Does it follow startup-mode, or start from the latest offset?
> Can it be controlled manually?
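One practical way around this behavior, assuming you want the renamed topic to start according to the configured startup mode instead of EARLIEST, is to give the source a new uid so its old state is not restored at all, and resume with --allowNonRestoredState. This is a sketch under that assumption, not something prescribed in this thread; broker, group, topic, and uid are placeholders:

{code}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class RenamedTopicSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-group");             // placeholder

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("new-topic", new SimpleStringSchema(), props);
        consumer.setStartFromLatest(); // honored only when no offsets are restored for this source

        env.addSource(consumer)
           .uid("kafka-source-v2") // new uid: the old source state is not restored
           .print();

        // Resume with: bin/flink run -s <savepointPath> --allowNonRestoredState ...
        env.execute("renamed topic");
    }
}
{code}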
Re: After the topic name is changed, how will a job restarted from a savepoint consume Kafka?
It can be controlled manually. If you don't set it, by default it starts consuming from the latest position; otherwise it follows your startup-mode.

On 2020-12-01 20:59:48, "Shuai Xia" wrote:
>
> Hi all,
> A sudden question: a Flink job consuming Kafka is stopped after saving a savepoint, and the topic name in the code is changed before restarting.
> From where will it consume the new topic? Does it follow startup-mode, or start from the latest offset?
> Can it be controlled manually?
After the topic name is changed, how will a job restarted from a savepoint consume Kafka?
Hi all,
A sudden question: a Flink job consuming Kafka is stopped after saving a savepoint, and the topic name in the code is changed before restarting.
From where will it consume the new topic? Does it follow startup-mode, or start from the latest offset?
Can it be controlled manually?
KafkaException: Unexpected error in InitProducerIdResponse when restoring an application from a savepoint
This program is used to test Flink Kafka exactly-once. A normal submission runs fine, but restoring from a savepoint throws the error below.

On the Kafka server side, transaction.max.timeout.ms = 360 is configured; on the client producer side, transaction.timeout.ms = 90.

Reference code: https://gist.github.com/giraffe-tree/15c5f707d9dfe3221959ae37b4e9d786

2020-11-17 15:24:51
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
 at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1370)
 at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
 at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:566)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
 at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
 at java.lang.Thread.run(Thread.java:748)

I've looked through the related material on Google but still cannot resolve it. Has anyone run into a similar problem, or can you suggest how to troubleshoot it?
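The "transaction has been expired by the broker" branch of this error usually means the gap between stopping with the savepoint and resuming exceeded the producer's transaction timeout (or the broker's transactional.id.expiration.ms). That is an assumption about this report, not a confirmed diagnosis. A minimal exactly-once sink sketch with the timeout spelled out; topic, broker, and timeout value are placeholders:

{code}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkSketch {
    public static FlinkKafkaProducer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // Must be <= the broker's transaction.max.timeout.ms, and comfortably
        // larger than the expected stop-to-resume gap.
        props.setProperty("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
            new ProducerRecord<>("out-topic", element.getBytes(StandardCharsets.UTF_8));

        return new FlinkKafkaProducer<>(
            "out-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
{code}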
Re: flink savepoint exception
Hi
The exception contains "Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint."
Perhaps look at the JM log to see whether there is any more detailed logging.
Best,
Congxian

张锴 wrote on Sat, Nov 7, 2020 at 16:14:
> I hit the following error when taking a savepoint with Flink 1.10.1. The cause is unclear so far, so I'm asking for help; please take a look.
> I have already ruled out backpressure and restarts; the checkpoint timeout is set to ten minutes and the conf increases the client-to-master connection time, but the exception still occurs.
>
> Command:
> flink savepoint -yid application_1604456903594_2381 fb8131bcb78cbdf2bb9a705d8a4ceebc hdfs:///hadoopnamenodeHA/flink/flink-savepoints
>
> Exception:
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
>  ...
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.
>  ...
Re: flink savepoint
Found it. From the JM log I could see it was a missing write permission; after fixing that it worked.

Congxian Qiu wrote on Fri, Nov 6, 2020 at 13:31:
> Hi
> Can you see any other exceptions in the client log or the JM log?
> Best,
> Congxian
>
> 张锴 wrote on Fri, Nov 6, 2020 at 11:42:
> > Restarts and backpressure are both normal.
> > I also increased the client-to-master time, but the problem is still there.
> >
> > hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> > > Hi,
> > > This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
> > > For the concrete cause you need to look at the Jobmaster log.
> > > PS: when a job keeps restarting or is backpressured, savepoints generally fail.
> > > Best,
> > > Hailong Wang
> > >
> > > On 2020-11-06 09:33:48, "张锴" wrote:
> > > > I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
> > > > flink version 1.10.1
> > > > Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > > The error:
> > > > org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > > >  ...
> > > > Caused by: java.util.concurrent.TimeoutException
> > > >  ...
Re: flink savepoint
It is already specified.

admin <17626017...@163.com> wrote on Fri, Nov 6, 2020 at 15:17:
> Hi,
> Is your job running on YARN? If so, you need to specify -yid
>
> > On Nov 6, 2020, at 13:31, Congxian Qiu wrote:
> >
> > Hi
> > Can you see any other exceptions in the client log or the JM log?
> > Best,
> > Congxian
> >
> > 张锴 wrote on Fri, Nov 6, 2020 at 11:42:
> > > Restarts and backpressure are both normal.
> > > I also increased the client-to-master time, but the problem is still there.
> > >
> > > hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> > > > Hi,
> > > > This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
> > > > For the concrete cause you need to look at the Jobmaster log.
> > > > PS: when a job keeps restarting or is backpressured, savepoints generally fail.
> > > > Best,
> > > > Hailong Wang
> > > >
> > > > On 2020-11-06 09:33:48, "张锴" wrote:
> > > > > I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
> > > > > flink version 1.10.1
> > > > > Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > > > ...
flink savepoint exception
I hit the following error when taking a savepoint with Flink 1.10.1. The cause is unclear so far, so I'm asking for help; please take a look.

I have already ruled out backpressure and restarts; the checkpoint timeout is set to ten minutes and the conf increases the client-to-master connection time, but the exception still occurs.

Command:

flink savepoint -yid application_1604456903594_2381 fb8131bcb78cbdf2bb9a705d8a4ceebc hdfs:///hadoopnamenodeHA/flink/flink-savepoints

Exception:

The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
 at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.
 at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
 at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
 at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseState
Re: flink savepoint
Hi,
Is your job running on YARN? If so, you need to specify -yid

> On Nov 6, 2020, at 13:31, Congxian Qiu wrote:
>
> Hi
> Can you see any other exceptions in the client log or the JM log?
> Best,
> Congxian
>
> 张锴 wrote on Fri, Nov 6, 2020 at 11:42:
> > Restarts and backpressure are both normal.
> > I also increased the client-to-master time, but the problem is still there.
> >
> > hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> > > Hi,
> > > This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
> > > For the concrete cause you need to look at the Jobmaster log.
> > > PS: when a job keeps restarting or is backpressured, savepoints generally fail.
> > > Best,
> > > Hailong Wang
> > >
> > > On 2020-11-06 09:33:48, "张锴" wrote:
> > > > I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
> > > > flink version 1.10.1
> > > > Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > > The error:
> > > > org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > > >  ...
> > > > Caused by: java.util.concurrent.TimeoutException
> > > >  ...
Re: flink savepoint
Hi
Can you see any other exceptions in the client log or the JM log?
Best,
Congxian

张锴 wrote on Fri, Nov 6, 2020 at 11:42:
> Restarts and backpressure are both normal.
> I also increased the client-to-master time, but the problem is still there.
>
> hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> > Hi,
> > This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
> > For the concrete cause you need to look at the Jobmaster log.
> > PS: when a job keeps restarting or is backpressured, savepoints generally fail.
> > Best,
> > Hailong Wang
> >
> > On 2020-11-06 09:33:48, "张锴" wrote:
> > > I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
> > > flink version 1.10.1
> > > Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > The error:
> > > org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > >  ...
> > > Caused by: java.util.concurrent.TimeoutException
> > >  ...
Re: flink savepoint
Restarts and backpressure are both normal.
I also increased the client-to-master time, but the problem is still there.

hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> Hi,
> This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
> For the concrete cause you need to look at the Jobmaster log.
> PS: when a job keeps restarting or is backpressured, savepoints generally fail.
> Best,
> Hailong Wang
>
> On 2020-11-06 09:33:48, "张锴" wrote:
> > I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
> > flink version 1.10.1
> > Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > The error:
> > org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >  ...
> > Caused by: java.util.concurrent.TimeoutException
> >  ...
Re:flink savepoint
Hi,

This error only means the savepoint did not finish within the allotted time, so the client's connection to the master timed out.
For the concrete cause you need to look at the Jobmaster log.
PS: when a job keeps restarting or is backpressured, savepoints generally fail.

Best,
Hailong Wang

On 2020-11-06 09:33:48, "张锴" wrote:
> I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.
>
> flink version 1.10.1
>
> Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints
>
> The error:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
>  at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>  ...
> Caused by: java.util.concurrent.TimeoutException
>  ...
flink savepoint
I hit an error while saving a snapshot with flink savepoint; the cause is unclear so far, could someone take a look.

flink version 1.10.1

Executed: flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints

The error:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
 at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.TimeoutException
 at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
Re: savepoint fails with tens of GB of state
Hi,

Check whether the most recent checkpoint before the savepoint failure also failed. If it did, the job itself is probably already in a sub-healthy state.

Best,
Hailong Wang

At 2020-10-21 13:09:45, "廖辉轩" <726830...@qq.com> wrote:
> Hi all,
>
> Once the data reaches about 50 GB, savepoints always fail. Preliminary analysis: while the savepoint executes, an out-of-memory condition kills the task, which then restarts.
>
> background:
> flink version: 1.10.0
> flink on yarn: Total Task Slots 8
> Task Managers 8, 5G memory
>
> backend: rocksdb, incremental
> checkpoint: 3 min
>
> Container Error:
> container_1596184446716_365651_01_09 because: Container [pid=104949,containerID=container_1596184446716_365651_01_09] is running beyond physical memory limits. Current usage: 5.3 GB of 5 GB physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
> Dump of the process-tree for container_1596184446716_365651_01_09 :
>
> ===
> Command:
>
> flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid application_1596184446716_365651
>
> Error:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 389923a0198dd74f5e1bc9e968258dba failed.
>  at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>  at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>  at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>  at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
>  at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>  at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>  at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
>  at java.util.concurrent.CompletableFuture.encodeThrowable(Complet
Re: savepoint fails with tens of GB of state
Hi Taylor,
Tens of gigabytes of state is not really large. Take a look at whether the job itself had already entered an unhealthy state, such as backpressure, at the time the savepoint was triggered.

廖辉轩 <726830...@qq.com> wrote on Wed, Oct 21, 2020 at 2:10 PM:
> Hi all,
> When the state reaches about 50 GB, savepoints always fail. Preliminary analysis: while the savepoint is executing, an out-of-memory condition kills a task, and the task then restarts.
>
> Background:
> flink version: 1.10.0
> flink on yarn: Total Task Slots 8
> Task Managers: 8, 5G memory each
>
> backend: rocksdb, incremental
> checkpoint: 3 min
>
> Container Error:
> container_1596184446716_365651_01_09 because: Container
> [pid=104949,containerID=container_1596184446716_365651_01_09] is
> running beyond physical memory limits. Current usage: 5.3 GB of 5 GB
> physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_1596184446716_365651_01_09 :
>
> ===
> Command:
>
> flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid application_1596184446716_365651
>
> Error:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 389923a0198dd74f5e1bc9e968258dba failed.
>         at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>         at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>         at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
>         at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>         at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
>         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
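For anyone hitting the same container kill on Flink 1.10 with RocksDB, a hedged flink-conf.yaml sketch of the usual first-line memory tuning; the values below are illustrative assumptions, not measured recommendations for this job:

    # give the container more total headroom than the state backend's working set
    taskmanager.memory.process.size: 6g
    # keep RocksDB inside Flink's managed memory budget (the 1.10 default)
    state.backend.rocksdb.memory.managed: true
    # reserve extra native-memory slack for snapshot/savepoint bursts
    taskmanager.memory.jvm-overhead.max: 1536m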
Savepoint fails with tens of gigabytes of state data
Hi all,
When the state reaches about 50 GB, savepoints always fail. Preliminary analysis: while the savepoint is executing, an out-of-memory condition kills a task, and the task then restarts.

Background:
flink version: 1.10.0
flink on yarn: Total Task Slots 8
Task Managers: 8, 5G memory each

backend: rocksdb, incremental
checkpoint: 3 min

Container Error:
container_1596184446716_365651_01_09 because: Container [pid=104949,containerID=container_1596184446716_365651_01_09] is running beyond physical memory limits. Current usage: 5.3 GB of 5 GB physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_1596184446716_365651_01_09 :

===
Command:

flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid application_1596184446716_365651

Error:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 389923a0198dd74f5e1bc9e968258dba failed.
        at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
        at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
        at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
        at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457)
Re: flink1.10 stop with a savepoint fails
Hi zilong,
It was indeed this problem. Thanks for the help.

Best,
Robin

zilong xiao wrote
> Hi Robin Zhang
> You have most likely hit the problem reported in this issue:
> https://issues.apache.org/jira/browse/FLINK-16626
> See the issue description for details. Best~
>
> Robin Zhang <vincent2015qdlg@> wrote on Mon, Oct 19, 2020 at 3:42 PM:
>
>> A simple source -> map -> filter -> sink test job.
>>
>> Script that triggers the savepoint:
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> Exact error:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>         at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>>         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>>         ... 9 more
>>
>> From the error I suspected a permission problem: I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the application works fine. I further narrowed it down to a permission problem when the root user stops the hadoop application, but I don't know how to solve it; I am stuck here for now.
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.10 stop with a savepoint fails
Hi Congxian,
Thanks for the pointer. I took a look; the JM side does not expose the relevant log, I can only see the entries for normal checkpoints.

Best,
Robin

Congxian Qiu wrote
> Hi
> You can check the JM log for what caused this savepoint to fail. If the savepoint timed out, look at which task was slow to finish (a savepoint can be slower than a checkpoint).
> Best,
> Congxian
>
> Robin Zhang <vincent2015qdlg@> wrote on Mon, Oct 19, 2020 at 3:42 PM:
>
>> A simple source -> map -> filter -> sink test job.
>>
>> Script that triggers the savepoint:
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> Exact error:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>         at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>>         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>>         ... 9 more
>>
>> From the error I suspected a permission problem: I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the application works fine. I further narrowed it down to a permission problem when the root user stops the hadoop application, but I don't know how to solve it; I am stuck here for now.
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.10 stop with a savepoint fails
Hi Robin Zhang
You have most likely hit the problem reported in this issue: https://issues.apache.org/jira/browse/FLINK-16626
See the issue description for details. Best~

Robin Zhang wrote on Mon, Oct 19, 2020 at 3:42 PM:
> A simple source -> map -> filter -> sink test job.
>
> Script that triggers the savepoint:
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> Exact error:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>         at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>         ... 9 more
>
> From the error I suspected a permission problem: I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the application works fine. I further narrowed it down to a permission problem when the root user stops the hadoop application, but I don't know how to solve it; I am stuck here for now.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.10 stop with a savepoint fails
Hi
You can check the JM log for what caused this savepoint to fail. If the savepoint timed out, look at which task was slow to finish (a savepoint can be slower than a checkpoint).
Best,
Congxian

Robin Zhang wrote on Mon, Oct 19, 2020 at 3:42 PM:
> A simple source -> map -> filter -> sink test job.
>
> Script that triggers the savepoint:
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> Exact error:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>         at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>         at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>         at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>         ... 9 more
>
> From the error I suspected a permission problem: I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the application works fine. I further narrowed it down to a permission problem when the root user stops the hadoop application, but I don't know how to solve it; I am stuck here for now.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
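If the JM log does show the savepoint timing out, a minimal Java sketch of raising the timeout that savepoints share with checkpoints; the 30-minute value is an arbitrary assumption:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // a savepoint is declared failed once this timeout elapses, so raise it for large or slow state
    env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);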
flink1.10 stop with a savepoint fails
A simple source -> map -> filter -> sink test job.

Script that triggers the savepoint:
${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
Exact error:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "81990282a4686ebda3d04041e3620776".
        at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
        at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
        at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
        at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
        ... 9 more

From the error I suspected a permission problem: I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the application works fine. I further narrowed it down to a permission problem when the root user stops the hadoop application, but I don't know how to solve it; I am stuck here for now.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Reply: Re: Questions about flink savepoint and checkpoint
My understanding is that they behave the same; the differences between the two are described here [1].
To restore, pass -s when starting the job to specify the path to restore from, for example
-s file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

On 2020-10-10 08:43:38, "zjfpla...@hotmail.com" wrote:
>Does the same apply to savepoints?
>
>zjfpla...@hotmail.com
>
>From: Yun Tang
>Sent: 2020-10-10 01:35
>To: user-zh
>Subject: Re: Questions about flink savepoint and checkpoint
>Hi
>
>When the job has been modified, the key steps for restoring from a checkpoint are to make sure the relevant operators have uids assigned [1] and to allow non-restored state [2] when restoring.
>
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
>
>Best,
>Yun Tang
>
>From: zjfpla...@hotmail.com
>Sent: Friday, October 9, 2020 8:59
>To: user-zh
>Subject: Questions about flink savepoint and checkpoint
>
>Hi,
>Under what kinds of job modification can Flink's savepoints and checkpoints still be reused, and what kinds of modification force the job to re-consume from scratch?
>
>zjfpla...@hotmail.com
Reply: Re: Questions about flink savepoint and checkpoint
Does the same apply to savepoints?

zjfpla...@hotmail.com

From: Yun Tang
Sent: 2020-10-10 01:35
To: user-zh
Subject: Re: Questions about flink savepoint and checkpoint
Hi

When the job has been modified, the key steps for restoring from a checkpoint are to make sure the relevant operators have uids assigned [1] and to allow non-restored state [2] when restoring.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

Best,
Yun Tang

From: zjfpla...@hotmail.com
Sent: Friday, October 9, 2020 8:59
To: user-zh
Subject: Questions about flink savepoint and checkpoint

Hi,
Under what kinds of job modification can Flink's savepoints and checkpoints still be reused, and what kinds of modification force the job to re-consume from scratch?

zjfpla...@hotmail.com
Re: Questions about flink savepoint and checkpoint
Hi

When the job has been modified, the key steps for restoring from a checkpoint are to make sure the relevant operators have uids assigned [1] and to allow non-restored state [2] when restoring.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

Best,
Yun Tang

From: zjfpla...@hotmail.com
Sent: Friday, October 9, 2020 8:59
To: user-zh
Subject: Questions about flink savepoint and checkpoint

Hi,
Under what kinds of job modification can Flink's savepoints and checkpoints still be reused, and what kinds of modification force the job to re-consume from scratch?

zjfpla...@hotmail.com
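A minimal Java sketch of the two measures above; the topology, names, and paths are hypothetical:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // stable uids let restored state be re-matched to operators even after the job is modified
            env.socketTextStream("localhost", 9999).uid("raw-source")   // hypothetical source
               .map(String::toUpperCase).uid("normalize-map")
               .print().uid("stdout-sink");
            env.execute("uid-example");
        }
    }

And on restore, skip state whose operator no longer exists:

    bin/flink run -s hdfs:///savepoints/savepoint-xxxx --allowNonRestoredState my-job.jar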
Questions about flink savepoint and checkpoint
Hi,
Under what kinds of job modification can Flink's savepoints and checkpoints still be reused, and what kinds of modification force the job to re-consume from scratch?

zjfpla...@hotmail.com
Re: Flink 1.5.0 savepoint failure
Hi
From the stack trace, the cause is
Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
Can you write the savepoint to the hdfs://flink-hdfs cluster instead?
Best,
Congxian

hk__lrzy wrote on Fri, Sep 11, 2020 at 2:46 PM:
> Does the code explicitly set the state backend address somewhere?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
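A hedged sketch of the fix Congxian suggests: point the savepoint at the cluster's default filesystem instead of a local file:// path. The job id is the one from this thread; the target directory is an assumed example:

    bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 hdfs://flink-hdfs/user/flink/savepoints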
Re: Flink 1.5.0 savepoint failure
Does the code explicitly set the state backend address somewhere?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Flink uses its own serialization mechanism. When restoring from a checkpoint, the state data is injected in the open method.
As I understand it, the state can be restored from the checkpoint whether or not the transient marker is there, but adding transient makes it explicit that injection in the open method is the only way the field gets populated.
Whether leaving transient off can cause other problems, I am not sure.

范超 wrote on Thu, Sep 10, 2020 at 9:35 AM:
> Transient fields do not participate in serialization at all, so how could the state possibly be restored from a checkpoint?
>
> -----Original Message-----
> From: Yun Tang [mailto:myas...@live.com]
> Sent: Monday, September 7, 2020 12:50
> To: user-zh@flink.apache.org
> Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
>
> Hi
>
> First of all, using a transient ListState this way is absolutely fine; you can find plenty of examples of it in the Flink source code [1].
>
> Some things to check:
>
> 1. Is TTL enabled on your state?
> 2. Can you log on write, and compare against the case where get returns an empty iterator, to confirm the write actually succeeded?
> 3. Which state backend are you using? Does the problem still reproduce with a different state backend?
>
> [1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158
>
> Best,
> Yun Tang
>
> From: Liu Rising
> Sent: Sunday, September 6, 2020 17:45
> To: user-zh@flink.apache.org
> Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
>
> Hi Yun Tang
>
> Below is the code that defines and initializes the state:
>
> public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2 JsonNode>> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);
>
>     ...
>
>     private final ParameterTool params;
>     private transient ListState<ObjectNode> unmatchedProbesState;
>
>     ...
>
>     FlinkKeyedProcessFunction(ParameterTool params) {
>         this.params = params;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>
>         ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
>                 "unmatchedProbes", TypeInformation.of(ObjectNode.class)
>         );
>         unmatchedProbesState = getRuntimeContext().getListState(descriptor);
>
> Below is the part that adds data to the state:
>
>     ...
>     List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
>     unmatchedProbesState.clear();
>
>     if (unmatchedProbes.size() > 0) {
>         try {
>             unmatchedProbesState.addAll(unmatchedProbes);
>         } catch (Exception e) {
>             LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
>         }
>     }
>     ...
>
> Below is the code that reads from the state:
>
>     for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
>         LOG.info("Processing unmatched probe: " + unmatchedProbe);
>         matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
>     }
>
> The problem was that when reading from the state, unmatchedProbesState.get() returned an empty value for some keys (values actually exist).
> After removing the transient from the state definition, the problem no longer occurs.
>
> Thanks.
> Rising
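A minimal self-contained sketch of the pattern under discussion: the handle is transient because Flink re-creates it in open() after restore, while the state contents are snapshotted by the state backend rather than by Java field serialization. Types are simplified to String for illustration:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class TransientStateFn extends KeyedProcessFunction<String, String, String> {

        // transient: the handle is never Java-serialized with the function;
        // it is rebuilt from the descriptor in open(), including after a restore
        private transient ListState<String> bufferState;

        @Override
        public void open(Configuration parameters) {
            bufferState = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", String.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // the contents live in the state backend and are checkpointed there
            bufferState.add(value);
            out.collect(value);
        }
    }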
Flink 1.5.0 savepoint failure
hi all,
I am on flink 1.5.0 and triggering a savepoint fails.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share
The default filesystem in the Hadoop conf is hdfs://flink-hdfs
Error:

Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, _UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: (CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> Sink: Unnamed (1/1).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
        ... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
        at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
        at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
        ... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointState
Reply: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Transient fields do not participate in serialization at all, so how could the state possibly be restored from a checkpoint?

-----Original Message-----
From: Yun Tang [mailto:myas...@live.com]
Sent: Monday, September 7, 2020 12:50
To: user-zh@flink.apache.org
Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint

Hi

First of all, using a transient ListState this way is absolutely fine; you can find plenty of examples of it in the Flink source code [1].

Some things to check:

1. Is TTL enabled on your state?
2. Can you log on write, and compare against the case where get returns an empty iterator, to confirm the write actually succeeded?
3. Which state backend are you using? Does the problem still reproduce with a different state backend?

[1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

Best,
Yun Tang

From: Liu Rising
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org
Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint

Hi Yun Tang

Below is the code that defines and initializes the state:

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2 JsonNode>> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

    ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

Below is the part that adds data to the state:

    ...
    List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
    unmatchedProbesState.clear();

    if (unmatchedProbes.size() > 0) {
        try {
            unmatchedProbesState.addAll(unmatchedProbes);
        } catch (Exception e) {
            LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
        }
    }
    ...

Below is the code that reads from the state:

    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
        LOG.info("Processing unmatched probe: " + unmatchedProbe);
        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
    }

The problem was that when reading from the state, unmatchedProbesState.get() returned an empty value for some keys (values actually exist).
After removing the transient from the state definition, the problem no longer occurs.

Thanks.
Rising

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Hi

First of all, using a transient ListState this way is absolutely fine; you can find plenty of examples of it in the Flink source code [1].

Some things to check:

1. Is TTL enabled on your state?
2. Can you log on write, and compare against the case where get returns an empty iterator, to confirm the write actually succeeded?
3. Which state backend are you using? Does the problem still reproduce with a different state backend?

[1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

Best,
Yun Tang

From: Liu Rising
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org
Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint

Hi Yun Tang

Below is the code that defines and initializes the state:

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2 JsonNode>> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

    ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

Below is the part that adds data to the state:

    ...
    List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
    unmatchedProbesState.clear();

    if (unmatchedProbes.size() > 0) {
        try {
            unmatchedProbesState.addAll(unmatchedProbes);
        } catch (Exception e) {
            LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
        }
    }
    ...

Below is the code that reads from the state:

    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
        LOG.info("Processing unmatched probe: " + unmatchedProbe);
        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
    }

The problem was that when reading from the state, unmatchedProbesState.get() returned an empty value for some keys (values actually exist).
After removing the transient from the state definition, the problem no longer occurs.

Thanks.
Rising

--
Sent from: http://apache-flink.147419.n8.nabble.com/
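For point 1, a minimal sketch of what a TTL configuration on the descriptor would look like, to make it easier to recognize whether one is in effect; the one-day value is illustrative:

    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("unmatchedProbes", String.class);
    // if a call like this exists anywhere, entries expire silently and get() can return an empty iterator
    descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());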
Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Hi Yun Tang

Below is the code that defines and initializes the state:

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2 JsonNode>> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

    ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

Below is the part that adds data to the state:

    ...
    List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
    unmatchedProbesState.clear();

    if (unmatchedProbes.size() > 0) {
        try {
            unmatchedProbesState.addAll(unmatchedProbes);
        } catch (Exception e) {
            LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
        }
    }
    ...

Below is the code that reads from the state:

    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
        LOG.info("Processing unmatched probe: " + unmatchedProbe);
        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
    }

The problem was that when reading from the state, unmatchedProbesState.get() returned an empty value for some keys (values actually exist).
After removing the transient from the state definition, the problem no longer occurs.

Thanks.
Rising

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Hi

I don't think this is the root cause. Using a transient ListState is actually correct, because state should be initialized in the function's open method, so the transient modifier is fine.
Could you paste the code that initializes and uses this list state?

Best,
Yun Tang

From: Liu Rising
Sent: Thursday, September 3, 2020 12:26
To: user-zh@flink.apache.org
Subject: Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint

Hi

Found the cause. The problem was that the transient keyword was used when defining the ListState, as below.

private transient ListState state;

After removing transient, the problem was solved, although it is not quite clear why transient would cause this.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Some keyed state data cannot be read when restoring from a savepoint/checkpoint
Hi

Found the cause. The problem was that the transient keyword was used when defining the ListState, as below.

private transient ListState state;

After removing transient, the problem was solved, although it is not quite clear why transient would cause this.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply: Cannot take a savepoint
I am on V1.10.1.

------------------ Original ------------------
From: "user-zh"
> According to the official docs, the cancel syntax is "Syntax: cancel [OPTIONS] <Job ID>", so shouldn't -yid xxx go before the job id?
> Also, the docs flag "Cancel with a savepoint" as deprecated and recommend the stop command instead, see:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
> Hope this helps. Best~
>
> x <35907...@qq.com> wrote on Thu, Sep 3, 2020 at 11:30 AM:
>> /flink/flink-1.10.1/bin/flink cancel -s
>> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
>> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
>> It reports: Unrecognized option: -yid
Re: Cannot take a savepoint
According to the official docs, the cancel syntax is "Syntax: cancel [OPTIONS] <Job ID>", so shouldn't -yid xxx go before the job id?
Also, the docs flag "Cancel with a savepoint" as deprecated and recommend the stop command instead, see:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html
Hope this helps. Best~

x <35907...@qq.com> wrote on Thu, Sep 3, 2020 at 11:30 AM:
> /flink/flink-1.10.1/bin/flink cancel -s
> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> It reports: Unrecognized option: -yid
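A hedged sketch of the corrected invocations, with the IDs and paths taken from this thread; whether -yid is accepted depends on the YARN CLI being active in the deployment:

    # cancel-with-savepoint, with all options (including -yid) before the job id
    bin/flink cancel -yid application_1583831804206_1106301 -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae

    # or the recommended stop-with-savepoint
    bin/flink stop -yid application_1583831804206_1106301 -p hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae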
Cannot take a savepoint
/flink/flink-1.10.1/bin/flink cancel -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
It reports: Unrecognized option: -yid