Re: Flink savepoint migration issue
Confirmed: a field was added inside Pulsar's MessageId implementation class, which caused Flink to fail during deserialization. The issue is tracked at https://github.com/streamnative/pulsar-flink/issues/256. I will upgrade the checkpoint handling of the Flink 1.9 Pulsar connector so that MessageId is serialized with a serializer based on `MessageId.toByteArray`. Many thanks for your help!

Jianyun8023
2021-03-12

On 2021-03-11 22:43, Kezhu Wang <kez...@gmail.com> wrote:

> > Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step?
>
> That is indeed the case: the checkpoint also snapshots the serializer. Looking at the stack again, the failure happens while deserializing `MessageId`. Can you check whether the Pulsar version changed as well? If so, did some `MessageId` implementation change its fields between the two versions? It feels like you should use `MessageId.toByteArray`.
>
> [earlier quoted messages and stack trace trimmed; see the original post "Flink savepoint migration issue" below]
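For reference, a minimal sketch of a `MessageId.toByteArray`-based serializer, written here against Flink's SimpleVersionedSerializer interface. This is an illustrative sketch, not the connector's actual fix; the class name is made up:

    import java.io.IOException;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.pulsar.client.api.MessageId;

    // Sketch: serialize MessageId through Pulsar's own wire format instead of
    // Kryo field-by-field serialization, so that adding fields to MessageIdImpl
    // no longer breaks state compatibility.
    public class MessageIdV1Serializer implements SimpleVersionedSerializer<MessageId> {

        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(MessageId messageId) throws IOException {
            return messageId.toByteArray();
        }

        @Override
        public MessageId deserialize(int version, byte[] serialized) throws IOException {
            // fromByteArray also understands ids written by other Pulsar versions.
            return MessageId.fromByteArray(serialized);
        }
    }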
Re: Flink savepoint migration issue
Jianyun,

I also ran into jobs failing to start from a savepoint, after we upgraded the Pulsar client from 2.2 to 2.5.2 and I started the job with -s. The job was not very important and I had other work at hand at the time, so I never followed up on it. You might want to check what the Pulsar source does there.

allanqinjy <allanqi...@163.com>

On 2021-03-11 22:43, Kezhu Wang wrote:

> > Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step?
>
> That is indeed the case: the checkpoint also snapshots the serializer. [rest of quote and stack trace trimmed; see Kezhu Wang's message below]
Re: Flink savepoint migration issue
> Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step?

That is indeed the case: the checkpoint also snapshots the serializer. Looking at the stack again, the failure happens while deserializing `MessageId`. Can you check whether the Pulsar version changed as well? If so, did some `MessageId` implementation change its fields between the two versions? It feels like you should use `MessageId.toByteArray`.

On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

> Hi, I rewrote the migration following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to happen before the source's initializeState. Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step? Is there any way to help pin this down? Thanks!
>
> [earlier quoted messages and stack trace trimmed; see the original post "Flink savepoint migration issue" below]
Re: Flink savepoint migration issue
Hi, I rewrote the migration following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to happen before the source's initializeState. Is it possible that Flink already deserializes the state while reading the savepoint file, and fails during that step? Is there any way to help pin this down? Thanks!

On 2021-03-11 11:36, Kezhu Wang <kez...@gmail.com> wrote:

> Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code; this update is breaking with respect to state.
>
> [diff, suggestions, and stack trace trimmed; see Kezhu Wang's message below]
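For reference, a minimal sketch of the StatefulSinkWriterOperator-style migration mentioned above: read both the old and the new union state on restore, and snapshot only the new one. The class, state names, and types here are illustrative, not the actual connector code. Note that this pattern only helps when the old and new state are registered under different names; if the same state name changed its serializer incompatibly, the restore fails while the savepoint is being read, before initializeState ever runs.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.pulsar.client.api.MessageId;

    public class MigratingOffsetsFunction implements CheckpointedFunction {

        private transient ListState<Tuple2<String, MessageId>> newUnionOffsetStates;
        private final Map<String, MessageId> restoredOffsets = new HashMap<>();

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Old-format state, under its old (assumed) name.
            ListState<Tuple2<String, MessageId>> oldUnionOffsetStates =
                    context.getOperatorStateStore().getUnionListState(
                            new ListStateDescriptor<>(
                                    "topic-partition-offset-states",
                                    TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));
            // New-format state, under a new (assumed) name.
            newUnionOffsetStates =
                    context.getOperatorStateStore().getUnionListState(
                            new ListStateDescriptor<>(
                                    "topic-partition-offset-states-v2",
                                    TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));
            if (context.isRestored()) {
                for (Tuple2<String, MessageId> e : oldUnionOffsetStates.get()) {
                    restoredOffsets.put(e.f0, e.f1);  // merge old entries
                }
                for (Tuple2<String, MessageId> e : newUnionOffsetStates.get()) {
                    restoredOffsets.put(e.f0, e.f1);  // merge new entries
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Write ONLY the new state; the old state disappears from the next snapshot.
            newUnionOffsetStates.clear();
            for (Map.Entry<String, MessageId> e : restoredOffsets.entrySet()) {
                newUnionOffsetStates.add(Tuple2.of(e.getKey(), e.getValue()));
            }
        }
    }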
Re: Flink savepoint migration issue
I am the current maintainer of the pulsar-flink connector, and yes, there is an incompatible upgrade; it is a rather nasty change. I am now trying the approach of migrating the old state onto the new fields, and I get this error. I also modified the code for the 1.11 support to change the state data structure back to the old version's layout, and got the same error. I will look into how to use the StatefulSinkWriterOperator approach you mentioned.

On 2021-03-11 11:36, Kezhu Wang <kez...@gmail.com> wrote:

> StatefulSinkWriterOperator [rest of quote trimmed; see Kezhu Wang's message below]
Re: Flink savepoint migration issue
Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code; this update is breaking with respect to state.

+unionOffsetStates = stateStore.getUnionListState(
+        new ListStateDescriptor<>(
+                OFFSETS_STATE_NAME,
+                TypeInformation.of(new TypeHint<...>() {
+                })));

Possible fixes:

1. Try rewriting the state with the state-processor-api?
2. Could the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there is another incompatible update later on:

 new ListStateDescriptor<>(
         OFFSETS_STATE_NAME,
-        TypeInformation.of(new TypeHint<...>() {
+        TypeInformation.of(new TypeHint<...>() {
         })));

For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator approach: read both the old and the new state, and write only the new state. Someone from streamnative can confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

> Hi all, after a Flink version upgrade I hit a problem where a savepoint does not restore correctly. The upgrade is from 1.9.3 to 1.11.0 or 1.11.3, using the Pulsar connector.
>
> [code, commands, and stack trace trimmed; see the original post "Flink savepoint migration issue" below]
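For reference, a minimal sketch of reading the old union state with the state-processor-api (DataSet-based in the Flink 1.9–1.11 era). The savepoint path, operator uid, and state name below are assumptions; it would need to run with the old connector and old Pulsar client on the classpath so the Kryo-serialized MessageId can still be read:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.pulsar.client.api.MessageId;

    // Sketch: load the 1.9 savepoint and read the source's union list state.
    public class ReadOldOffsets {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            ExistingSavepoint savepoint = Savepoint.load(
                    env, "hdfs:///flink/flink-savepoints/savepoint-xxxx", new MemoryStateBackend());

            DataSet<Tuple2<String, MessageId>> offsets = savepoint.readUnionState(
                    "pulsar-source-uid",                 // assumed operator uid
                    "topic-partition-offset-states",     // assumed state name
                    TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {}));

            offsets.print();
            // A new savepoint in the new layout could then be bootstrapped with
            // OperatorTransformation.bootstrapWith(...) and Savepoint.create(...).
        }
    }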
Flink savepoint migration issue
Hi all, after upgrading Flink I hit a problem where a savepoint does not restore correctly. The upgrade is from 1.9.3 to 1.11.0 or 1.11.3, using the Pulsar connector. The source's checkpoint state is structured as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));
oldUnionSubscriptionNameStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME + "_subName",
                TypeInformation.of(new TypeHint<String>() {})));

I took a savepoint of the 1.9.3 state locally with bin/flink savepoint, stopped the Flink 1.9.3 cluster, started a 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job fails with the error below:

2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more

Could anyone suggest how to troubleshoot this, or a possible fix?

Jianyun8023
2021-03-11
Re: flink-savepoint issue
For keyed state, the same key must be guaranteed to fall into the same key group. If some key is a hotspot, you can apply a map before the keyBy (appending a suffix to the key), then keyBy on that, and finally aggregate the partial results once processing is done.

Best,
Congxian

guomuhua <663021...@qq.com> wrote on 2021-03-04 12:49:

> I ran into a similar situation: to spread the data out, I added a random number in the keyBy. How do I spread the data out correctly?
>
> [quoted messages and stack trace trimmed; see the original post "flink-savepoint issue" below]
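For reference, a minimal sketch of the two-stage (salted-key) aggregation described above; the salt width, tuple layout, and window size are assumptions:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Sketch: attach the salt as data in a map BEFORE keyBy, so the KeySelector
    // itself stays deterministic; pre-aggregate per salted key, then aggregate
    // the partial sums per original key.
    public class SaltedAggregation {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(Tuple2.of("hot-key", 1L), Tuple2.of("hot-key", 2L), Tuple2.of("cold-key", 3L))
               // Stage 0: (saltedKey, originalKey, value).
               .map(t -> Tuple3.of(t.f0 + "#" + ThreadLocalRandom.current().nextInt(10), t.f0, t.f1))
               .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
               // Stage 1: pre-aggregate per salted key.
               .keyBy(t -> t.f0)
               .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
               .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
               // Stage 2: final aggregate per ORIGINAL key.
               .keyBy(t -> t.f1)
               .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
               .reduce((a, b) -> Tuple3.of(a.f0, a.f1, a.f2 + b.f2))
               .print();

            env.execute("salted two-stage aggregation sketch");
        }
    }

The random suffix lives in the record rather than in the KeySelector, so each state entry's key stays consistent with its key group, which is what the savepoint's key-group check requires; a KeySelector that returns a random value breaks that invariant.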
Re: flink-savepoint issue
I ran into a similar situation: to spread the data out, I added a random number in the keyBy. How do I spread the data out correctly?

nobleyd wrote:

> Are you using a random key?
>
> [quoted messages and stack trace trimmed; see the original post "flink-savepoint issue" below]
Re: flink-savepoint issue
I ran into the same problem: to spread the data out, I appended a random number to the key in the keyBy. Without the random number, the savepoint works; with it, the savepoint fails. So if the data really does need to be spread out, how should this be handled?
Re: flink-savepoint issue
Are you using a random key?

guaishushu1...@163.com wrote on 2021-03-03 18:53:

> Checkpoints are saved successfully, but savepoints fail with:
>
> java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
> ...
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>
> [full stack trace trimmed; see the original post "flink-savepoint issue" below]
flink-savepoint issue
Checkpoints are saved successfully, but savepoints fail with the following error:

java.lang.Exception: Could not materialize checkpoint 2404 for operator KeyedProcess (21/48).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
    at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
    ... 5 more

guaishushu1...@163.com
Re: A question about Flink savepoints
OK, thanks for your help!

> On 2021-01-11 20:23, Yun Tang wrote:
>
> Hi,
>
> There is no ready-made API exposed for this, but you can follow the test case and set savepointRestoreSettings on the jobGraph [1].
>
> [1] https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383
>
> Best,
> Yun Tang
>
> [quoted question trimmed; see the original post "A question about Flink savepoints" below]
Re: A question about Flink savepoints
Hi,

There is no ready-made API exposed for this, but you can follow the test case and set savepointRestoreSettings on the jobGraph [1].

[1] https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383

Best,
Yun Tang

From: yinghua...@163.com
Sent: Monday, January 11, 2021 19:07
To: user-zh
Subject: A question about Flink savepoints

Using the Flink API, we stop a job from Java code through this method of the client class ClusterClient: CompletableFuture<String> stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3). We saved the job's savepoint information, but there is no corresponding restore method to be found; all the material we found restores via the command line. Is there a method at the Java API level to restore the job directly?

yinghua...@163.com
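For reference, a minimal sketch of the approach: attach SavepointRestoreSettings to the JobGraph before submission, mirroring what flink run -s does on the command line. The helper class and its arguments are illustrative:

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

    public final class RestoreFromSavepoint {
        // Point the job at the savepoint, then submit it through the same client
        // that was used for stopWithSavepoint.
        public static void restore(ClusterClient<?> client, JobGraph jobGraph, String savepointPath) {
            // Second argument: allowNonRestoredState (like -n / --allowNonRestoredState).
            jobGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath(savepointPath, false));
            client.submitJob(jobGraph);
        }
    }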
A question about Flink savepoints
Using the Flink API, we stop a job from Java code through this method of the client class ClusterClient: CompletableFuture<String> stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3). We saved the job's savepoint information, but there is no corresponding restore method to be found; all the material we found restores via the command line. Is there a method at the Java API level to restore the job directly?

yinghua...@163.com
Re: Command: Flink savepoint -d reported an error.
Was this ever resolved? I am also getting "Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>".

赢峰 <si_ji_f...@163.com>

On 09/30/2019 15:06, pengchengl...@163.com wrote:

> Hi, thanks for the reply. My command follows the official documentation, https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#trigger-a-savepoint
>
> $ bin/flink savepoint -d :savepointPath
>
> This command deletes a savepoint; what you described is triggering a savepoint, which works fine.

From: 狄玉坤 <diyukun2...@163.com>
Sent: 2019-09-30 14:59
To: user-zh@flink.apache.org
Subject: Re: Re: Command: Flink savepoint -d reported an error.

> The jobId argument is missing:
>
> ./bin/flink savepoint <jobId> [savepointDirectory]
>
> This will trigger a savepoint for the job with ID jobId, and returns the path of the created savepoint. You need this path to restore and dispose savepoints.

On 9/30/2019 14:56, pengchengl...@163.com wrote:

> Thanks for your reply. I tried it, and it still says the required argument is missing:
>
> $ bin/flink savepoint -d file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/
>
> The program finished with the following exception:
>
> java.lang.NullPointerException: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepointPath>
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>     at org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
>     at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
>     at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)

From: 星沉
Sent: 2019-09-30 14:47
To: user-zh
Subject: Re: Command: Flink savepoint -d reported an error.

> The savepoint path argument is wrong; there is an extra / after file, isn't there?
>
> pengchengl...@163.com wrote:
>
> > $ bin/flink savepoint -d file:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/
> >
> > [exception trimmed; same "Missing required argument: savepoint path" NullPointerException as above]
Re: flink savepoint exception
Hi

The exception contains "Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint." Perhaps you can look at the JM logs to see whether there is anything more detailed there.

Best,
Congxian

张锴 wrote on 2020-11-07 16:14:

> I hit the error below when taking a savepoint with Flink 1.10.1; the cause is unclear so far, so I am asking for help here. Could the experts take a look?
>
> Backpressure and restarts have been ruled out, the checkpoint timeout is set to ten minutes, and the conf increases the client-to-master connection time, but the exception still occurs.
>
> Command:
>
> flink savepoint -yid application_1604456903594_2381 fb8131bcb78cbdf2bb9a705d8a4ceebc hdfs:///hadoopnamenodeHA/flink/flink-savepoints
>
> Exception:
>
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
> ...
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.
>
> [full stack trace trimmed; see the original post "flink savepoint exception" below]
Re: flink savepoint
Found it. Through the JM I saw that write permission was missing; after changing that, it works.

Congxian Qiu wrote on 2020-11-06 13:31:

> Hi
> Do the client logs or the JM logs show any other exceptions?
> Best,
> Congxian
>
> [quoted messages trimmed; see the original post "flink savepoint" below]
Re: flink savepoint
It is already specified.

admin <17626017...@163.com> wrote on 2020-11-06 15:17:

> Hi,
> Is your job running on YARN? If so, you need to specify -yid.
>
> [quoted messages and stack trace trimmed; see the original post "flink savepoint" below]
flink savepoint exception
I hit the error below when taking a savepoint with Flink 1.10.1; the cause is unclear so far, so I am asking for help here. Could the experts take a look?

Backpressure and restarts have been ruled out, the checkpoint timeout is set to ten minutes, and the conf increases the client-to-master connection time, but the exception still occurs.

Command:

flink savepoint -yid application_1604456903594_2381 fb8131bcb78cbdf2bb9a705d8a4ceebc hdfs:///hadoopnamenodeHA/flink/flink-savepoints

Exception:

The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
    at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
    at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: An Exception occurred while triggering the checkpoint.
    at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseState
Re: flink savepoint
Hi,
Is your job running on YARN? If so, you need to specify -yid.

> On 2020-11-06 13:31, Congxian Qiu wrote:
>
> Hi
> Do the client logs or the JM logs show any other exceptions?
> Best,
> Congxian
>
> [quoted messages and stack trace trimmed; see the original post "flink savepoint" below]
Re: flink savepoint
Hi
Do the client logs or the JM logs show any other exceptions?
Best,
Congxian

张锴 wrote on 2020-11-06 11:42:

> Restarts and backpressure are both normal.
> I also increased the client-to-master time, and the problem is still there.
>
> [quoted messages and stack trace trimmed; see the original post "flink savepoint" below]
Re: flink savepoint
Restarts and backpressure are both normal.
I also increased the client-to-master timeout, but the problem is still there.

hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
> Hi,
> This error only means the savepoint did not complete within the configured time, so the client's connection to the Master timed out;
> the concrete cause has to be found in the JobManager log.
> PS: when a job keeps restarting or is under backpressure, savepoints generally fail.
> Best,
> Hailong Wang
flink savepoint
While taking a snapshot with flink savepoint I hit an error, and I am not sure what is causing it; could someone passing by take a look?

Flink version: 1.10.1

Command executed:
flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47 hdfs://hadoopnamenodeHA/flink/flink-savepoints

Error message:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
Re: Re: Re: flink savepoint and checkpoint related matters
My understanding is that it is the same; the differences between the two are described here [1].
The way to restore is to pass -s with the path to restore from when starting the job, e.g.
-s file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

On 2020-10-10 08:43:38, "zjfpla...@hotmail.com" wrote:
>Does the same apply to savepoints?
>
>zjfpla...@hotmail.com
Re: Re: flink savepoint and checkpoint related matters
Does the same apply to savepoints?

zjfpla...@hotmail.com

From: Yun Tang
Sent: 2020-10-10 01:35
To: user-zh
Subject: Re: flink savepoint and checkpoint related matters
Hi

When the job has been modified, the main steps for restoring from a checkpoint are to make sure the relevant operators have uids set [1], and to allow non-restored state [2] when restoring.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

Best,
Yun Tang
Re: flink savepoint and checkpoint related matters
Hi

When the job has been modified, the main steps for restoring from a checkpoint are to make sure the relevant operators have uids set [1], and to add the flag that allows non-restored state [2] when restoring.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

Best,
Yun Tang

From: zjfpla...@hotmail.com
Sent: Friday, October 9, 2020 8:59
To: user-zh
Subject: flink savepoint and checkpoint related matters
Hi,
Under what kinds of job modifications can Flink savepoints and checkpoints still be reused, and what kinds of modifications force consuming from scratch again?

zjfpla...@hotmail.com
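To make the two steps above concrete, a minimal sketch assuming a simple DataStream job; the operator IDs, jar name, and savepoint path are illustrative placeholders, not from this thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .uid("demo-source")        // stable operator ID: state is matched by this ID on restore
                .map(String::toUpperCase)
                .uid("demo-map")           // give every stateful operator its own ID
                .print();
        env.execute("uid-demo");
    }
}

Restoring while skipping state that can no longer be mapped to an operator (-n is short for --allowNonRestoredState):

bin/flink run -s <savepointPath> -n my-job.jar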
flink savepoint and checkpoint related matters
Hi,
Under what kinds of job modifications can Flink savepoints and checkpoints still be reused, and what kinds of modifications force consuming from scratch again?

zjfpla...@hotmail.com
Re: Re: Re: flink savepoint issue
Hi

First, if this problem reproduces easily, we need to pinpoint what caused the OOMKilled.
1. Enable block-cache usage [1] and watch the block cache usage in the metrics.
2. Answering the following questions would help narrow it down:
* How many slots does a single TM have?
* How much managed memory is configured for a single TM?
* How many keyed states are declared in total (a window also effectively uses one state)? How many of them are map states, and are those map states iterated over frequently?
* How many RocksDB instances live in the killed container? You can count them by searching the logs for "Obtained shared RocksDB cache of size".
* Is there a separate options factory or any related options configured for RocksDB?

The semantics of state.backend.rocksdb.memory.managed are that RocksDB's memory comes from Flink, and the RocksDB instances within one slot share one cache. If you set it to false, you fall back to the behavior of 1.9 and earlier, where RocksDB's memory is not managed by Flink at all, which in some sense makes the container even easier to kill.

If you want to mitigate quickly, one option is to increase taskmanager.memory.task.off-heap.size [2], which provides some extra memory for RocksDB to overuse. Other mitigations depend on your answers to the questions above.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-task-off-heap-size

Best,
Yun Tang

From: xyq
Sent: Monday, March 30, 2020 10:41
To: user-zh@flink.apache.org
Subject: Re: Re: flink savepoint issue
Hi:
I have a small stream left-joining a large stream. The small stream has almost no data at night, sometimes nothing for 4-5 hours. Right now the container keeps getting killed at night, reporting out of memory. If I set the option below to false, what are the downsides? Or how else can this be solved? It has been bothering me for a long time; please advise, thanks.
state.backend.rocksdb.memory.managed : false
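A flink-conf.yaml sketch of the knobs discussed above, assuming Flink 1.10; the sizes are illustrative values, not recommendations from this thread:

state.backend.rocksdb.metrics.block-cache-usage: true   # expose block cache usage as a metric [1]
taskmanager.memory.task.off-heap.size: 512m             # extra headroom for RocksDB to overuse [2] (example value)
state.backend.rocksdb.memory.managed: true              # keep RocksDB inside Flink's managed memory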
Re: Re: flink savepoint issue
Hi:
I have a small stream left-joining a large stream. The small stream has almost no data at night, sometimes nothing for 4-5 hours. Right now the container keeps getting killed at night, reporting out of memory. I would like to ask: if I set the option below to false, what are the downsides? Or how else can this be solved? It has been bothering me for a long time; please advise, thanks.
state.backend.rocksdb.memory.managed : false

On 2020-03-28 11:04:09, "Congxian Qiu" wrote:
>Hi
>
>For question 1: under backpressure a savepoint may fail to complete and therefore time out. There is no cure for this yet; issue [1] is working on unaligned checkpoints, which address checkpointing under backpressure.
>For question 3: the checkpoint timed out, where a timeout means that within the configured time (5 minutes in your case) some task did not finish its snapshot. Raising the timeout can relieve this somewhat, but it is better to find the cause of the timeout and optimize for it.
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
Re: Re: flink savepoint issue
Thanks a lot.

On 2020-03-28 11:04:09, "Congxian Qiu" wrote:
>Hi
>
>For question 1: under backpressure a savepoint may fail to complete and therefore time out. There is no cure for this yet; issue [1] is working on unaligned checkpoints, which address checkpointing under backpressure.
>For question 3: the checkpoint timed out, where a timeout means that within the configured time (5 minutes in your case) some task did not finish its snapshot. Raising the timeout can relieve this somewhat, but it is better to find the cause of the timeout and optimize for it.
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
Re: flink savepoint issue
Hi

For question 1: under backpressure a savepoint may fail to complete and therefore time out. There is no cure for this yet; issue [1] is working on unaligned checkpoints, which address checkpointing under backpressure.
For question 3: the checkpoint timed out, where a timeout means that within the configured time (5 minutes in your case) some task did not finish its snapshot. Raising the timeout can relieve this somewhat, but it is better to find the cause of the timeout and optimize for it.

[1] https://issues.apache.org/jira/browse/FLINK-14551
Best,
Congxian

大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:
> Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance, thanks!
>
> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
> cannot stop the job when it is backpressured.
>
> 2. After adding a field in a Flink SQL DDL, the job fails to start and the savepoint has to be deleted before it can start (the DDL feeds a two-stream join, and one of the streams gained a field).
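For readers on later versions: the unaligned checkpoints tracked in FLINK-14551 became available in Flink 1.11. If I recall the 1.11 API correctly, a sketch of enabling them (the interval is an example value):

env.enableCheckpointing(60_000L);                       // checkpointing must be enabled first
env.getCheckpointConfig().enableUnalignedCheckpoints(); // let barriers overtake in-flight data under backpressure

or, equivalently, in flink-conf.yaml: execution.checkpointing.unaligned: true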
flink savepoint issue
Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance, thanks!

1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id cannot stop the job when it is backpressured:

The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not cancel job 1f768e4ca9ad5792a4844a5d12163b73.
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
... 9 more
stop flink job failed!!!

2. After adding a field in a Flink SQL DDL, the job fails to start and the savepoint has to be deleted before it can start (the DDL feeds a two-stream join, and one of the streams gained a field):

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at
Re: Re: flink savepoint checkpoint
Hello, regarding your question, I noticed something interesting. After I start a job in YARN per-job mode, I can see it in the YARN resource-manager UI with its own application id. If I then go through YARN's Tracking UI -> ApplicationMaster into the job's web UI (Flink's web UI) and kill the job with the cancel button there, an empty application shell is still left in the YARN UI; the application is only really killed after I run yarn application -kill Id in a terminal. So my preliminary take is that this is a stop of the program.

Best!

amen...@163.com wrote on Fri, Jan 10, 2020 at 5:59 PM:
> hi, I understand that stopping a job with stop triggers a savepoint, generates max_watermark before stopping, and registers event-time timers. May I ask: if the job is stopped directly with yarn kill, does that count as cancel, as stop, or as something else?
Re: Re: flink savepoint checkpoint
Actually this mainly depends on your checkpoint interval. It is like rewinding a video: the two are simply different rewind points. A savepoint produces snapshot data at the moment you trigger it, whereas the automatic checkpoint may have produced its data at an earlier point (when you cancel the job, the next automatic checkpoint time may not have arrived yet). Both work; one is just earlier than the other, and that determines how quickly the job catches up after recovery. For production jobs that are modified frequently, savepoints are very practical.

Personally I think that when a job fails, however it fails (unless it is restored from a savepoint), it goes back to the last automatic checkpoint, not to the savepoint.

Original message
From: amen...@163.com
To: user-zh
Sent: Friday, January 10, 2020 17:58
Subject: Re: Re: flink savepoint checkpoint

hi, I understand that stopping a job with stop triggers a savepoint, generates max_watermark before stopping, and registers event-time timers. May I ask: if the job is stopped directly with yarn kill, does that count as cancel, as stop, or as something else?

amen...@163.com
Re: Re: flink savepoint checkpoint
hi, I understand that stopping a job with stop triggers a savepoint, generates max_watermark before stopping, and registers event-time timers. May I ask: if the job is stopped directly with yarn kill, does that count as cancel, as stop, or as something else?

amen...@163.com

From: Congxian Qiu
Date: 2020-01-10 17:16
To: user-zh
Subject: Re: flink savepoint checkpoint
Hi
From Flink's point of view, checkpoints are used to recover from failovers while a job runs, and savepoints are used to reuse state across jobs.
Also, from 1.9 on you can try StopWithSavepoint [1], and there is another community issue exploring StopWithCheckpoint [2].
[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian
Re: flink savepoint checkpoint
Hi
From Flink's point of view, checkpoints are used to recover from failovers while a job runs, and savepoints are used to reuse state across jobs.
Also, from 1.9 on you can try StopWithSavepoint [1], and there is another community issue exploring StopWithCheckpoint [2].

[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian

zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:
> hi, as I understand it, that option only controls whether the previous checkpoints are cleaned up when the job is cancelled, and that checkpoint is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete one.
> Best!
> zhisheng
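A sketch of the StopWithSavepoint CLI shape available since 1.9; the job id and directory are placeholders, and -p may be omitted if a default savepoint directory is configured:

bin/flink stop -p hdfs:///flink/savepoints <jobId>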
Re: flink savepoint checkpoint
hi, as I understand it, that option only controls whether the previous checkpoints are cleaned up when the job is cancelled, and that checkpoint is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete one.

Best!
zhisheng

Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
> Hello, regarding your question, Flink has a setting for this: when the job is stopped, the checkpoint is additionally retained -->
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Re: flink savepoint checkpoint
Hello, regarding your question, Flink has a setting for this: when the job is stopped, the checkpoint is additionally retained -->

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> hi all:
> I'd like to discuss something: why are Flink savepoints designed to be manual? If no savepoint was taken when the program was stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, and on startup just point at the checkpoint path to continue from where the last run left off?
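A self-contained sketch around that line; the checkpoint interval is an illustrative value:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // take a checkpoint every 60 s (example interval)
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep the last checkpoint when the job is cancelled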
Re: flink savepoint checkpoint
Flink also supports restoring from a retained checkpoint; see the documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
Best,
Congxian

陈帅 wrote on Wed, Dec 11, 2019 at 9:34 PM:
> Flink 1.9 supports cancel job with savepoint:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
> Checkpoints can be incremental, while a savepoint is a full snapshot. For the concrete differences see
> https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
Re: flink savepoint checkpoint
Flink 1.9 supports cancel job with savepoint:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
Checkpoints can be incremental, while a savepoint is a full snapshot. For the concrete differences see
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink

lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> hi all:
> I'd like to discuss something: why are Flink savepoints designed to be manual? If no savepoint was taken when the program was stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, and on startup just point at the checkpoint path to continue from where the last run left off?
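The corresponding CLI shape; the directory is optional when state.savepoints.dir is configured, and the job id is a placeholder:

bin/flink cancel -s hdfs:///flink/savepoints <jobId>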
Re: flink savepoint checkpoint
Hi
Checkpoints are automatic. You can configure checkpoints to be retained [1] and then restore from a checkpoint [2], so triggering a savepoint is not strictly required.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Best,
Yun Tang

On 12/11/19, 11:56 AM, "lucas.wu" wrote:
    hi all:
    I'd like to discuss something: why are Flink savepoints designed to be manual? If no savepoint was taken when the program was stopped, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, and on startup just point at the checkpoint path to continue from where the last run left off?
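A sketch of resuming from a retained checkpoint directory; the paths and jar name are placeholders:

bin/flink run -s hdfs:///flink/checkpoints/<jobId>/chk-61 my-job.jar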
Re: bin/flink savepoint jobId error
Solved it.

At 2019-12-04 09:54:17, "guoshuai" wrote:
>It is a test job running in per-job mode; I checked and the JobManager is normal.
>Based on Flink 1.4.0, and the Flink cluster has Kerberos enabled.
>The targetDirectory has already been configured in flink-conf.yaml.
bin/flink savepoint jobId error
It is a test job running in per-job mode; I checked and the JobManager is normal.
Based on Flink 1.4.0, and the Flink cluster has Kerberos enabled.
The targetDirectory has already been configured in flink-conf.yaml.

The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:861)
at org.apache.flink.client.program.ClusterClient.listJobs(ClusterClient.java:699)
at org.apache.flink.client.CliFrontend.list(CliFrontend.java:439)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1082)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1124)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1781)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1124)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:856)
... 10 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
... 11 more
Re: Re: Command: Flink savepoint -d reported an error
The jobId argument is missing:

./bin/flink savepoint <jobId> [savepointDirectory]

This will trigger a savepoint for the job with ID jobId, and returns the path of the created savepoint. You need this path to restore and dispose savepoints.

diyukun2...@163.com

On 9/30/2019 14:56, pengchengl...@163.com wrote:
Thanks for the reply. I tried it, and it still says a required argument is missing. As follows:

$ bin/flink savepoint -d file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/

The program finished with the following exception:
java.lang.NullPointerException: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepoint-path>
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)

pengchengl...@163.com
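Putting the trigger and dispose shapes side by side; the job id and path are placeholders:

bin/flink savepoint <jobId>              # trigger; prints the path of the created savepoint
bin/flink savepoint -d <savepoint-path>  # dispose the savepoint at that path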
Re: Command: Flink savepoint -d reported an error
The savepoint path argument is wrong. Isn't there an extra / after file?

pengchengl...@163.com wrote:
$ bin/flink savepoint -d file:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/

The program finished with the following exception:
java.lang.NullPointerException: Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepoint-path>
Re: Flink Savepoint timeout
I looked around and did not see any other errors; there is only the exception I pasted above, because this error is reported when the CLI runs.

On Fri, Sep 6, 2019 at 4:51 PM Wesley Peng wrote:
>
> SJMSTER wrote:
> > Checkpoints have always been successful.
> > I tried cancel job with savepoint again today and it succeeded.
> > I don't know why it timed out the several times I tried before.
>
> are there any log items for diagnosis?
>
> regards.
Re: Flink Savepoint timeout
SJMSTER wrote:
> Checkpoints have always been successful.
> I tried cancel job with savepoint again today and it succeeded.
> I don't know why it timed out the several times I tried before.

are there any log items for diagnosis?

regards.
Flink Savepoint timeout
Has anyone run into a timeout exception when manually triggering a Flink savepoint from the CLI?
Trying to cancel the job with savepoint fails with the same timeout error.
The savepoint is already configured to be written to HDFS, and Flink itself runs on YARN.
I saw a parameter "akka.client.timeout" on the website that may be relevant, but it only takes effect when set in flink-conf.yaml, and there is no way to pass it in via the CLI.
So the job cannot be cancelled and the Flink cluster cannot be restarted; it is a dead loop. Thanks!

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.6.0-hdp/lib/phoenix-4.7.0.2.6.3.0-235-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/flink-1.6.0-hdp/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2019-09-05 10:45:41,807 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hive.
2019-09-05 10:45:41,807 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-hive.
2019-09-05 10:45:42,056 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 1
2019-09-05 10:45:42,056 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 1
YARN properties set default parallelism to 1
2019-09-05 10:45:42,269 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at ac13ghdpt2m01.lab-rot.saas.sap.corp/10.116.201.103:10200
2019-09-05 10:45:42,276 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-09-05 10:45:42,276 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-09-05 10:45:42,282 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-09-05 10:45:42,284 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-09-05 10:45:42,341 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-09-05 10:45:42,345 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'ac13ghdpt2dn01.lab-rot.saas.sap.corp' and port '40192' from supplied application id 'application_1559153472177_52202'
2019-09-05 10:45:42,689 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Triggering savepoint for job 6399ec2e8fdf4cb7d8481890019554f6.
Waiting for response...

The program finished with the following exception:
org.apache.flink.util.FlinkException: Triggering a savepoint for the job 6399ec2e8fdf4cb7d8481890019554f6 failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
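For reference, the flink-conf.yaml shape of the option mentioned above; 600 s is an illustrative value, not a recommendation from this thread:

akka.client.timeout: 600 s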