Re: When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?

2022-10-27 Post by Jinzhong Li
hi casel,
As far as I know, triggering a savepoint is currently not supported from the web UI. If you want the
stop-with-savepoint behaviour, you can use bin/flink [1] or the REST
API [2]; a small sketch of the REST call is included below the signature.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-stop


李晋忠
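A minimal sketch of the REST call from [2], assuming Java 11's built-in HTTP client; the JobManager address, the job id and the target directory are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopWithSavepoint {
    public static void main(String[] args) throws Exception {
        // Placeholders -- substitute your JobManager address, job id and savepoint directory.
        String jobManager = "http://localhost:8081";
        String jobId = "00000000000000000000000000000000";
        String body = "{\"targetDirectory\": \"hdfs:///flink/savepoints\", \"drain\": false}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManager + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The endpoint is asynchronous: the response contains a trigger id that can be
        // polled afterwards to find out where the savepoint was written.
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}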


casel.chen wrote on Fri, Oct 28, 2022, 09:41:

> When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?
> Right now it simply cancels the job without a savepoint.


When cancelling a job from the Flink web UI, can you choose whether to take a savepoint?

2022-10-27 Post by casel.chen
When cancelling a job from the Flink web UI, can you choose whether to take a savepoint? Right now it simply cancels the job without a savepoint.

Re: Savepoint created successfully with flink-operator, but the job was not cancelled

2022-10-19 Post by shimin huang
Savepoint workflow:
1. Take the savepoint
kubectl patch flinkdeployment/savepoint-job --type=merge -p '{"spec": {"job": 
{"state": "suspended", "upgradeMode": "savepoint"}}}'
2. Delete the job
kubectl delete flinkdeployment/savepoint-job
3. Start the job from the savepoint
Edit the flinkdeployment YAML and add the following:
spec:
 ...
 job:
   initialSavepointPath: <savepoint path>
Then run kubectl apply -f xxx

> On Oct 19, 2022, at 3:53 PM, Liting Liu (litiliu) wrote:
> 
> hi:
> I am using flink-operator 1.2.0 & flink 1.14.3. I manually triggered a savepoint through the flink-operator and it was created successfully,
> but after the savepoint finished the job was not cancelled automatically. I expected the job to be cancelled once the savepoint succeeded. Did I miss a step, or is this a known issue?
> jobStatus:
>   jobId: 9de925e9d4a67e04ef6279925450907c
>   jobName: sql-te-lab-s334c9
>   savepointInfo:
> lastPeriodicSavepointTimestamp: 0
> lastSavepoint:
>   location: >-
> 
> hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
>   timeStamp: 1666163606426
>   triggerType: MANUAL
> savepointHistory:
>   - location: >-
>   
> hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
> timeStamp: 1666163606426
> triggerType: MANUAL
> triggerId: ''
> triggerTimestamp: 0
> triggerType: MANUAL
>   startTime: '1666161791058'
>   state: RUNNING
>   updateTime: '1666161828364'
> 



Savepoint created successfully with flink-operator, but the job was not cancelled

2022-10-19 Post by Liting Liu (litiliu)
hi:
  I am using flink-operator 1.2.0 & flink 1.14.3. I manually triggered a savepoint through the flink-operator and it was created successfully,
but after the savepoint finished the job was not cancelled automatically. I expected the job to be cancelled once the savepoint succeeded. Did I miss a step, or is this a known issue?
  jobStatus:
jobId: 9de925e9d4a67e04ef6279925450907c
jobName: sql-te-lab-s334c9
savepointInfo:
  lastPeriodicSavepointTimestamp: 0
  lastSavepoint:
location: >-
  
hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
timeStamp: 1666163606426
triggerType: MANUAL
  savepointHistory:
- location: >-

hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
  timeStamp: 1666163606426
  triggerType: MANUAL
  triggerId: ''
  triggerTimestamp: 0
  triggerType: MANUAL
startTime: '1666161791058'
state: RUNNING
updateTime: '1666161828364'



Re: After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?

2022-09-08 Post by yh z
hi, in my understanding a savepoint works much like a checkpoint; before Flink 1.16 savepoints
only supported full snapshots, and both are implemented with the barrier mechanism. According to the 1.16 planning document (
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints), savepoints
will also support an incremental mode.
When a savepoint is triggered, the source snapshots its state and does pause consumption while doing so.

郑 致远 wrote on Thu, Sep 8, 2022, 19:39:

> Hi all,
> Two questions:
> Is a savepoint also implemented with the barrier mechanism?
> When a savepoint is triggered, does the source operator stop consuming from Kafka?
>


After a savepoint is triggered, does the source operator stop consuming at the corresponding offset?

2022-09-08 Post by 郑 致远
Hi all,
Two questions:
Is a savepoint also implemented with the barrier mechanism?
When a savepoint is triggered, does the source operator stop consuming from Kafka?


Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-09-02 Post by Shuo Cheng
By design this is supported. Please post your code so that others can pinpoint where the problem is.

On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:

> Hi all,
> We have a Flink job whose source and sink are both Kafka.
> When upgrading (without any code change) we took a savepoint and stopped the job using the commands from the official docs, then started the job from that savepoint.
> We found that the data before and after the stop/restart does not line up seamlessly; some records are duplicated.
>
> Is it that savepoints by design cannot guarantee end-to-end consistency across a stop/restart, or did we do something wrong?
>
>
>
>
>
>
>


Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-09-01 Post by 杨扬
It is set, but consistency still cannot be guaranteed.





> 在 2022年8月26日,下午5:28,gulugulucxg  写道:
> 
> flinkKafkaProducer指定EXACTLY_ONCE语义了吗
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2022-08-26 16:50:33,"杨扬"  写道:
>> kafka-2.4.1
>> flink-1.14.2
>> 
>> 
>> 
>> 
>>> 在 2022年8月26日,下午4:42,Hangxiang Yu  写道:
>>> 
>>> flink会保证自身的exactly once语义,端到端的exactly once的语义是需要source和sink保证幂等的;
>>> 你用的kafka是哪个版本?
>>> 
>>> On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:
>>> 
>>>> 各位好!
>>>>   目前有一flink作业,source与sink均为kafka。
>>>>   在换版时(未修改任何代码)基于官网文档命令,创建savepoint并停止作业;而后基于之前创建的savepoint启动作业。
>>>>   现在发现如此操作无法实现启停前后数据无缝对接,会出现一定的数据重复。
>>>> 
>>>>   想请教这个问题是savepoint设计时本身就无法保证启停前后端到端一致性,还是我们哪里操作不当呢?
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> -- 
>>> Best,
>>> Hangxiang.
>>> 
>> 
> 



Re:Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Post by gulugulucxg
Did you set EXACTLY_ONCE semantics on the FlinkKafkaProducer?

















在 2022-08-26 16:50:33,"杨扬"  写道:
>kafka-2.4.1
>flink-1.14.2
>
>
>
>
>> 在 2022年8月26日,下午4:42,Hangxiang Yu  写道:
>> 
>> flink会保证自身的exactly once语义,端到端的exactly once的语义是需要source和sink保证幂等的;
>> 你用的kafka是哪个版本?
>> 
>> On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:
>> 
>>> 各位好!
>>>目前有一flink作业,source与sink均为kafka。
>>>在换版时(未修改任何代码)基于官网文档命令,创建savepoint并停止作业;而后基于之前创建的savepoint启动作业。
>>>现在发现如此操作无法实现启停前后数据无缝对接,会出现一定的数据重复。
>>> 
>>>想请教这个问题是savepoint设计时本身就无法保证启停前后端到端一致性,还是我们哪里操作不当呢?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> -- 
>> Best,
>> Hangxiang.
>> 
>


Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Post by 杨扬
kafka-2.4.1
flink-1.14.2




> 在 2022年8月26日,下午4:42,Hangxiang Yu  写道:
> 
> flink会保证自身的exactly once语义,端到端的exactly once的语义是需要source和sink保证幂等的;
> 你用的kafka是哪个版本?
> 
> On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:
> 
>> 各位好!
>>目前有一flink作业,source与sink均为kafka。
>>    在换版时(未修改任何代码)基于官网文档命令,创建savepoint并停止作业;而后基于之前创建的savepoint启动作业。
>>现在发现如此操作无法实现启停前后数据无缝对接,会出现一定的数据重复。
>> 
>>想请教这个问题是savepoint设计时本身就无法保证启停前后端到端一致性,还是我们哪里操作不当呢?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
> 
> -- 
> Best,
> Hangxiang.
> 



Re: Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Post by Hangxiang Yu
Flink guarantees exactly-once for its own state; end-to-end exactly-once additionally needs cooperation from the source and sink (a replayable source and a transactional or idempotent sink);
Which Kafka version are you using?
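A minimal sketch of both halves of that statement, assuming the FlinkKafkaConsumer/FlinkKafkaProducer classes of the 1.14-era universal Kafka connector; topic names, broker address and group id are placeholders, and the producer constructor follows the (topic, schema, properties, partitioner, semantic, pool size) shape used elsewhere in these threads:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class ExactlyOnceKafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka transactions are committed only on completed checkpoints, so checkpointing
        // must be enabled in EXACTLY_ONCE mode for the sink to be exactly-once.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        consumerProps.setProperty("group.id", "demo-group");           // placeholder
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), consumerProps);

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // Must cover the longest gap between checkpoints and stay below the broker's
        // transaction.max.timeout.ms.
        producerProps.setProperty("transaction.timeout.ms", "900000");

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "topic-b",
                new SimpleStringSchema(),
                producerProps,
                null,                                        // no custom partitioner
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,    // transactional writes
                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

        env.addSource(source).addSink(sink);
        env.execute("exactly-once kafka pipeline");
    }
}

Note that whatever reads the sink topic must itself use isolation.level=read_committed, otherwise records from open or aborted transactions become visible and look like duplicates.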

On Fri, Aug 26, 2022 at 4:08 PM 杨扬  wrote:

> Hi all,
> We have a Flink job whose source and sink are both Kafka.
> When upgrading (without any code change) we took a savepoint and stopped the job using the commands from the official docs, then started the job from that savepoint.
> We found that the data before and after the stop/restart does not line up seamlessly; some records are duplicated.
>
> Is it that savepoints by design cannot guarantee end-to-end consistency across a stop/restart, or did we do something wrong?
>
>
>
>
>
>
>

-- 
Best,
Hangxiang.


Restarting a job from a savepoint does not guarantee end-to-end consistency

2022-08-26 Post by 杨扬
Hi all,
We have a Flink job whose source and sink are both Kafka.
When upgrading (without any code change) we took a savepoint and stopped the job using the commands from the official docs, then started the job from that savepoint.
We found that the data before and after the stop/restart does not line up seamlessly; some records are duplicated.

Is it that savepoints by design cannot guarantee end-to-end consistency across a stop/restart, or did we do something wrong?








Re: Stopping the job manually with a savepoint takes a long time every time

2022-08-03 Post by Jinzhong Li
1. Before Flink 1.13 the savepoint data format should be the same as the checkpoint format.
Is incremental checkpointing enabled for your job? If so, checkpoints are fast (only the incremental part is uploaded),
while savepoints are slow (the full state has to be uploaded).

2. Flink 1.13
unified the savepoint format across all state backends; a savepoint has to iterate over every key-value pair in the state, so it is also much slower than a checkpoint.

3. Flink 1.15 introduced native-format savepoints [1],
which lift the format restriction; their speed should be similar to a full checkpoint. See also [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints_vs_savepoints/

Best,
Jinzhong Li
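As a small illustration of point 1, a sketch of enabling incremental checkpoints, assuming Flink 1.13+ class names (on older versions such as the 1.9 in the question the equivalent is new RocksDBStateBackend(checkpointDir, true)); the checkpoint directory is a placeholder:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints only upload the RocksDB files that changed since the
        // previous checkpoint, which is why periodic checkpoints can finish in seconds.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints"); // placeholder
        env.enableCheckpointing(60_000);

        // A canonical savepoint, by contrast, always writes the full state in a
        // backend-independent format, so stopping with a savepoint takes much longer.
        env.fromElements("a", "b", "c").print();
        env.execute("incremental checkpoint example");
    }
}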

Howie Yang wrote on Wed, Aug 3, 2022, 17:54:

> hello,
>
>
> Version: flink 1.9
> Problem: every manual stop with a savepoint takes about 5 minutes, while automatic checkpoints finish in seconds.
> Questions:
> 1. Does a savepoint store more content than a checkpoint?
> 2. Why does the savepoint take so long? (Without taking a savepoint, the job also stops within seconds.)
>
>
>
>
>
>
>
> --
>
> Best,
> Howie


Stopping the job manually with a savepoint takes a long time every time

2022-08-03 Post by Howie Yang
hello,


Version: flink 1.9
Problem: every manual stop with a savepoint takes about 5 minutes, while automatic checkpoints finish in seconds.
Questions:
1. Does a savepoint store more content than a checkpoint?
2. Why does the savepoint take so long? (Without taking a savepoint, the job also stops within seconds.)







--

Best,
Howie

Flink SQL job cannot restore from a savepoint after upgrading flink 1.12.2 to 1.13.5

2022-02-08 Post by If
After upgrading flink 1.12.2 to 1.13.5, a Flink SQL job restored from a savepoint fails; the task
switched from INITIALIZING to FAILED with failure cause: java.lang.Exception: 
Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedMapBundleOperator_ebe385cc1e0bce62752f87eccdb2e8c6_(6/6) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 10 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:571)
at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:119)
at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 12 more
Caused by: java.io.InvalidClassException: 
org.apache.flink.table.types.logical.LogicalType; local class incompatible: 
stream classdesc serialVersionUID = -7381419642101800907, local class 
serialVersionUID = 1
at 
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1829)
at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:593)
at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer$RowDataSerializerSnapshot.readSnapshot(RowDataSerializer.java:318)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77)
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore

Re: Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-09 Post by Jim Chen
From what I can see, the Flink SQL job does not enable transactions when writing to Kafka either. So this is exactly the part I cannot figure out.


东东  于2021年8月9日周一 下午5:56写道:

>
>
>
> 有未提交的事务才会出现重复呗,也许你重启的时候恰好所有事务都已经提交了呢,比如说,有一段时间流里面没有新数据进来。
>
>
> 在 2021-08-09 17:44:57,"Jim Chen"  写道:
> >有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
> >with savepoint,结果没有重复
> >
> >我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..
> >
> >东东  于2021年8月2日周一 下午7:13写道:
> >
> >> 从topic B实时写到hive,这个job需要配置 isolation.level 为
> >> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
> >>
> >>
> >> 在 2021-08-02 19:00:13,"Jim Chen"  写道:
> >> >我不太懂,下游的isolation.level是不是read_committed是啥意思。
> >> >我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
> >> >B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
> >> >
> >> >东东  于2021年8月2日周一 下午6:20写道:
> >> >
> >> >> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
> >> >>
> >> >>
> >> >> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >> >> >Hi 刘建刚,
> >> >> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >> >> >停止命令:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >> >-yid application_1625497885855_703064 \
> >> >> >-p
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >\
> >> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >> >
> >> >> >重启命令:
> >> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >-m yarn-cluster \
> >> >> >-yjm 4096 -ytm 4096 \
> >> >> >-ynm User_Click_Log_Split_All \
> >> >> >-yqu syh_offline \
> >> >> >-ys 2 \
> >> >> >-d \
> >> >> >-p 64 \
> >> >> >-s
> >> >>
> >> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >> >\
> >> >> >-n \
> >> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >>
> >> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> >
> >> >> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >> >> >
> >> >> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> >> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >> >> >>
> >> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >> >>
> >> >> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >> >> >>
> >> >> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >> >> >
> >> >> >> > cancel command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> >> > -yid application_1625497885855_698371 \
> >> >> >> > -s
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> >> > \
> >> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >> >
> >> >> >> > print savepoint:
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >> >
> >> >> >> >
> >> >> >> > restart command:
> >> >> >> >
> >> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> >> > -m yarn-cluster \
> >> >> >> > -yjm 4096 -ytm 4096 \
> >> >> >> > -ynm User_Click_Log_Split_All \
> >> >> >> > -yqu syh_offline \
> >> >> >> > -ys 2 \
> >

Re:Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-09 Post by 东东



Duplicates only appear when there are uncommitted transactions. Maybe when you restarted, all transactions happened to have been committed already, for example because no new data had come into the stream for a while.


在 2021-08-09 17:44:57,"Jim Chen"  写道:
>有个奇怪的问题:用flink sql去做上面的逻辑,*没有设置isolation.level 为 read_committed,*重启方式用stop
>with savepoint,结果没有重复
>
>我理解,flink sql没有设置隔离级别的话,应该会出现重复的。Tell me why..
>
>东东  于2021年8月2日周一 下午7:13写道:
>
>> 从topic B实时写到hive,这个job需要配置 isolation.level 为
>> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
>>
>>
>> 在 2021-08-02 19:00:13,"Jim Chen"  写道:
>> >我不太懂,下游的isolation.level是不是read_committed是啥意思。
>> >我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
>> >B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
>> >
>> >东东  于2021年8月2日周一 下午6:20写道:
>> >
>> >> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>> >>
>> >>
>> >> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
>> >> >Hi 刘建刚,
>> >> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
>> >> >停止命令:
>> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>> >> >-yid application_1625497885855_703064 \
>> >> >-p
>> >>
>> >>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >> >\
>> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
>> >> >
>> >> >重启命令:
>> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >> >-m yarn-cluster \
>> >> >-yjm 4096 -ytm 4096 \
>> >> >-ynm User_Click_Log_Split_All \
>> >> >-yqu syh_offline \
>> >> >-ys 2 \
>> >> >-d \
>> >> >-p 64 \
>> >> >-s
>> >>
>> >>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
>> >> >\
>> >> >-n \
>> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >>
>> >>
>> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >> >
>> >> >
>> >> >刘建刚  于2021年8月2日周一 下午3:49写道:
>> >> >
>> >> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
>> >> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
>> >> >>
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>> >> >>
>> >> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
>> >> >>
>> >> >> > 我是通过savepoint的方式重启的,命令如下:
>> >> >> >
>> >> >> > cancel command:
>> >> >> >
>> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> >> >> > -yid application_1625497885855_698371 \
>> >> >> > -s
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >> >> > \
>> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >> >> >
>> >> >> > print savepoint:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> >> >
>> >> >> >
>> >> >> > restart command:
>> >> >> >
>> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >> >> > -m yarn-cluster \
>> >> >> > -yjm 4096 -ytm 4096 \
>> >> >> > -ynm User_Click_Log_Split_All \
>> >> >> > -yqu syh_offline \
>> >> >> > -ys 2 \
>> >> >> > -d \
>> >> >> > -p 64 \
>> >> >> > -s
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> >> > \
>> >> >> > -n \
>> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> /opt/case/app/realt

Re: Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-09 Post by Jim Chen
Here is a strange thing: I implemented the same logic with Flink SQL, *without* setting isolation.level to read_committed,* restarted with stop
with savepoint, and there were no duplicates.

My understanding is that without the isolation level set, Flink SQL should also show duplicates. Tell me why..

东东  于2021年8月2日周一 下午7:13写道:

> 从topic B实时写到hive,这个job需要配置 isolation.level 为
> read_committed,否则会把还没有提交甚至是已经终止的事务消息读出来,这样就很难不出现重复了。
>
>
> 在 2021-08-02 19:00:13,"Jim Chen"  写道:
> >我不太懂,下游的isolation.level是不是read_committed是啥意思。
> >我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
> >B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
> >
> >东东  于2021年8月2日周一 下午6:20写道:
> >
> >> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
> >>
> >>
> >> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >> >Hi 刘建刚,
> >> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >> >停止命令:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >> >-yid application_1625497885855_703064 \
> >> >-p
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >\
> >> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >> >
> >> >重启命令:
> >> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >-m yarn-cluster \
> >> >-yjm 4096 -ytm 4096 \
> >> >-ynm User_Click_Log_Split_All \
> >> >-yqu syh_offline \
> >> >-ys 2 \
> >> >-d \
> >> >-p 64 \
> >> >-s
> >>
> >>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >> >\
> >> >-n \
> >> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >>
> >>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >
> >> >
> >> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >> >
> >> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >> >>
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >> >>
> >> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >> >>
> >> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >> >
> >> >> > cancel command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> >> > -yid application_1625497885855_698371 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> >> > \
> >> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >> >
> >> >> > print savepoint:
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> >
> >> >> >
> >> >> > restart command:
> >> >> >
> >> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> >> > -m yarn-cluster \
> >> >> > -yjm 4096 -ytm 4096 \
> >> >> > -ynm User_Click_Log_Split_All \
> >> >> > -yqu syh_offline \
> >> >> > -ys 2 \
> >> >> > -d \
> >> >> > -p 64 \
> >> >> > -s
> >> >> >
> >> >> >
> >> >>
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >> > \
> >> >> > -n \
> >> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >> >
> >> >> >
> >> >>
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >> >
> >> >> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >> >> >
> >> >> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> >> >> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >> >> > >
> >> >> > > My Versions
> >> >> > > Flink 1.12.4
> >> >> > > Kafka 2.0.1
> >> >> 

Re:Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-02 Post by 东东
The job that streams topic B into Hive needs isolation.level set to 
read_committed, otherwise it also reads messages from transactions that have not been committed, or have even been aborted, and then it is very hard to avoid duplicates.
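A minimal sketch of such a downstream consumer, assuming the FlinkKafkaConsumer from the universal Kafka connector; the topic name, broker address and group id are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadCommittedTopicB {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        props.setProperty("group.id", "hive-ingestion");         // placeholder
        // Only return records from committed Kafka transactions; without this the job also
        // reads records from open or aborted transactions written by the EXACTLY_ONCE sink,
        // and those show up downstream as duplicates.
        props.setProperty("isolation.level", "read_committed");

        env.addSource(new FlinkKafkaConsumer<>("topic-b", new SimpleStringSchema(), props))
           .print();
        env.execute("read-committed consumer of topic B");
    }
}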


在 2021-08-02 19:00:13,"Jim Chen"  写道:
>我不太懂,下游的isolation.level是不是read_committed是啥意思。
>我是把topic A中的partitionId和offset写到消息体中,然后flink程序,把消息写到下游的topic B中。将topic
>B实时写到hive上,然后在hive表中,根据partitionId和offset去重,发现有重复消费了
>
>东东  于2021年8月2日周一 下午6:20写道:
>
>> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>>
>>
>> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
>> >Hi 刘建刚,
>> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
>> >停止命令:
>> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>> >-yid application_1625497885855_703064 \
>> >-p
>>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >\
>> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
>> >
>> >重启命令:
>> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >-m yarn-cluster \
>> >-yjm 4096 -ytm 4096 \
>> >-ynm User_Click_Log_Split_All \
>> >-yqu syh_offline \
>> >-ys 2 \
>> >-d \
>> >-p 64 \
>> >-s
>>
>> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
>> >\
>> >-n \
>> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>>
>> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >
>> >
>> >刘建刚  于2021年8月2日周一 下午3:49写道:
>> >
>> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
>> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
>> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>> >>
>> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
>> >>
>> >> > 我是通过savepoint的方式重启的,命令如下:
>> >> >
>> >> > cancel command:
>> >> >
>> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> >> > -yid application_1625497885855_698371 \
>> >> > -s
>> >> >
>> >> >
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> >> > \
>> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >> >
>> >> > print savepoint:
>> >> >
>> >> >
>> >> >
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> >
>> >> >
>> >> > restart command:
>> >> >
>> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> >> > -m yarn-cluster \
>> >> > -yjm 4096 -ytm 4096 \
>> >> > -ynm User_Click_Log_Split_All \
>> >> > -yqu syh_offline \
>> >> > -ys 2 \
>> >> > -d \
>> >> > -p 64 \
>> >> > -s
>> >> >
>> >> >
>> >>
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >> > \
>> >> > -n \
>> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >> >
>> >> >
>> >>
>> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >> >
>> >> > Jim Chen  于2021年8月2日周一 下午2:01写道:
>> >> >
>> >> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
>> >> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
>> >> > >
>> >> > > My Versions
>> >> > > Flink 1.12.4
>> >> > > Kafka 2.0.1
>> >> > > Java 1.8
>> >> > >
>> >> > > Core code:
>> >> > >
>> >> > > env.enableCheckpointing(30);
>> >> > >
>> >> > >
>> >> >
>> >>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >> > >
>> >> > >
>> >> >
>> >>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >> > >
>> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>> >> > >
>> >

Re: Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-02 Post by Jim Chen
I don't quite understand what "is the downstream isolation.level read_committed" means.
I write the partitionId and offset from topic A into the message body, and the Flink program writes the messages to the downstream topic B. Topic
B is streamed into Hive in real time, and when I deduplicate in the Hive table by partitionId and offset, I find duplicated consumption.

东东  于2021年8月2日周一 下午6:20写道:

> 下游如何发现重复数据的,下游的isolation.level是不是read_committed
>
>
> 在 2021-08-02 18:14:27,"Jim Chen"  写道:
> >Hi 刘建刚,
> >我使用了stop with savepoint,但是还是发现,下游有重复数据。
> >停止命令:
> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
> >-yid application_1625497885855_703064 \
> >-p
>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >\
> >-d 55e7ebb6fa38faaba61b4b9a7cd89827
> >
> >重启命令:
> >/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >-m yarn-cluster \
> >-yjm 4096 -ytm 4096 \
> >-ynm User_Click_Log_Split_All \
> >-yqu syh_offline \
> >-ys 2 \
> >-d \
> >-p 64 \
> >-s
>
> >hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
> >\
> >-n \
> >-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>
> >/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >
> >
> >刘建刚  于2021年8月2日周一 下午3:49写道:
> >
> >> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
> >> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
> >>
> >> Jim Chen  于2021年8月2日周一 下午2:33写道:
> >>
> >> > 我是通过savepoint的方式重启的,命令如下:
> >> >
> >> > cancel command:
> >> >
> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> >> > -yid application_1625497885855_698371 \
> >> > -s
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> >> > \
> >> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >> >
> >> > print savepoint:
> >> >
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> >
> >> >
> >> > restart command:
> >> >
> >> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> >> > -m yarn-cluster \
> >> > -yjm 4096 -ytm 4096 \
> >> > -ynm User_Click_Log_Split_All \
> >> > -yqu syh_offline \
> >> > -ys 2 \
> >> > -d \
> >> > -p 64 \
> >> > -s
> >> >
> >> >
> >>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >> > \
> >> > -n \
> >> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >> >
> >> >
> >>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >> >
> >> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >> >
> >> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> >> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >> > >
> >> > > My Versions
> >> > > Flink 1.12.4
> >> > > Kafka 2.0.1
> >> > > Java 1.8
> >> > >
> >> > > Core code:
> >> > >
> >> > > env.enableCheckpointing(30);
> >> > >
> >> > >
> >> >
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> > >
> >> > >
> >> >
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >> > >
> >> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >> > >
> >> > > tableEnv.createTemporaryView("data_table",dataDS);
> >> > > String sql = "select * from data_table a inner join
> >> > > hive_catalog.dim.dim.project for system_time as of a.proctime as b
> on
> >> > a.id
> >> > > = b.id"
> >> > > Table table = tableEnv.sqlQuery(sql);
> >> > > DataStream resultDS = tableEnv.toAppendStream(table,
> >> Row.class).map(xx);
> >> > >
> >> > > // Kafka producer parameter
> >> > > Properties producerProps = new Properties();
> >> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> >> > > bootstrapServers);
> >> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> >> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> >> > kafkaBufferMemory);
> >> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> >> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> >> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
> 30);
> >> > >
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> >> > > "1");
> >> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> >> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> >> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >> > >
> >> > > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> >> > > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> >> > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> >> > > .setParallelism(sinkParallelism);
> >> > >
> >> >
> >>
>


Re:Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-02 Post by 东东
How did the downstream detect the duplicate data? Is the downstream consumer's isolation.level read_committed?


在 2021-08-02 18:14:27,"Jim Chen"  写道:
>Hi 刘建刚,
>我使用了stop with savepoint,但是还是发现,下游有重复数据。
>停止命令:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
>-yid application_1625497885855_703064 \
>-p
>hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>\
>-d 55e7ebb6fa38faaba61b4b9a7cd89827
>
>重启命令:
>/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>-m yarn-cluster \
>-yjm 4096 -ytm 4096 \
>-ynm User_Click_Log_Split_All \
>-yqu syh_offline \
>-ys 2 \
>-d \
>-p 64 \
>-s
>hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
>\
>-n \
>-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
>
>刘建刚  于2021年8月2日周一 下午3:49写道:
>
>> cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
>> savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>
>> Jim Chen  于2021年8月2日周一 下午2:33写道:
>>
>> > 我是通过savepoint的方式重启的,命令如下:
>> >
>> > cancel command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
>> > -yid application_1625497885855_698371 \
>> > -s
>> >
>> >
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
>> > \
>> > 59cf6ccc83aa163bd1e0cd3304dfe06a
>> >
>> > print savepoint:
>> >
>> >
>> >
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> >
>> >
>> > restart command:
>> >
>> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
>> > -m yarn-cluster \
>> > -yjm 4096 -ytm 4096 \
>> > -ynm User_Click_Log_Split_All \
>> > -yqu syh_offline \
>> > -ys 2 \
>> > -d \
>> > -p 64 \
>> > -s
>> >
>> >
>> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>> > \
>> > -n \
>> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>> >
>> >
>> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>> >
>> > Jim Chen  于2021年8月2日周一 下午2:01写道:
>> >
>> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
>> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
>> > >
>> > > My Versions
>> > > Flink 1.12.4
>> > > Kafka 2.0.1
>> > > Java 1.8
>> > >
>> > > Core code:
>> > >
>> > > env.enableCheckpointing(30);
>> > >
>> > >
>> >
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > >
>> > >
>> >
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> > >
>> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>> > >
>> > > tableEnv.createTemporaryView("data_table",dataDS);
>> > > String sql = "select * from data_table a inner join
>> > > hive_catalog.dim.dim.project for system_time as of a.proctime as b on
>> > a.id
>> > > = b.id"
>> > > Table table = tableEnv.sqlQuery(sql);
>> > > DataStream resultDS = tableEnv.toAppendStream(table,
>> Row.class).map(xx);
>> > >
>> > > // Kafka producer parameter
>> > > Properties producerProps = new Properties();
>> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > > bootstrapServers);
>> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
>> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
>> > kafkaBufferMemory);
>> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
>> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
>> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
>> > > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
>> > > "1");
>> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
>> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>> > >
>> > > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
>> > > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
>> > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
>> > > .setParallelism(sinkParallelism);
>> > >
>> >
>>


Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-02 Post by Jim Chen
Hi 刘建刚,
I used stop with savepoint, but the downstream still shows duplicate data.
Stop command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink stop \
-yid application_1625497885855_703064 \
-p
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
-d 55e7ebb6fa38faaba61b4b9a7cd89827

Restart command:
/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-55e7eb-11203031f2a5
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar


刘建刚 wrote on Mon, Aug 2, 2021, 15:49:

> cancel with savepoint is the older interface and causes duplicated Kafka data. The newer stop with
> savepoint no longer emits data while the savepoint is being taken, which avoids the duplicates; for details see
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>
> Jim Chen  于2021年8月2日周一 下午2:33写道:
>
> > 我是通过savepoint的方式重启的,命令如下:
> >
> > cancel command:
> >
> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> > -yid application_1625497885855_698371 \
> > -s
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> > \
> > 59cf6ccc83aa163bd1e0cd3304dfe06a
> >
> > print savepoint:
> >
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> >
> >
> > restart command:
> >
> > /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> > -m yarn-cluster \
> > -yjm 4096 -ytm 4096 \
> > -ynm User_Click_Log_Split_All \
> > -yqu syh_offline \
> > -ys 2 \
> > -d \
> > -p 64 \
> > -s
> >
> >
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> > \
> > -n \
> > -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
> >
> >
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
> >
> > Jim Chen  于2021年8月2日周一 下午2:01写道:
> >
> > > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> > > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> > >
> > > My Versions
> > > Flink 1.12.4
> > > Kafka 2.0.1
> > > Java 1.8
> > >
> > > Core code:
> > >
> > > env.enableCheckpointing(30);
> > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >
> > >
> >
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> > >
> > > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> > >
> > > tableEnv.createTemporaryView("data_table",dataDS);
> > > String sql = "select * from data_table a inner join
> > > hive_catalog.dim.dim.project for system_time as of a.proctime as b on
> > a.id
> > > = b.id"
> > > Table table = tableEnv.sqlQuery(sql);
> > > DataStream resultDS = tableEnv.toAppendStream(table,
> Row.class).map(xx);
> > >
> > > // Kafka producer parameter
> > > Properties producerProps = new Properties();
> > > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > bootstrapServers);
> > > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> > kafkaBufferMemory);
> > > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> > > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> > > "1");
> > > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> > >
> > > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> > > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> > > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> > > .setParallelism(sinkParallelism);
> > >
> >
>


Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-02 Post by 刘建刚
cancel with savepoint is the older interface and causes duplicated Kafka data. The newer stop with
savepoint no longer emits data while the savepoint is being taken, which avoids the duplicates; for details see
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

Jim Chen  于2021年8月2日周一 下午2:33写道:

> 我是通过savepoint的方式重启的,命令如下:
>
> cancel command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> print savepoint:
>
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
>
> restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
> Jim Chen  于2021年8月2日周一 下午2:01写道:
>
> > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >
> > My Versions
> > Flink 1.12.4
> > Kafka 2.0.1
> > Java 1.8
> >
> > Core code:
> >
> > env.enableCheckpointing(30);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >
> > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >
> > tableEnv.createTemporaryView("data_table",dataDS);
> > String sql = "select * from data_table a inner join
> > hive_catalog.dim.dim.project for system_time as of a.proctime as b on
> a.id
> > = b.id"
> > Table table = tableEnv.sqlQuery(sql);
> > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
> >
> > // Kafka producer parameter
> > Properties producerProps = new Properties();
> > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > bootstrapServers);
> > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> kafkaBufferMemory);
> > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> > "1");
> > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >
> > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> > .setParallelism(sinkParallelism);
> >
>


Re: Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-01 Post by Jim Chen
I restarted the job via a savepoint; the commands are as follows:

cancel command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
-yid application_1625497885855_698371 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
\
59cf6ccc83aa163bd1e0cd3304dfe06a

print savepoint:

hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494


restart command:

/home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
-m yarn-cluster \
-yjm 4096 -ytm 4096 \
-ynm User_Click_Log_Split_All \
-yqu syh_offline \
-ys 2 \
-d \
-p 64 \
-s
hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
\
-n \
-c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
/opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

Jim Chen  于2021年8月2日周一 下午2:01写道:

> 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
>
> My Versions
> Flink 1.12.4
> Kafka 2.0.1
> Java 1.8
>
> Core code:
>
> env.enableCheckpointing(30);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
>
> tableEnv.createTemporaryView("data_table",dataDS);
> String sql = "select * from data_table a inner join
> hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
> = b.id"
> Table table = tableEnv.sqlQuery(sql);
> DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
>
> // Kafka producer parameter
> Properties producerProps = new Properties();
> producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
> producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
> producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> "1");
> producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
>
> resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> .setParallelism(sinkParallelism);
>


Restarting a job from a savepoint: Flink consuming Kafka sees duplicate messages

2021-08-01 Post by Jim Chen
Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B.
After restarting the job from a savepoint, I find duplicated data in topic B. Can anyone help me explain this? Thanks!

My Versions
Flink 1.12.4
Kafka 2.0.1
Java 1.8

Core code:

env.enableCheckpointing(30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table",dataDS);
String sql = "select * from data_table a inner join
hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id
= b.id"
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameter
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
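// EXACTLY_ONCE below makes the sink write inside Kafka transactions that are committed only
// when a checkpoint completes; consumers of sinkTopic therefore need isolation.level=read_committed,
// otherwise records from uncommitted or aborted transactions become visible and look like duplicates.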

resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
.setParallelism(sinkParallelism);


Flink SQL restart from a savepoint hits an incompatibility between the new and old state serializers

2021-06-07 Post by mangguozhi
Hi all, I am using Flink SQL on Flink 1.13. After a code change, restarting the job reports the following error:

For heap backends, the new state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@a5b17bdb) must not
be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@e5a9c6d8).


I changed one time field in the SQL:

CURRENT_TIMESTAMP --> the event-time field ts

The ts attribute is derived as follows:

ts AS TO_TIMESTAMP(FROM_UNIXTIME(tts, '-MM-dd HH:mm:ss'))

On the code side the changes were:
Increased the checkpoint interval:

 env.getCheckpointConfig().setCheckpointInterval(60L);

Allowed more concurrent checkpoints than before:
 
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(20);

Added an idle-state retention time:

Configuration configuration = tableEnv.getConfig().getConfiguration();
tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));

Could you explain why these changes cause the new and old state to be incompatible, and how to avoid it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about restoring from a savepoint

2021-05-26 Post by LakeShen
First check that your flink command is correct, then open the Checkpoint page of the Flink Web UI and verify that the job was really restored from the savepoint (there is a
restore path shown there).
Then check which time characteristic your windows are using.

Best,
LakeShen
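A common cause of the "No restore state for FlinkKafkaConsumer" log line is that the restore did not actually use the savepoint, or that the operator owning the state can no longer be matched. A minimal sketch of giving the stateful operators explicit uids so savepoint state maps back onto them, assuming the universal Kafka connector and a processing-time window for brevity; broker address, group id and topic name are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "uid-demo");              // placeholder

        env.addSource(new FlinkKafkaConsumer<>("input", new SimpleStringSchema(), props))
                .uid("kafka-source")                 // Kafka offsets are restored into this uid
                .keyBy(value -> value)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
                .aggregate(new CountAgg())
                .uid("window-agg")                   // window/aggregate state restored into this uid
                .print();

        env.execute("uid example");
    }

    // Counts elements per key; a stand-in for the AggregationFunction in the question.
    static class CountAgg implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}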

王春浩 wrote on Thu, May 27, 2021, 09:26:

> hi, 社区
> ​
> 版本flink 1.7
> ​
>
> 我正在尝试从保存点(或检查点)还原flink作业,该作业的工作是从kafka读取->执行30分钟的窗口聚合(只是AggregationFunction,就像一个计数器)->下沉到kafka。
> ​
> 我使用rocksdb和启用检查点。
> ​
> 现在我尝试手动触发一个保存点。 每个汇总的期望值是30(1个数据/每分钟)。 但是,当我从保存点还原时(flink运行-d -s
> {savepoint的url}),聚合值不是30(小于30,取决于我取消flink作业并还原的时间)。 但是当作业正常运行时,它将达到30。
> ​
> 我不知道为什么有些数据似乎会丢失?
> ​
> 日志显示``No restore state for FlinkKafkaConsumer''
> ​
> ​​
> ​
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>
> -Original Message-
> From: "王春浩"
> To: ;
> Cc:
> Sent: 2021/5/26周三 17:03 (GMT+08:00)
> Subject: inquire about restore from savepoint
>
> Hi Community,
> ​
> version flink 1.7
> im trying to make a flink job restore from a savepoint(or checkpoint),
> what the job do is reading from kafka -> do a 30-minutes-window
> aggregation(just AggregationFunction, acts like a counter) -> sink to kafka.
> i use rocksdb and enabled checkpoint.
> now i try to trigger a savepoint manually. the expected value of each
> aggregated one is 30(1 data/per minute). but when i restore from a
> savepoint(flink run -d -s {savepoint's url}), the aggregated value is not
> 30(less than 30, depends on the time i cancel flink job and restore). but
> when the job run normally, it gets 30.
> i don't know why could some data seems to be lost?
> and a log shows "No restore state for FlinkKafkaConsumer"​
> ​
> ​
> ​
> 四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
> 11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd
> Street, GaoXin District, Chengdu, Sichuan Province
> Mobile +86 15817382279
> Email  wangchun...@navercorp.com
>
> NCloud
>


Question about restoring from a savepoint

2021-05-26 Post by 王春浩
hi community,

Version: flink 1.7

I am trying to restore a Flink job from a savepoint (or checkpoint). The job reads from Kafka -> runs a 30-minute window aggregation (just an AggregationFunction that acts like a counter) -> sinks to Kafka.

I use RocksDB and have checkpointing enabled.

Now I trigger a savepoint manually. The expected value of each aggregate is 30 (1 record per minute). However, when I restore from the savepoint (flink run -d -s {savepoint url}), the aggregated value is not 30 (less than 30, depending on when I cancel the Flink job and restore). When the job just keeps running normally, it does reach 30.

I don't understand why some data seems to be lost.

The log shows ``No restore state for FlinkKafkaConsumer''
​
​​
​
四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd 
Street, GaoXin District, Chengdu, Sichuan Province
Mobile +86 15817382279
Email  wangchun...@navercorp.com  

NCloud

-Original Message-
From: "王春浩"
To: ;
Cc:
Sent: 2021/5/26周三 17:03 (GMT+08:00)
Subject: inquire about restore from savepoint
 
Hi Community,
​
version flink 1.7
im trying to make a flink job restore from a savepoint(or checkpoint), what the 
job do is reading from kafka -> do a 30-minutes-window aggregation(just 
AggregationFunction, acts like a counter) -> sink to kafka.
i use rocksdb and enabled checkpoint.
now i try to trigger a savepoint manually. the expected value of each 
aggregated one is 30(1 data/per minute). but when i restore from a 
savepoint(flink run -d -s {savepoint's url}), the aggregated value is not 
30(less than 30, depends on the time i cancel flink job and restore). but when 
the job run normally, it gets 30.
i don't know why could some data seems to be lost?
and a log shows "No restore state for FlinkKafkaConsumer"​
​
​
​
四川省成都市高新区天府三街199号 太平洋保险金融大厦A区11楼
11th Floor,Tower A,Pacific insurance finance Building, No.199 TianFu 3rd 
Street, GaoXin District, Chengdu, Sichuan Province
Mobile +86 15817382279
Email  wangchun...@navercorp.com  

NCloud


Re:Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 Post by 王炳焱
Hi
  Thanks a lot for the reply. I have also tried the state-processor-api, but for a job 
graph generated from SQL there is no way to obtain the UID of each operator, so the state-processor-api cannot read the original state either, and I cannot manipulate the state. If there is a better solution, please reply to this mail again.


Thanks
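For reference, a minimal sketch of reading keyed state with the state-processor-api, assuming the DataSet-based API of Flink 1.12; the savepoint path, the operator uid and the state descriptor are placeholders, and the uid argument is exactly the difficulty mentioned above, because SQL-generated job graphs do not expose stable operator uids:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());

        // Reading keyed state requires the uid of the operator that owns it.
        DataSet<Long> counts =
                savepoint.readKeyedState("my-operator-uid", new CountReader());
        counts.print();
    }

    // Reads a per-key ValueState named "count" (hypothetical descriptor name and type).
    static class CountReader extends KeyedStateReaderFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(count.value());
        }
    }
}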



















在 2021-05-20 10:46:22,"Yun Tang"  写道:
>Hi
>
>BaseRowSerializer 已经在Flink-1.11 时候改名成 RowDataSerializer了,即使用 
>state-processor-API 也没办法处理当前不存在的类,可能有种比较复杂的办法是自己把 BaseRowSerializer 
>的类不改变package的情况下拷贝出来,然后用 state-processor-API 将相关类强制转换成 
>RowDataSerializer,不过考虑到你的job graph都是SQL生成的,state-processor-API面向地更多的是data 
>stream API,这一块估计还挺难弄的,确实没有想到特别好的办法。
>
>祝好
>唐云
>
>From: 王炳焱 <15307491...@163.com>
>Sent: Tuesday, May 18, 2021 20:02
>To: user-zh@flink.apache.org 
>Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
>an error
>
>我从flink1.10升级到flink1.12时,flink1.10的SQLapi的任务无法从savePoint中进行恢复,报错如下:
>
>
>2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup 
>[] - The operator name Calc(select=[((CAST((log_info 
>get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
>_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
>_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
>_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
>_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
>get_json_object2 _UTF-16LE'status') SEARCH 
>Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
>get_json_object2 _UTF-16LE'data.itemType') SEARCH 
>Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
>(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
>(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
>"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
>get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
>characters length limit and was truncated.
>2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup 
>[] - The operator name 
>SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
>fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
>truncated.
>2021-05-14 22:02:44,879 ERROR 
>org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
>Caught unexpected exception.
>java.io.IOException: Could not find class 
>'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
> in classpath.
>at 
>org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>at 
>org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
> ~

Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 Post by Yun Tang
Hi

BaseRowSerializer was already renamed to RowDataSerializer in Flink 1.11, so even the 
state-processor-API cannot handle a class that no longer exists. A rather involved workaround would be to copy the old BaseRowSerializer 
class back in without changing its package, and then use the state-processor-API to force the relevant state over to 
RowDataSerializer. However, given that your job graph is generated from SQL, and the state-processor-API is oriented more towards the data 
stream API, this is probably quite hard to do; I really cannot think of a particularly good solution.

Best regards,
唐云

From: 王炳焱 <15307491...@163.com>
Sent: Tuesday, May 18, 2021 20:02
To: user-zh@flink.apache.org 
Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
an error

When I upgrade from flink 1.10 to flink 1.12, the Flink 1.10 SQL-API jobs cannot be restored from a savepoint; the error is as follows:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at

Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-18, by 王炳焱
When I upgraded from Flink 1.10 to Flink 1.12, the Flink 1.10 SQL API job could not be restored from its savepoint. The error is as follows:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.str

Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-16, by 王炳焱
When I upgraded from Flink 1.10.0 to Flink 1.12.0, the job was unable to restore from the savepoint and reported the following error:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(Backe

Re: flink 1.12.1 savepoint restart issue

2021-04-28, by 田向阳
I ran into this as well and have no idea what causes it. It also only happens occasionally, which makes the problem really hard to pin down.


田向阳
Email: lucas_...@163.com

On 2021-04-28 at 17:00, chaos wrote:
Hi, because of a change in business logic I need to restart a Flink job that is running in production, using the savepoint approach.

The steps were as follows:
# look up the yarn application id
yarn application -list
# look up the flink job id
flink list -t yarn-per-job
-Dyarn.application.id=application_1617064018715_0097
# stop the job with a savepoint
flink stop -t yarn-per-job -p hdfs:///user/chaos/flink/0097_savepoint/
-Dyarn.application.id=application_1617064018715_0097
d7c1e0231f0e72cbd709baa9a0ba6415
Savepoint completed. Path:
hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
# restart the job from the savepoint
flink run -t yarn-per-job -s
hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
-d -c xxx xxx.jar

The problem I ran into:
Service temporarily unavailable due to an ongoing leader election. Please
refresh

Any pointers would be much appreciated. Thanks in advance!





flink 1.12.1 savepoint restart issue

2021-04-28, by chaos
Hi, because of a change in business logic I need to restart a Flink job that is running in production, using the savepoint approach.

The steps were as follows:
# look up the yarn application id
yarn application -list
# look up the flink job id
flink list -t yarn-per-job
-Dyarn.application.id=application_1617064018715_0097
# stop the job with a savepoint
flink stop -t yarn-per-job -p hdfs:///user/chaos/flink/0097_savepoint/
-Dyarn.application.id=application_1617064018715_0097
d7c1e0231f0e72cbd709baa9a0ba6415
Savepoint completed. Path:
hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
# restart the job from the savepoint
flink run -t yarn-per-job -s
hdfs://nameservice1/user/chaos/flink/0097_savepoint/savepoint-d7c1e0-96f5b07d18be
-d -c xxx xxx.jar

The problem I ran into:
Service temporarily unavailable due to an ongoing leader election. Please
refresh

Any pointers would be much appreciated. Thanks in advance!





recovery from savepoint appear java.lang.NullPointerException

2021-04-27, by 张美玲
2021-04-27 16:19:34
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for EvictingWindowOperator_cc588d0c8a499f01bdca964487231660_(1/1) 
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:357)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:104)
at 
org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 11 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:307)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
... 17 more

Re: Flink savepoint migration issue

2021-03-12, by 赵 建云
Confirmed: fields were added inside Pulsar's MessageId implementation class, which is why Flink fails to deserialize it. The corresponding issue: https://github.com/streamnative/pulsar-flink/issues/256.
I will upgrade the checkpoint handling of the Flink 1.9 Pulsar connector so that MessageId is serialized with a serializer based on `MessageId.toByteArray`.
Many thanks for your help!
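For illustration only, such a serializer could look roughly like the sketch below (a made-up
class name, assuming Flink's SimpleVersionedSerializer interface; this is not the actual
connector patch). Because MessageId.toByteArray()/fromByteArray() use Pulsar's own stable
binary encoding, the stored offsets no longer depend on the private fields of a particular
MessageId implementation class:

import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.pulsar.client.api.MessageId;

// Hypothetical example, not the real pulsar-flink code.
public class MessageIdBytesSerializer implements SimpleVersionedSerializer<MessageId> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(MessageId messageId) throws IOException {
        // Pulsar's own self-describing encoding of the message id.
        return messageId.toByteArray();
    }

    @Override
    public MessageId deserialize(int version, byte[] serialized) throws IOException {
        return MessageId.fromByteArray(serialized);
    }
}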

Jianyun8023
2021-03-12

On 2021-03-11 at 22:43, Kezhu Wang <kez...@gmail.com> wrote:

> Is it possible that Flink already does the deserialization while reading the savepoint file, and that this is where it fails?

That is indeed the case: the checkpoint snapshots the serializer as well.

Looking at the stack again, the error should happen while deserializing `MessageId`. Could you check whether the Pulsar version changed as well? If it did, did some `MessageId` implementation change its fields between the two versions? I think you should really be using `MessageId.toByteArray`.



On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi, following StatefulSinkWriterOperator I rewrote the migration logic and tested it, but I still hit the error above. The error seems to happen before the source's initializeState. Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step? Is there any way to help narrow this down?

Thanks!


On 2021-03-11 at 11:36, Kezhu Wang <kez...@gmail.com> wrote:

Is the new cluster running the updated pulsar connector? I looked at the pulsar-flink code, and this update is breaking for the state.

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

Possible ways out:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there are further incompatible changes after that one:

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but only write the new state.

Better wait for the streamnative folks to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi all, I ran into a problem where, after a Flink version upgrade, a savepoint is not restored correctly.

The version was upgraded from 1.9.3 to 1.11.0 or 1.11.3.
The job uses the Pulsar connector. The checkpoint data structures in the source are as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
Locally I saved the 1.9.3 state with the bin/flink savepoint command, then stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After the job starts it hits the following error:


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.

Re: Flink savepoint migration issue

2021-03-11, by allanqinjy
Jianyun,
I also ran into savepoint-based job starts failing earlier, after we upgraded the Pulsar client from 2.2 to 2.5.2 and I restarted the job with -s. Since the job was not very important and I had other things on my plate at the time, I never followed up on it. You might want to check what the pulsar source is doing there.

allanqinjy
allanqi...@163.com


在2021年03月11日 22:43,Kezhu Wang 写道:
有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。

确实是这样的,checkpoint 把 serializer 也 snapshot 了。

重新看了下 stack,应该是 deserialize `MessageId` 的时候出错的。你可以看下,pulsar
的版本是不是也有变动?有的话,这两个版本之间的某个 `MessageId` 实现是不是有字段变动?感觉你们应该用
`MessageId.toByteArray`。


On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

你好,我参考StatefulSinkWriterOperator重写了迁移方法,进行测试,还是遇到了上面的错误。 这个错误似乎发生在source的
initializeState之前。有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。请问有什么办法能帮忙定位吗?

感谢~


2021年3月11日 上午11:36,Kezhu Wang  写道:

新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
})));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator
的方法:同时读取新旧 state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.

Re: Flink savepoint migration issue

2021-03-11, by Kezhu Wang
> Is it possible that Flink already does the deserialization while reading the savepoint file, and that this is where it fails?

That is indeed the case: the checkpoint snapshots the serializer as well.

Looking at the stack again, the error should happen while deserializing `MessageId`. Could you check whether the Pulsar version changed as well? If it did, did some `MessageId` implementation change its fields between the two versions? I think you should really be using `MessageId.toByteArray`.


On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

你好,我参考StatefulSinkWriterOperator重写了迁移方法,进行测试,还是遇到了上面的错误。 这个错误似乎发生在source的
initializeState之前。有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。请问有什么办法能帮忙定位吗?

感谢~


2021年3月11日 上午11:36,Kezhu Wang  写道:

新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator
的方法:同时读取新旧 state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.

Re: Flink savepoint migration issue

2021-03-11, by 赵 建云
Hi, following StatefulSinkWriterOperator I rewrote the migration logic and tested it, but I still hit the error above. The error seems to happen before the source's initializeState. Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step? Is there any way to help narrow this down?

Thanks!


2021年3月11日 上午11:36,Kezhu Wang mailto:kez...@gmail.com>> 写道:

新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator 的方法:同时读取新旧 
state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 
(zhaojianyu...@outlook.com<mailto:zhaojianyu...@outlook.com>) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink 
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.

Re: Flink savepoint migration issue

2021-03-10, by 赵 建云
I am the one maintaining the pulsar-flink connector now, and yes, there is an incompatible upgrade; it is a really nasty change. I am currently trying the approach of migrating the old state onto the new fields, and it reports this error. I also modified the 1.11-compatible code to change the state data structure back to the old format, and I get the same error. I will look into how to use the StatefulSinkWriterOperator you mentioned.

On 2021-03-11 at 11:36, Kezhu Wang <kez...@gmail.com> wrote:

StatefulSinkWriterOperator



Re: Flink savepoint migration issue

2021-03-10, by Kezhu Wang
Is the new cluster running the updated pulsar connector? I looked at the pulsar-flink code, and this update is breaking for the state.

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

Possible ways out:
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there are further incompatible changes after that one:

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, but only write the new state; see the sketch below.

Better wait for the streamnative folks to confirm.
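To make that concrete, the source's initializeState could keep the old descriptor (original
name, original Tuple2 type) purely for reading a 1.9 savepoint, and register the new structure
under a new name that is the only one written from now on. The sketch below only illustrates
the pattern; the "_v2" name and the fields are made up, and it still assumes the old MessageId
values can be deserialized at all (which is exactly what breaks when the MessageId fields change):

// Sketch only (hypothetical names); assumes the usual imports: ListState,
// ListStateDescriptor, TypeHint/TypeInformation, Tuple2, Map/HashMap, Pulsar MessageId.
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore store = context.getOperatorStateStore();

    // New-format state under a NEW name: the only state that snapshotState() writes.
    newUnionOffsetStates = store.getUnionListState(
            new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME + "_v2",
                    TypeInformation.of(new TypeHint<Map<String, MessageId>>() {})));

    // Old-format state under the ORIGINAL name and type, kept only so that a
    // Flink 1.9 savepoint can still be read; it is never written again.
    ListState<Tuple2<String, MessageId>> oldUnionOffsetStates = store.getUnionListState(
            new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME,
                    TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));

    if (context.isRestored()) {
        Map<String, MessageId> restored = new HashMap<>();
        for (Map<String, MessageId> m : newUnionOffsetStates.get()) {
            restored.putAll(m);                      // prefer the new format
        }
        if (restored.isEmpty()) {
            for (Tuple2<String, MessageId> t : oldUnionOffsetStates.get()) {
                restored.put(t.f0, t.f1);            // fall back to the 1.9 format
            }
        }
        this.restoredOffsets = restored;             // hypothetical field
    }
}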

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)

at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)

at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)

at
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)

at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializ

Flink savepoint migration issue

2021-03-10, by 赵 建云
Hi all, I ran into a problem where, after a Flink version upgrade, a savepoint is not restored correctly.

The version was upgraded from 1.9.3 to 1.11.0 or 1.11.3.
The job uses the Pulsar connector. The checkpoint data structures in the source are as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
    })));
Locally I saved the 1.9.3 state with the bin/flink savepoint command, then stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After the job starts it hits the following error:


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

Could anyone suggest how to troubleshoot this, or a possible solution?


Jianyun8023
2021-3-11


Re: flink-savepoint issue

2021-03-03, by Congxian Qiu
For keyed state you must guarantee that the same key always lands in the same key group. If one key is a hotspot, you can add a map before the keyBy that appends a suffix to the key, then keyBy on the salted key, and finally aggregate the partial results again once processing is done; see the sketch below.
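As an illustration (not the original poster's job), the pattern looks roughly like this. The
point is that the randomness lives in a plain map before the keyBy, so the KeySelector itself
stays deterministic; putting a random value inside the KeySelector is what leads to the
"Key group ... is not in KeyGroupRange" failure during savepoints. Names such as saltBuckets
are made up:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// input: DataStream<Tuple2<String, Long>> of (key, count) records with a hot key.
int saltBuckets = 8;  // how far to spread the hot key

DataStream<Tuple2<String, Long>> perSaltedKey = input
        // salt the key in a map BEFORE keyBy; the key selector below stays deterministic
        .map(t -> Tuple2.of(t.f0 + "#" + ThreadLocalRandom.current().nextInt(saltBuckets), t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .sum(1);   // partial aggregate per salted key

DataStream<Tuple2<String, Long>> perOriginalKey = perSaltedKey
        // strip the salt and aggregate the partial results on the original key
        .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .sum(1);   // final aggregate per original key

In a real job the two aggregations would normally be windowed; the sketch only shows where the
randomness belongs.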
Best,
Congxian


On Thu, Mar 4, 2021 at 12:49, guomuhua <663021...@qq.com> wrote:

> I ran into a similar situation: to spread the data out I added a random number in the keyBy. What is the correct way to scatter the data?
> nobleyd wrote
> > 是不是使用了随机key。
>
> > guaishushu1103@
>
> >  <
>
> > guaishushu1103@
>
> > > 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> > KeyedProcess (21/48).> at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>
>
> > at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>
>
> > at>
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>
>
> > at java.lang.Thread.run(Thread.java:745)> Caused by:
> > java.util.concurrent.ExecutionException:>
> > java.lang.IllegalArgumentException: Key group 0 is not in>
> > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at
> > java.util.concurrent.FutureTask.report(FutureTask.java:122)> at
> > java.util.concurrent.FutureTask.get(FutureTask.java:192)> at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>
>
> > at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.
>
> > (OperatorSnapshotFinalizer.java:47)> at>
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>
>
> > ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> > not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>
>
> > at>
> >
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>
>
> > at>
> >
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>
>
> > at>
> >
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>
>
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)> at>
> >
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>
>
> > ... 5 more>>>
>
> > guaishushu1103@
>
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-savepoint issue

2021-03-03, by guomuhua
I ran into a similar situation: to spread the data out I added a random number in the keyBy. What is the correct way to scatter the data?
nobleyd wrote
> 是不是使用了随机key。

> guaishushu1103@

>  <

> guaishushu1103@

> > 于2021年3月3日周三 下午6:53写道:> checkpoint 可以成功保存,但是savepoint出现错误:>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator>
> KeyedProcess (21/48).> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)>
> 
> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)>
> 
> at>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)>
> 
> at>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)>
> 
> at java.lang.Thread.run(Thread.java:745)> Caused by:
> java.util.concurrent.ExecutionException:>
> java.lang.IllegalArgumentException: Key group 0 is not in>
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at
> java.util.concurrent.FutureTask.report(FutureTask.java:122)> at
> java.util.concurrent.FutureTask.get(FutureTask.java:192)> at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)>
> 
> at> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.

> (OperatorSnapshotFinalizer.java:47)> at>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)>
> 
> ... 3 more> Caused by: java.lang.IllegalArgumentException: Key group 0 is
> not in> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.> at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)>
> 
> at>
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)>
> 
> at>
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)>
> 
> at>
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)>
> 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)> at>
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)>
> 
> ... 5 more>>> 

> guaishushu1103@

>>






Re: flink-savepoint issue

2021-03-03, by guomuhua
I hit the same problem. To spread the data out I appended a random number to the key in the keyBy; without the random number the savepoint works fine, with it the savepoint fails. So if I genuinely need to scatter the data, how should I handle it?




Re: flink-savepoint issue

2021-03-03, by yidan zhao
Are you using a random key?

On Wed, Mar 3, 2021 at 18:53, guaishushu1...@163.com wrote:

> Checkpoints complete fine, but the savepoint fails with the following error:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> ... 5 more
>
>
> guaishushu1...@163.com
>


flink-savepoint issue

2021-03-03, by guaishushu1...@163.com
Checkpoints complete fine, but the savepoint fails with the following error:
java.lang.Exception: Could not materialize checkpoint 2404 for operator 
KeyedProcess (21/48).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more


guaishushu1...@163.com


TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api

2021-02-01 文章 ????????
When using the state processor api to create a new savepoint that contains the Kafka-related state in order to change the max parallelism, the savepoint is created successfully, but restarting the job from it throws the exception below:
 java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any 
of the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
    at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at 
org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at 
org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at 
org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more


My usage is as follows:
@Override
 public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend,
 ParameterTool config) {
    // load the savepoint whose max parallelism has not been changed yet
     String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);

TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api

2021-01-26 文章 郭斌
Hi all:
   When I use the state processor api to create a new savepoint that contains the Kafka-related state in order to change the max parallelism, the savepoint is created successfully, but restarting the job from it throws the following exception:
  {code}
  java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 
'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at 
org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at 
org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
at 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at 
org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
at 
org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
at 
org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more
  {code}
My usage is as follows:
@Override
 public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, 
StateBackend stateBackend,
 ParameterTool config) {
// load the savepoint whose max parallelism has not been changed yet
 String savepointOutputPath = 
config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
   // 新的max parallelis

Re: yarn-per-job 模式 savepoint执行保存点报错

2021-01-20 文章 zhisheng
Check whether the job is under backpressure. We have run into the same situation: when a job is backpressured, a savepoint is very hard to complete and frequently times out, and the current community version does not yet support setting a separate timeout just for savepoints.
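A minimal sketch of a stop-gap, assuming (as in the versions discussed here) that a triggered savepoint is bounded by the same checkpoint timeout because it goes through the same CheckpointCoordinator; the 10-minute value is only an example:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // raising the shared checkpoint timeout also gives savepoints more time to
    // complete when the job is backpressured
    env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);

This only buys time, though; the backpressure itself still needs to be addressed.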


刘海  于2021年1月21日周四 上午10:24写道:

> Hi
>  我目前在进行保存点相关的测试,目前执行命令报如下错误,从错误内容上看是超时,但是没有更多的信息了,有知道大致原因希望指点一下,拜谢
>
>
> flink1.12 yarn-per-job 模式
> jobID:fea3d87f138ef4c260ffe9324acc0e51
> yarnID : application_1610788069646_0021
> 执行的命令如下:
> ./bin/flink savepoint -t yarn-per-job -D 
> yarn.application.id=application_1610788069646_0021
> fea3d87f138ef4c260ffe9324acc0e51
>
>
> 报错如下:
>
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> fea3d87f138ef4c260ffe9324acc0e51 failed.
> at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
>     at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
> at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> Caused by: java.util.concurrent.TimeoutException
> at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
> at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> 祝好!
> | |
> 刘海
> |
> |
> liuha...@163.com
> |
> 签名由网易邮箱大师定制


Re:yarn-per-job 模式 savepoint执行保存点报错

2021-01-20 文章 guanyq
./bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 [:targetDirectory] -yid application_1610788069646_0021



[:targetDirectory] is the target directory for the savepoint, e.g. hdfs:///flink/savepoints











在 2021-01-21 10:24:31,"刘海"  写道:
>Hi
> 我目前在进行保存点相关的测试,目前执行命令报如下错误,从错误内容上看是超时,但是没有更多的信息了,有知道大致原因希望指点一下,拜谢
>
>
>flink1.12 yarn-per-job 模式
>jobID:fea3d87f138ef4c260ffe9324acc0e51  
>yarnID : application_1610788069646_0021 
>执行的命令如下:
>./bin/flink savepoint -t yarn-per-job -D 
>yarn.application.id=application_1610788069646_0021 
>fea3d87f138ef4c260ffe9324acc0e51
>
>
>报错如下:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
>fea3d87f138ef4c260ffe9324acc0e51 failed.
>at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
>at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
>at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
>at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
>Caused by: java.util.concurrent.TimeoutException
>at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
>at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
>
>
>祝好!
>| |
>刘海
>|
>|
>liuha...@163.com
>|
>签名由网易邮箱大师定制


yarn-per-job 模式 savepoint执行保存点报错

2021-01-20 文章 刘海
Hi
 I am currently testing savepoints. Running the command below reports the following error; from the error message it looks like a timeout, but there is no further information. If anyone knows the likely cause, please point me in the right direction. Many thanks.


flink1.12 yarn-per-job 模式
jobID:fea3d87f138ef4c260ffe9324acc0e51  
yarnID : application_1610788069646_0021 
执行的命令如下:
./bin/flink savepoint -t yarn-per-job -D 
yarn.application.id=application_1610788069646_0021 
fea3d87f138ef4c260ffe9324acc0e51


报错如下:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
fea3d87f138ef4c260ffe9324acc0e51 failed.
at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


祝好!
| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制

Re: 请教个Flink savepoint的问题

2021-01-11 文章 占英华
OK, thank you for your help!

> 在 2021年1月11日,20:23,Yun Tang  写道:
> 
> Hi,
> 
> 没有暴露现成的API,但是可以参照测试用例的写法,给jobGraph设置 savepointRestoreSettings [1]。
> 
> [1] 
> https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383
> 
> 祝好
> 唐云
> 
> From: yinghua...@163.com 
> Sent: Monday, January 11, 2021 19:07
> To: user-zh 
> Subject: 请教个Flink savepoint的问题
> 
> Flink APi 中我们在java代码中使用客户端调用类ClusterClient如下的方法停止了任务:
> CompletableFuture stopWithSavepoint(JobID var1, boolean var2, 
> @Nullable String 
> var3);然后我们保存了该任务的savepoint信息,那恢复时没看到对应的恢复方法,找了些资料都是通过命令来恢复,有没有直接调用Java API 
> 层次的方法来恢复任务?
> 
> 
> 
> yinghua...@163.com




Re: 请教个Flink savepoint的问题

2021-01-11 文章 Yun Tang
Hi,

There is no ready-made API exposed for this, but you can follow the approach used in the test case and set the savepointRestoreSettings on the jobGraph [1].

[1] 
https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383
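For reference, a minimal sketch of that approach (not the exact test-case code; the savepoint path and the clusterClient variable are placeholders):

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

    JobGraph jobGraph = ...; // the JobGraph you are about to submit through the ClusterClient
    // second argument = allowNonRestoredState
    jobGraph.setSavepointRestoreSettings(
            SavepointRestoreSettings.forPath("hdfs:///flink/savepoints/savepoint-xxxxxx", false));
    clusterClient.submitJob(jobGraph);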

祝好
唐云

From: yinghua...@163.com 
Sent: Monday, January 11, 2021 19:07
To: user-zh 
Subject: 请教个Flink savepoint的问题

Flink APi 中我们在java代码中使用客户端调用类ClusterClient如下的方法停止了任务:
CompletableFuture stopWithSavepoint(JobID var1, boolean var2, @Nullable 
String var3);然后我们保存了该任务的savepoint信息,那恢复时没看到对应的恢复方法,找了些资料都是通过命令来恢复,有没有直接调用Java 
API 层次的方法来恢复任务?



yinghua...@163.com


请教个Flink savepoint的问题

2021-01-11 文章 yinghua...@163.com
In the Flink API we stop a job from Java code through the following ClusterClient method:
CompletableFuture<String> stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3);
We keep the savepoint information it returns, but when resuming we could not find a corresponding restore method; all the material we found restores via the command line. Is there a Java-API-level way to resume the job directly?



yinghua...@163.com


回复: flink如何使用oss作为checkpoint/savepoint/statebackend?

2020-12-31 文章 changleying


发送自 Windows 10 版邮件应用

发件人: Yun Tang
发送时间: 2020年12月31日 10:00
收件人: user-zh@flink.apache.org
主题: Re: flink如何使用oss作为checkpoint/savepoint/statebackend?

Hi

其实社区文档 [1] 已经给了很详细的步骤:

  1.  将flink-oss-fs-hadoop jar包放在plugins目录下
  2.  配置oss的endpoint,id和secret
  3.  在需要使用oss的地方,声明oss:// 开头的schema,例如state backend创建的时候

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html

祝好
唐云

From: 陈帅 
Sent: Wednesday, December 30, 2020 20:53
To: user-zh@flink.apache.org 
Subject: flink如何使用oss作为checkpoint/savepoint/statebackend?

请问flink如何使用oss作为checkpoint/savepoint/statebackend? 需要依赖Hadoop并配置Hadoop on OSS吗?



Re: flink如何使用oss作为checkpoint/savepoint/statebackend?

2020-12-30 文章 Yun Tang
Hi

The community documentation [1] already gives very detailed steps:

  1.  Put the flink-oss-fs-hadoop jar under the plugins directory
  2.  Configure the OSS endpoint, access key id and secret
  3.  Declare a schema starting with oss:// wherever OSS should be used, e.g. when creating the state backend (see the sketch after the link below)

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html
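A minimal sketch of steps 2 and 3 (the endpoint, keys, bucket and path below are placeholders, not real values):

    # flink-conf.yaml (step 2)
    fs.oss.endpoint: oss-cn-hangzhou.aliyuncs.com
    fs.oss.accessKeyId: <your-access-key-id>
    fs.oss.accessKeySecret: <your-access-key-secret>

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // step 3: use an oss:// URI wherever a filesystem path is expected,
    // e.g. when creating the state backend
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("oss://<your-bucket>/flink/checkpoints"));

With the plugin jar in place, no separate Hadoop-on-OSS setup should be needed, as far as I understand.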

祝好
唐云

From: 陈帅 
Sent: Wednesday, December 30, 2020 20:53
To: user-zh@flink.apache.org 
Subject: flink如何使用oss作为checkpoint/savepoint/statebackend?

请问flink如何使用oss作为checkpoint/savepoint/statebackend? 需要依赖Hadoop并配置Hadoop on OSS吗?


flink如何使用oss作为checkpoint/savepoint/statebackend?

2020-12-30 文章 陈帅
How do I use OSS as the checkpoint/savepoint/state backend storage in Flink? Does it require depending on Hadoop and configuring Hadoop on OSS?


Re: Command: Flink savepoint -d reported an error。

2020-12-27 文章 赢峰


Has this been resolved? I am also getting: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>
| |
赢峰
|
|
si_ji_f...@163.com
|
签名由网易邮箱大师定制


On 09/30/2019 15:06,pengchengl...@163.com wrote:
你好,感谢回复,我的命令是参照官方网站,https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#trigger-a-savepoint
$ bin/flink savepoint -d :savepointPath
这个命令是删除savepoint,你说的是触发savepoint,是没有问题的。


pengchengl...@163.com

发件人: 狄玉坤
发送时间: 2019-09-30 14:59
收件人: user-zh@flink.apache.org
主题: Re: Re: Command: Flink savepoint -d reported an error。
缺少了jobId 参数;
./bin/flink savepoint <jobId> [savepointDirectory]
This will trigger a savepoint for the job with ID jobId, and returns the path 
of the created savepoint. You need this path to restore and dispose savepoints.
| |
diyukun2019
|
|
diyukun2...@163.com
|
签名由网易邮箱大师定制
On 9/30/2019 14:56,pengchengl...@163.com wrote:
谢谢你的回复,我试了下,还是说缺少必填参数。
如下:
$ bin/flink savepoint -d 
file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)




pengchengl...@163.com

发件人: 星沉
发送时间: 2019-09-30 14:47
收件人: user-zh
主题: Re: Command: Flink savepoint -d reported an error。
savepoint路径参数不对。file后面多了个一个/吧。


pengchengl...@163.com wrote:
$ bin/flink savepoint 
-dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>


Re: flink-1.11.2 rocksdb when trigger savepoint job fail and restart

2020-12-10 文章 Yun Tang
Hi

Is your TM single-slot, and how much managed memory does it have? When the RocksDB state backend takes a savepoint it uses an iterator to traverse the data, so there is extra memory overhead (and that part is not covered by the write buffer or block cache management). That said, the RocksDB iterator is a multi-level min-heap iterator, so in theory the temporary memory it occupies should not be large. Could you reduce the program to a reliably reproducible demo for us to debug?

As for how to work around the problem, you can consider increasing the JVM overhead [1] to enlarge that buffer space.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12//deployment/memory/mem_setup.html
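For example (illustrative values only, not a tuning recommendation), in flink-conf.yaml:

    taskmanager.memory.jvm-overhead.min: 512m
    taskmanager.memory.jvm-overhead.max: 2g
    taskmanager.memory.jvm-overhead.fraction: 0.2

The effective overhead is the configured fraction of total process memory clamped into [min, max], so the three options usually need to be raised together.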

祝好
唐云

From: smailxie 
Sent: Thursday, December 10, 2020 17:42
To: user-zh@flink.apache.org 
Subject: flink-1.11.2 rocksdb when trigger savepoint job fail and restart







我有一个sql job,跑的任务是双流jion,状态保留12 �C 
24小时,checkpoint是正常的,状态大小在300M到4G之间,当手动触发savepoint时,容器会被杀死,原因是超出内存限制(申请的内存是单slot 
5G)。

我想问的是,rocksdb,在savepiont时,是把所有的磁盘状态读入内存,然后再全量快照?

如果是这样,后续版本有没有优化?不然每次磁盘状态超过托管内存,一手动savepoint,job就会被杀死。

下面是报错信息。



2020-12-10 09:18:50

java.lang.Exception: Container 
[pid=33290,containerID=container_e47_1594105654926_6890682_01_02] is 
running beyond physical memory limits. Current usage: 5.1 GB of 5 GB physical 
memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.

Dump of the process-tree for container_e47_1594105654926_6890682_01_02 :

  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

  |- 4 33290 33290 33290 (java) 787940 76501 7842340864 1337121 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=shd177.yonghui.cn -Dpipeline.classpaths= -Dweb.port=0 
-Dexecution.target=embedded 
-Dweb.tmpdir=/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479 
-Djobmanager.rpc.port=44058 
-Dpipeline.jars=file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar
 -Drest.address=shd177.yonghui.cn 
-Dsecurity.kerberos.login.keytab=/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab

  |- 33290 33288 33290 33290 (bash) 0 0 108679168 318 /bin/bash -c 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='shd177.yonghui.cn' -Dpipeline.classpaths='' 
-Dweb.port='0' -Dexecution.target='embedded' 
-Dweb.tmpdir='/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479' 
-Djobmanager.rpc.port='44058' 
-Dpipeline.jars='file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar'
 -Drest.address='shd177.yonghui.cn' 
-Dsecurity.kerberos.login.keytab='/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab'
 1> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.out
 2> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.err



Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143



发送自 Windows 10 版邮件应用









--

Name:谢波
Mobile:13764228893


flink-1.11.2 rocksdb when trigger savepoint job fail and restart

2020-12-10 文章 smailxie






I have a SQL job running a dual-stream join, with state retained for 12 – 24 hours. Checkpoints are fine and the state size is between 300M and 4G, but when I trigger a savepoint manually the container is killed for exceeding its memory limit (the requested memory is 5G for a single slot).

What I want to ask is: during a savepoint, does RocksDB read all of the on-disk state into memory and then take a full snapshot?

If so, will later versions optimize this? Otherwise, whenever the on-disk state exceeds the managed memory, a manual savepoint kills the job.

The error message is below.

 

2020-12-10 09:18:50

java.lang.Exception: Container 
[pid=33290,containerID=container_e47_1594105654926_6890682_01_02] is 
running beyond physical memory limits. Current usage: 5.1 GB of 5 GB physical 
memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.

Dump of the process-tree for container_e47_1594105654926_6890682_01_02 :

  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

  |- 4 33290 33290 33290 (java) 787940 76501 7842340864 1337121 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=shd177.yonghui.cn -Dpipeline.classpaths= -Dweb.port=0 
-Dexecution.target=embedded 
-Dweb.tmpdir=/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479 
-Djobmanager.rpc.port=44058 
-Dpipeline.jars=file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar
 -Drest.address=shd177.yonghui.cn 
-Dsecurity.kerberos.login.keytab=/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab

  |- 33290 33288 33290 33290 (bash) 0 0 108679168 318 /bin/bash -c 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='shd177.yonghui.cn' -Dpipeline.classpaths='' 
-Dweb.port='0' -Dexecution.target='embedded' 
-Dweb.tmpdir='/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479' 
-Djobmanager.rpc.port='44058' 
-Dpipeline.jars='file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar'
 -Drest.address='shd177.yonghui.cn' 
-Dsecurity.kerberos.login.keytab='/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab'
 1> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.out
 2> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.err

 

Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143

 

发送自 Windows 10 版邮件应用

 







--

Name:谢波
Mobile:13764228893


Re: 修改topic名称后从Savepoint重启会怎么消费Kafka

2020-12-02 文章 zhisheng
This is the correct explanation; see an issue filed earlier: https://issues.apache.org/jira/browse/FLINK-16865

Best
zhisheng

Shuai Xia  于2020年12月2日周三 下午2:03写道:

>
> hi,实时上并不是你说的这样,从sp重启时因为存在RestoreState,而且Topic名称被修改,会导致restoredState内找不到新的KafkaTopicPartition
> 新的消费位置会置为EARLIEST_OFFSET
>
>
> if (restoredState != null) {
>for (KafkaTopicPartition partition : allPartitions) {
>   if (!restoredState.containsKey(partition)) {
>  restoredState.put(partition,
> KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
>   }
>}
>
>
>
>
> --
> 发件人:熊云昆 
> 发送时间:2020年12月1日(星期二) 22:57
> 收件人:user-zh ; Shuai Xia 
> 主 题:Re:修改topic名称后从Savepoint重启会怎么消费Kafka
>
>
> 可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来
>
>
>
>
>
> 在 2020-12-01 20:59:48,"Shuai Xia"  写道:
> >
> >Hi,大佬们
> >突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
> >会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
> >可以手动控制么?
>
>
>


回复:修改topic名称后从Savepoint重启会怎么消费Kafka

2020-12-01 文章 Shuai Xia
Hi, actually it does not work the way you described. When restarting from the savepoint there is restored state, and because the topic name was changed, the new KafkaTopicPartition cannot be found in restoredState, so the consumption position for the new partitions is set to EARLIEST_OFFSET:


if (restoredState != null) {
   for (KafkaTopicPartition partition : allPartitions) {
      if (!restoredState.containsKey(partition)) {
         restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
      }
   }
}
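For completeness, a minimal sketch (not from this thread; the topic name and kafkaProps are placeholders) of the startup-mode methods on FlinkKafkaConsumer. They only take effect when the job starts without restored state; on a restore, offsets come from the savepoint and partitions not found in it fall back to EARLIEST as shown above:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("new_topic", new SimpleStringSchema(), kafkaProps);
    consumer.setStartFromGroupOffsets(); // the default
    // consumer.setStartFromEarliest();  // or: always start from the earliest offsets
    // consumer.setStartFromLatest();    // or: always start from the latest offsets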




--
发件人:熊云昆 
发送时间:2020年12月1日(星期二) 22:57
收件人:user-zh ; Shuai Xia 
主 题:Re:修改topic名称后从Savepoint重启会怎么消费Kafka


可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来





在 2020-12-01 20:59:48,"Shuai Xia"  写道:
>
>Hi,大佬们
>突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
>会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
>可以手动控制么?




Re:修改topic名称后从Savepoint重启会怎么消费Kafka

2020-12-01 文章 熊云昆



It can be controlled manually. If nothing is set, by default it starts consuming from the latest position; otherwise it follows your startup-mode.














在 2020-12-01 20:59:48,"Shuai Xia"  写道:
>
>Hi,大佬们
>突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。
>会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢?
>可以手动控制么?


修改topic名称后从Savepoint重启会怎么消费Kafka

2020-12-01 文章 Shuai Xia

Hi all,
A question suddenly came to mind: a Flink job consuming Kafka is stopped after taking a savepoint, the topic name in the code is changed, and the job is restarted from that savepoint.
Where will it start consuming the new topic from? Does it follow the startup-mode, or start from the latest offsets?
Can this be controlled manually?

从 savepoint 中恢复应用时发生 KafkaException: Unexpected error in InitProducerIdResponse

2020-11-16 文章 chang chan
 This program is used to test Flink Kafka exactly-once. A normal submission runs fine, but restoring from a savepoint reports the error below.
The Kafka server side is configured with transaction.max.timeout.ms = 360, and the client producer side is configured with transaction.timeout.ms = 90.

Reference code:

https://gist.github.com/giraffe-tree/15c5f707d9dfe3221959ae37b4e9d786
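(Not the gist above, only a minimal sketch of the producer-side settings involved; the broker address and timeout value are placeholders:)

    import java.util.Properties;

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "broker:9092");
    // must not exceed the broker's transaction.max.timeout.ms, otherwise the
    // transactional producer cannot be initialized for EXACTLY_ONCE
    producerProps.setProperty("transaction.timeout.ms", "900000");
    // producerProps is then passed to the FlinkKafkaProducer constructor together
    // with FlinkKafkaProducer.Semantic.EXACTLY_ONCE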
2020-11-17 15:24:51
org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse; Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at org.apache.kafka.clients.producer.internals.
TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
.java:1370)
at org.apache.kafka.clients.producer.internals.
TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278
)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(
NetworkClient.java:566)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
at org.apache.kafka.clients.producer.internals.Sender
.maybeSendAndPollTransactionalRequest(Sender.java:415)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
.java:313)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
240)
at java.lang.Thread.run(Thread.java:748)

I have searched the relevant material on Google but still cannot solve it. Has anyone run into a similar problem, or can anyone suggest how to troubleshoot it?


Re: flink savepoint 异常

2020-11-09 文章 Congxian Qiu
Hi
 The exception message contains “Failed to trigger savepoint. Failure reason: An Exception
occurred while triggering the checkpoint.”  Perhaps you can check the JM logs to see whether there is a more detailed log.
Best,
Congxian


张锴  于2020年11月7日周六 下午4:14写道:

> 本人用flink 1.10.1版本进行savepoint时遇到下列错误,暂时不清楚错误的原因,特来寻求帮助,麻烦大佬们看看
>
> 已经排除反压和重启的原因,checkpoint超时设置了十分钟,conf配置增加客户端连接master的时间,但还是出现异常。
>
> 命令
>
> flink savepoint -yid application_1604456903594_2381
> fb8131bcb78cbdf2bb9a705d8a4ceebc
> hdfs:///hadoopnamenodeHA/flink/flink-savepoints
>
> 异常
>
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
>  at
>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>  at
>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
>  at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
>  at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
>  at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
>  at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>  at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
> savepoint. Failure reason: An Exception occurred while triggering the
> checkpoint.
>  at
>
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
>  at
>
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at
>
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at
>
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>  at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>  at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>  at akka.japi.pf.UnitCaseState
>


Re: flink savepoint

2020-11-08 文章 张锴
Found it. The JM logs showed a missing write permission; after fixing that it works fine.

Congxian Qiu  于2020年11月6日周五 下午1:31写道:

> Hi
>  从 client 端日志,或者 JM 日志还能看到其他的异常么?
> Best,
> Congxian
>
>
> 张锴  于2020年11月6日周五 上午11:42写道:
>
> > 重启和反压都正常
> > 另外增加了从客户端到master的时间,还是有这个问题
> >
> > hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
> >
> > > Hi,
> > >
> > >
> > > 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> > > 具体的原因需要看下 Jobmaster 的日志。
> > > PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> > >
> > >
> > > Best,
> > > Hailong Wang
> > >
> > >
> > >
> > >
> > > 在 2020-11-06 09:33:48,"张锴"  写道:
> > > >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> > > >
> > > >flink 版本1.10.1
> > > >
> > > >
> > > >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > > >
> > > >
> > > >出现错误信息
> > > >
> > > >
> > > >org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
> > > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > > >
> > > > at
> > > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > > >
> > > > at java.security.AccessController.doPrivileged(Native Method)
> > > >
> > > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > >
> > > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > > >
> > > >Caused by: java.util.concurrent.TimeoutException
> > > >
> > > > at
> > >
> > >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > > >
> > > > at
> > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > > >
> > > > at
> > >
> > >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> > >
> >
>


Re: flink savepoint

2020-11-08 文章 张锴
It is already specified.

admin <17626017...@163.com> 于2020年11月6日周五 下午3:17写道:

> Hi,
> 你的任务时跑在yarn上的吗?如果是 需要指定 -yid
>
> > 2020年11月6日 下午1:31,Congxian Qiu  写道:
> >
> > Hi
> > 从 client 端日志,或者 JM 日志还能看到其他的异常么?
> > Best,
> > Congxian
> >
> >
> > 张锴  于2020年11月6日周五 上午11:42写道:
> >
> >> 重启和反压都正常
> >> 另外增加了从客户端到master的时间,还是有这个问题
> >>
> >> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
> >>
> >>> Hi,
> >>>
> >>>
> >>> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> >>> 具体的原因需要看下 Jobmaster 的日志。
> >>> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> >>>
> >>>
> >>> Best,
> >>> Hailong Wang
> >>>
> >>>
> >>>
> >>>
> >>> 在 2020-11-06 09:33:48,"张锴"  写道:
> >>>> 本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> >>>>
> >>>> flink 版本1.10.1
> >>>>
> >>>>
> >>>> 执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >>>> hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >>>>
> >>>>
> >>>> 出现错误信息
> >>>>
> >>>>
> >>>> org.apache.flink.util.FlinkException: Triggering a savepoint for the
> job
> >>>> a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >>>>
> >>>> at
> >>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >>>>
> >>>> at java.security.AccessController.doPrivileged(Native Method)
> >>>>
> >>>> at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >>>>
> >>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >>>>
> >>>> Caused by: java.util.concurrent.TimeoutException
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >>>>
> >>>> at
> >>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >>>>
> >>>> at
> >>>
> >>>
> >>>
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >>>
> >>
>
>


flink savepoint 异常

2020-11-07 文章 张锴
I hit the following error when taking a savepoint with flink 1.10.1. I am not yet sure what causes it and would appreciate some help.

Backpressure and restarts have been ruled out, the checkpoint timeout is set to ten minutes, and the client-to-master connection timeout was increased in the conf, but the exception still occurs.

命令

flink savepoint -yid application_1604456903594_2381
fb8131bcb78cbdf2bb9a705d8a4ceebc
hdfs:///hadoopnamenodeHA/flink/flink-savepoints

异常

The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
savepoint. Failure reason: An Exception occurred while triggering the
checkpoint.
 at
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
 at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
 at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseState


Re: flink savepoint

2020-11-05 文章 admin
Hi,
Is your job running on YARN? If so, you need to specify -yid.

> 2020年11月6日 下午1:31,Congxian Qiu  写道:
> 
> Hi
> 从 client 端日志,或者 JM 日志还能看到其他的异常么?
> Best,
> Congxian
> 
> 
> 张锴  于2020年11月6日周五 上午11:42写道:
> 
>> 重启和反压都正常
>> 另外增加了从客户端到master的时间,还是有这个问题
>> 
>> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>> 
>>> Hi,
>>> 
>>> 
>>> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
>>> 具体的原因需要看下 Jobmaster 的日志。
>>> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>>> 
>>> 
>>> Best,
>>> Hailong Wang
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-11-06 09:33:48,"张锴"  写道:
>>>> 本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
>>>> 
>>>> flink 版本1.10.1
>>>> 
>>>> 
>>>> 执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
>>>> hdfs://hadoopnamenodeHA/flink/flink-savepoints
>>>> 
>>>> 
>>>> 出现错误信息
>>>> 
>>>> 
>>>> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>>>> a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
>>>> 
>>>> at
>>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>>>> 
>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>> 
>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>> 
>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>>>> 
>>>> Caused by: java.util.concurrent.TimeoutException
>>>> 
>>>> at
>>> 
>>> 
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>> 
>>>> at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>> 
>>>> at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>>> 
>> 



Re: flink savepoint

2020-11-05 文章 Congxian Qiu
Hi
 Can you see any other exceptions in the client logs or the JM logs?
Best,
Congxian


张锴  于2020年11月6日周五 上午11:42写道:

> 重启和反压都正常
> 另外增加了从客户端到master的时间,还是有这个问题
>
> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>
> > Hi,
> >
> >
> > 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> > 具体的原因需要看下 Jobmaster 的日志。
> > PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
> >
> >
> > Best,
> > Hailong Wang
> >
> >
> >
> >
> > 在 2020-11-06 09:33:48,"张锴"  写道:
> > >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> > >
> > >flink 版本1.10.1
> > >
> > >
> > >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > >
> > >
> > >出现错误信息
> > >
> > >
> > >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > >
> > > at
> > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > >
> > > at java.security.AccessController.doPrivileged(Native Method)
> > >
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > > at
> >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > >
> > > at
> >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > >
> > >Caused by: java.util.concurrent.TimeoutException
> > >
> > > at
> >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > >
> > > at
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >
>


Re: flink savepoint

2020-11-05 文章 张锴
Restarts and backpressure are both normal.
I also increased the client-to-master timeout, but the problem is still there.

hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:

> Hi,
>
>
> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
> 具体的原因需要看下 Jobmaster 的日志。
> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>
>
> Best,
> Hailong Wang
>
>
>
>
> 在 2020-11-06 09:33:48,"张锴"  写道:
> >本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
> >
> >flink 版本1.10.1
> >
> >
> >执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >
> >
> >出现错误信息
> >
> >
> >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >
> > at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
>
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> > at
>
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >
> >Caused by: java.util.concurrent.TimeoutException
> >
> > at
>
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>


Re:flink savepoint

2020-11-05 文章 hailongwang
Hi,


This error only means the savepoint did not complete within the allotted time, so the client connection to the Master timed out.
For the actual cause you need to look at the Jobmaster logs.
PS: when a job keeps restarting or is under backpressure, savepoints generally fail.
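If the savepoint itself does eventually succeed and it is only the client that gives up waiting, one knob to look at in this version line is the client timeout in flink-conf.yaml (assuming the akka.client.timeout option applies here; the value is just an example):

    akka.client.timeout: 600 s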


Best,
Hailong Wang




在 2020-11-06 09:33:48,"张锴"  写道:
>本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
>
>flink 版本1.10.1
>
>
>执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
>hdfs://hadoopnamenodeHA/flink/flink-savepoints
>
>
>出现错误信息
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job
>a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
>
> at
>org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
>
> at
>org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
>
> at
>org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
>
> at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
>
> at
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
>
> at
>org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
> at
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>
>Caused by: java.util.concurrent.TimeoutException
>
> at
>java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>
> at
>org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


flink savepoint

2020-11-05 文章 张锴
I ran into an error while taking a snapshot with flink savepoint. I do not yet know what causes the problem; could anyone passing by take a look?

flink version 1.10.1


执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
hdfs://hadoopnamenodeHA/flink/flink-savepoints


出现错误信息


org.apache.flink.util.FlinkException: Triggering a savepoint for the job
a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)

 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)

 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)

 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)

 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

 at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)

 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


Re:几十G状态数据时,savepoint失败

2020-10-21 文章 hailongwang



Hi,
  可以观察下 savepoint 失败之前最近的 checkpoint 是否也是失败的,如果也是失败的,应该就是任务本身目前处于亚健康情况。


Best,
Hailong Wang
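
补充一个仅供参考的调整示意(针对下面引用原文中 container 超出物理内存被 kill 的情况;数值为假设值,需按实际作业调优):Flink 1.10 里 RocksDB 默认使用 managed memory,适当调大 TaskManager 进程总内存或 managed 内存比例,有助于把 RocksDB 的内存约束在 container 限制之内。

# flink-conf.yaml 片段(示例值)
taskmanager.memory.process.size: 6g          # 进程总内存,比 container 限制留出余量
taskmanager.memory.managed.fraction: 0.4     # managed 内存比例,RocksDB 主要使用这部分
state.backend.rocksdb.memory.managed: true   # 1.10 默认即为 true,确保 RocksDB 受 managed 内存约束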




At 2020-10-21 13:09:45, "廖辉轩" <726830...@qq.com> wrote:
>Hi,all
>
>当数据到达50G左右时,savepoint总是失败。初步分析是执行savepoint时,内存溢出导致task挂掉,然后task重启。
>
>
>background:
>flink version:1.10.0
>flink on yarn:Total Task Slots8
>Task Managers8,5G memory
>
>
>
>
>backend:rocksdb,Increment
>checkpoint:3 min
>
>
>Container Error:
>container_1596184446716_365651_01_09 because: Container 
>[pid=104949,containerID=container_1596184446716_365651_01_09] is running 
>beyond physical memory limits. Current usage: 5.3 GB of 5 GB physical memory 
>used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
>Dump of the process-tree for container_1596184446716_365651_01_09 :
>
>
>
>===
>Command:
>
>
>flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid 
>application_1596184446716_365651
>
>
>Error:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
>389923a0198dd74f5e1bc9e968258dba failed.
>        at 
>org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>        at 
>org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>        at 
>org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>        at 
>org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>        at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>        at 
>org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>        at 
>java.security.AccessController.doPrivileged(Native Method)
>        at 
>javax.security.auth.Subject.doAs(Subject.java:422)
>        at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>        at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>        at 
>org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>Caused by: java.util.concurrent.CompletionException: 
>java.util.concurrent.CompletionException: 
>org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
>Coordinator is suspending.
>        at 
>org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>        at 
>java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>        at 
>java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>        at 
>java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>        at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>        at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>        at 
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>        at 
>org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>        at 
>akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>        at 
>akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>        at 
>scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>        at 
>akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>        at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>        at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>        at 
>scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>        at 
>akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>        at 
>akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>        at 
>akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>        at 
>akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>        at 
>akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>        at 
>akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>        at 
>akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>        at 
>akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>Caused by: java.util.concurrent.CompletionException: 
>org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint 
>Coordinator is suspending.
>        at 
>java.util.concurrent.CompletableFuture.encodeThrowable(Complet

Re: 几十G状态数据时,savepoint失败

2020-10-20 文章 Lee Sysuke
Hi Taylor,
几十G的状态应该不算大,可以看一下触发savepoint的时候,job本身是否已经进入了反压等不健康状态。
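
补充:除了在 Web UI 的 BackPressure 页签观察,也可以用 REST API 触发一次反压采样(以下主机名、job id、vertex id 均为占位符,仅作示意):

curl http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
# 第一次请求可能返回 status 为 deprecated(采样尚未完成),稍后重试即可拿到 backpressure-level(ok / low / high)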

廖辉轩 <726830...@qq.com> 于2020年10月21日周三 下午2:10写道:

> Hi,all
>
> 当数据到达50G左右时,savepoint总是失败。初步分析是执行savepoint时,内存溢出导致task挂掉,然后task重启。
>
>
> background:
> flink version:1.10.0
> flink on yarn:Total Task Slots8
> Task Managers8,5G memory
>
>
>
>
> backend:rocksdb,Increment
> checkpoint:3 min
>
>
> Container Error:
> container_1596184446716_365651_01_09 because: Container
> [pid=104949,containerID=container_1596184446716_365651_01_09] is
> running beyond physical memory limits. Current usage: 5.3 GB of 5 GB
> physical memory used; 7.4 GB of 10.5 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_1596184446716_365651_01_09 :
>
>
>
> ===
> Command:
>
>
> flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid
> application_1596184446716_365651
>
>
> Error:
>
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 389923a0198dd74f5e1bc9e968258dba failed.
>         at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
>         at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>         at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
>         at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>         at
> java.security.AccessController.doPrivileged(Native Method)
>         at
> javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>         at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> Coordinator is suspending.
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
>         at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>         at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>         at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>         at akka.japi.pf
> .UnitCaseStatement.apply(CaseStatements.scala:26)
>         at akka.japi.pf
> .UnitCaseStatement.apply(CaseStatements.scala:21)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>         at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>         at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>         at
> akka.actor.ActorCell.invoke(ActorCell.scala:561)
>         at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>         at
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> Coordinator is suspending.
>         at
> java.util.con

几十G状态数据时,savepoint失败

2020-10-20 文章 廖辉轩
Hi,all

当数据到达50G左右时,savepoint总是失败。初步分析是执行savepoint时,内存溢出导致task挂掉,然后task重启。


background:
flink version:1.10.0
flink on yarn:Total Task Slots8
Task Managers8,5G memory




backend:rocksdb,Increment
checkpoint:3 min


Container Error:
container_1596184446716_365651_01_09 because: Container 
[pid=104949,containerID=container_1596184446716_365651_01_09] is running 
beyond physical memory limits. Current usage: 5.3 GB of 5 GB physical memory 
used; 7.4 GB of 10.5 GB virtual memory used. Killing container.
Dump of the process-tree for container_1596184446716_365651_01_09 :



===
Command:


flink savepoint 389923a0198dd74f5e1bc9e968258dba -yid 
application_1596184446716_365651


Error:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
389923a0198dd74f5e1bc9e968258dba failed.
        at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:633)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:611)
        at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
        at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:608)
        at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:910)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at 
java.security.AccessController.doPrivileged(Native Method)
        at 
javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:744)
        at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
        at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
        at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator 
is suspending.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$1(CheckpointCoordinator.java:457

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,zilong
的确是这个问题,感谢帮助。
Best,
Robin


zilong xiao wrote
> Hi Robin Zhang
> 你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626
> ,可以看下这个issue描述,祝好~
> 
> Robin Zhang <

> vincent2015qdlg@

> > 于2020年10月19日周一 下午3:42写道:
> 
>> 普通的source -> map -> filter-> sink 测试应用。
>>
>> 触发savepoint的脚本 :
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> 具体报错信息:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
>> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,Congxian
感谢提供思路,看了一下,JM端没有暴露日志,只能查看到ck正常的日志

Best,
Robin



Congxian Qiu wrote
> Hi
> 你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task
> 完成的慢,(savepoint 可能比 checkpoint 要慢)
> Best,
> Congxian
> 
> 
> Robin Zhang <

> vincent2015qdlg@

> > 于2020年10月19日周一 下午3:42写道:
> 
>> 普通的source -> map -> filter-> sink 测试应用。
>>
>> 触发savepoint的脚本 :
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> 具体报错信息:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
>> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 zilong xiao
Hi Robin Zhang
你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626
,可以看下这个issue描述,祝好~
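
补充两种写法的对比示意(savepoint 路径与 jobId 为占位符,具体以 bin/flink stop -h 的输出为准):

# 推荐:stop,带 savepoint 优雅停止(-p 即 --savepointPath)
bin/flink stop -p hdfs:///flink/savepoints <jobId>

# 旧写法,已标记为 deprecated:cancel -s
bin/flink cancel -s hdfs:///flink/savepoints <jobId>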

Robin Zhang  于2020年10月19日周一 下午3:42写道:

> 普通的source -> map -> filter-> sink 测试应用。
>
> 触发savepoint的脚本 :
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> 具体报错信息:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Congxian Qiu
Hi
你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task
完成的慢,(savepoint 可能比 checkpoint 要慢)
Best,
Congxian
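
补充一个仅供参考的小片段(时间取值为假设值):如果确认是 savepoint 在 JM 侧超时,可以适当调大 checkpoint 超时时间,按我的理解 savepoint 也受同一个超时约束。

// import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3 * 60 * 1000);                          // 每 3 分钟做一次 checkpoint(示例值)
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);  // 超时调大到 10 分钟(示例值)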


Robin Zhang  于2020年10月19日周一 下午3:42写道:

> 普通的source -> map -> filter-> sink 测试应用。
>
> 触发savepoint的脚本 :
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> 具体报错信息:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
普通的source -> map -> filter-> sink 测试应用。

触发savepoint的脚本 :
${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
具体报错信息:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"81990282a4686ebda3d04041e3620776".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
... 9 more


查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
应用遇到权限问题,但是不知道怎么解决,目前卡在这里。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:回复: Re: flink savepoint和checkpoint相关事项

2020-10-09 文章 izual






我理解是一样的,关于两者的不同点在这里[1]有介绍。
恢复方法是启动任务时 -s 指定从哪个路径恢复,例如 -s
file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
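
补充一个恢复命令的示意(入口类、jar 名为假设值,-s 后的路径即 checkpoint/savepoint 所在目录):

# 从 checkpoint 恢复(路径沿用上面的例子)
bin/flink run -s file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61 -c com.example.MyJob my-job.jar

# 如果修改过作业、存在对不上的算子状态,可以再加 -n(--allowNonRestoredState)跳过
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxxxx -n -c com.example.MyJob my-job.jar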

在 2020-10-10 08:43:38,"zjfpla...@hotmail.com"  写道:
>请问savepoint也是一样吗?
>
>
>
>zjfpla...@hotmail.com
> 
>发件人: Yun Tang
>发送时间: 2020-10-10 01:35
>收件人: user-zh
>主题: Re: flink savepoint和checkpoint相关事项
>Hi
> 
>在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
>state [2] 恢复
> 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
> 
>祝好
>唐云
>
>From: zjfpla...@hotmail.com 
>Sent: Friday, October 9, 2020 8:59
>To: user-zh 
>Subject: flink savepoint和checkpoint相关事项
> 
>Hi,
>flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了
> 
> 
> 
>zjfpla...@hotmail.com


回复: Re: flink savepoint和checkpoint相关事项

2020-10-09 文章 zjfpla...@hotmail.com
请问savepoint也是一样吗?



zjfpla...@hotmail.com
 
发件人: Yun Tang
发送时间: 2020-10-10 01:35
收件人: user-zh
主题: Re: flink savepoint和checkpoint相关事项
Hi
 
在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
state [2] 恢复
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
 
祝好
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint和checkpoint相关事项
 
Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了
 
 
 
zjfpla...@hotmail.com


Re: flink savepoint和checkpoint相关事项

2020-10-09 文章 Yun Tang
Hi

在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
state [2] 恢复

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云
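
针对上面第 [1] 点,补充一个设置 uid 的最小示意(算子逻辑、uid 名称均为假设,仅说明写法;恢复时配合 -s 指定 savepoint 路径,必要时加 --allowNonRestoredState):

// 假设的 Kafka source + 有状态算子,重点是每个有状态算子都显式指定 uid
env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), kafkaProps))
   .uid("kafka-source")                 // source 的状态(offset 等)按这个 uid 匹配恢复
   .keyBy(record -> record)
   .process(new MyStatefulFunction())   // 假设的 KeyedProcessFunction 实现
   .uid("my-stateful-process");         // keyed state 按这个 uid 匹配恢复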

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint和checkpoint相关事项

Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了



zjfpla...@hotmail.com


flink savepoint和checkpoint相关事项

2020-10-08 文章 zjfpla...@hotmail.com
Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了



zjfpla...@hotmail.com


Re: Flink 1.5.0 savepoint 失败

2020-09-13 文章 Congxian Qiu
Hi
   从错误栈看是  Wrong FS:
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
expected: hdfs://flink-hdfs 这个导致的,你能把 savepoint 写到 hdfs://flink-hdfs 这个集群吗?
Best,
Congxian
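
补充一个把 savepoint 写到 hdfs://flink-hdfs 的示意(job id 沿用原帖中的触发命令,目录为假设值):

bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 hdfs://flink-hdfs/flink/savepoints

# 也可以在 flink-conf.yaml 里配置默认 savepoint 目录,之后触发时不带目录参数即可
state.savepoints.dir: hdfs://flink-hdfs/flink/savepoints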


hk__lrzy  于2020年9月11日周五 下午2:46写道:

> 代码是不是主动设置过state backend的地址呢?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 1.5.0 savepoint 失败

2020-09-10 文章 hk__lrzy
代码是不是主动设置过state backend的地址呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-10 文章 shizk233
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。

按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。
至于不加上transient是否可能产生其他影响,就不太清楚了。
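
补充一个最小写法示意(类名、状态名为假设):状态字段用 transient 修饰,统一在 open() 里通过 descriptor 拿到句柄;从 checkpoint/savepoint 恢复时,getListState 返回的就是已经注入了恢复数据的状态。

// 假设已 import org.apache.flink.api.common.state.*、org.apache.flink.api.common.typeinfo.TypeInformation 等
public class UnmatchedProbeFunction extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private transient ListState<ObjectNode> unmatchedProbesState;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<ObjectNode> descriptor =
                new ListStateDescriptor<>("unmatchedProbes", TypeInformation.of(ObjectNode.class));
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);   // 恢复时这里拿到的即是注入后的状态
    }

    @Override
    public void processElement(ObjectNode value, Context ctx, Collector<ObjectNode> out) throws Exception {
        unmatchedProbesState.add(value);   // 正常写入状态
        out.collect(value);
    }
}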

范超  于2020年9月10日周四 上午9:35写道:

> Transient 都不参与序列化了,怎么可能从checkpoint里恢复?
>
> -邮件原件-
> 发件人: Yun Tang [mailto:myas...@live.com]
> 发送时间: 2020年9月7日 星期一 12:50
> 收件人: user-zh@flink.apache.org
> 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi
>
> 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]
>
> 可以排查的思路
>
>   1.  你的state是否开启了TTL呢
>   2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
>   3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么
>
> [1]
> https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158
>
> 祝好
> 唐云
> 
> From: Liu Rising 
> Sent: Sunday, September 6, 2020 17:45
> To: user-zh@flink.apache.org 
> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi 唐云
>
> 以下是state定义以及初始化的code
>
> public class FlinkKeyedProcessFunction extends
> KeyedProcessFunction, Tuple2 JsonNode>> {
>
> private static final Logger LOG =
> LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);
>
> ...
>
> private final ParameterTool params;
> private transient ListState unmatchedProbesState;
>
> ...
>
> FlinkKeyedProcessFunction(ParameterTool params) {
> this.params = params;
> }
>
> @Override
> public void open(Configuration parameters) {
>
> ListStateDescriptor descriptor = new
> ListStateDescriptor<>(
> "unmatchedProbes", TypeInformation.of(ObjectNode.class)
> );
> unmatchedProbesState =
> getRuntimeContext().getListState(descriptor);
>
> 以下是往state里add内容的部分
> ...
>
> List unmatchedProbes =
> mapMatching.getUnMatchedProbes(id);
> unmatchedProbesState.clear();
>
> if (unmatchedProbes.size() > 0) {
> try {
> unmatchedProbesState.addAll(unmatchedProbes);
> } catch (Exception e) {
> LOG.warn("Continue processing although failed to add
> unmatchedProbes to ListState. ID: " + id, e);
> }
> }
>
>...
>
> 以下是从state读取的code
>
> for (ObjectNode unmatchedProbe :
> unmatchedProbesState.get()) {
> LOG.info("Processing unmatched probe: " +
> unmatchedProbe);
> matchedValues.addAll(mapMatching.matchLocation(id,
> unmatchedProbe));
> }
>
>
> 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
> 去掉定义state那里的transient之后,上述问题不再出现。
>
> 谢谢。
> Rising
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Flink 1.5.0 savepoint 失败

2020-09-09 文章 likai
hi all. 我使用flink 1.5.0,在触发 savepoint 时失败。
共享目录:/data/emr_flink_savepoint_share/
触发命令:bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 
file:///data/emr_flink_savepoint_share 
Hadoop conf 默认文件系统是  hdfs://flink-hdfs
报错:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for 
operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, 
_UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: 
(CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, 
CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, 
CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, 
get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, 
CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, 
CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, 
CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> 
Sink: Unnamed (1/1).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
at 
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: 
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
 expected: hdfs://flink-hdfs
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointState

答复: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-09 文章 范超
Transient 都不参与序列化了,怎么可能从checkpoint里恢复?

-邮件原件-
发件人: Yun Tang [mailto:myas...@live.com] 
发送时间: 2020年9月7日 星期一 12:50
收件人: user-zh@flink.apache.org
主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] 
https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云

From: Liu Rising 
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes = mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add 
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " + 
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-06 文章 Yun Tang
Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] 
https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云
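
针对上面排查思路的第 1 点,补充一个 TTL 配置的示意(时间取值为示例):如果 descriptor 上启用过类似下面的 TTL,过期数据在 get 时就可能拿不到,看起来像"部分 key 读出来是空的"。

// 假设已 import org.apache.flink.api.common.state.StateTtlConfig、org.apache.flink.api.common.time.Time 等
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ListStateDescriptor<ObjectNode> descriptor =
        new ListStateDescriptor<>("unmatchedProbes", TypeInformation.of(ObjectNode.class));
descriptor.enableTimeToLive(ttlConfig);   // 只有显式调用过这一步,TTL 才会生效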

From: Liu Rising 
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new
ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " +
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-06 文章 Liu Rising
Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new
ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " +
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 无法savepoint

2020-09-03 文章 x





------------------ 原始邮件 ------------------
发件人: "x" <35907...@qq.com>;
发送时间: 2020年9月3日(星期四) 12:22
收件人: "user-zh";
建议使用stop语法,见:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

希望对你有帮助,祝好~

x <35907...@qq.com> 于2020年9月3日周四 上午11:30写道:

> /flink/flink-1.10.1/bin/flink cancel -s
> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> Unrecognized option: -yid

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-03 文章 Yun Tang
Hi

我觉得这个不是root cause,实际上 transient ListState 
是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。

麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。

祝好
唐云

From: Liu Rising 
Sent: Thursday, September 3, 2020 12:26
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi

找到原因了。

问题在于在定义ListState时使用了transient关键字,如下。
 private transient ListState state;

去掉了transient之后,问题解决。
虽然不太清楚为何transient会造成这种情况。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-02 文章 Liu Rising
Hi

找到原因了。

问题在于在定义ListState时使用了transient关键字,如下。
 private transient ListState state;

去掉了transient之后,问题解决。
虽然不太清楚为何transient会造成这种情况。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 无法savepoint

2020-09-02 文章 x
用的V1.10.1版本。


------------------ 原始邮件 ------------------
发件人: "user-zh";
建议使用stop语法,见:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

希望对你有帮助,祝好~

x <35907...@qq.com> 于2020年9月3日周四 上午11:30写道:

> /flink/flink-1.10.1/bin/flink cancel -s
> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> Unrecognized option: -yid

Re: 无法savepoint

2020-09-02 文章 zilong xiao
看官方文档 cancel 语法格式是:Syntax: cancel [OPTIONS] <Job ID>,所以-yid xxx是不是要放到job
id之前? 另外文档中有提示到Cancel with a savepoint (deprecated), 建议使用stop语法,见:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

希望对你有帮助,祝好~
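
补充一个把选项放到 Job ID 之前的示意(路径、application id、job id 沿用下面引用的原命令,具体以 bin/flink cancel -h / stop -h 的输出为准):

# cancel:所有选项(包括 -yid)放在 Job ID 之前
/flink/flink-1.10.1/bin/flink cancel -s hdfs://nameservice1/user/flink_1.10.1/flink-savepoints \
    -yid application_1583831804206_1106301 f95e51fd12a83907b6ca8c52eb2614ae

# 更推荐的 stop 写法(cancel -s 已是 deprecated)
/flink/flink-1.10.1/bin/flink stop -p hdfs://nameservice1/user/flink_1.10.1/flink-savepoints \
    -yid application_1583831804206_1106301 f95e51fd12a83907b6ca8c52eb2614ae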

x <35907...@qq.com> 于2020年9月3日周四 上午11:30写道:

> /flink/flink-1.10.1/bin/flink cancel -s
> hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
> f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
> 提示Unrecognized option: -yid


无法savepoint

2020-09-02 文章 x
/flink/flink-1.10.1/bin/flink cancel -s 
hdfs://nameservice1/user/flink_1.10.1/flink-savepoints 
f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
Unrecognized option: -yid
