Re: Flink savepoint migration issue

2021-03-12, posted by 赵 建云
Confirmed: a field was added inside Pulsar's MessageId implementation class, which caused Flink's deserialization to fail. The issue: https://github.com/streamnative/pulsar-flink/issues/256.
I will update the checkpoint handling of the Pulsar connector for Flink 1.9 so that MessageId is 
serialized with a serializer based on `MessageId.toByteArray`.
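For illustration only (the class name and the Kryo-based registration are my assumptions, not necessarily what the connector will actually ship), such a serializer could look roughly like this:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.pulsar.client.api.MessageId;

import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical serializer: the state stores only the bytes produced by
// MessageId.toByteArray(), so field changes inside the MessageId
// implementation classes no longer break state compatibility.
public class MessageIdKryoSerializer extends Serializer<MessageId> {

    @Override
    public void write(Kryo kryo, Output output, MessageId messageId) {
        byte[] bytes = messageId.toByteArray();
        output.writeInt(bytes.length);   // length prefix
        output.writeBytes(bytes);        // opaque wire format owned by Pulsar
    }

    @Override
    public MessageId read(Kryo kryo, Input input, Class<MessageId> type) {
        byte[] bytes = input.readBytes(input.readInt());
        try {
            // Pulsar picks the correct implementation class while decoding.
            return MessageId.fromByteArray(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException("Corrupt MessageId bytes in state", e);
        }
    }
}

It could then be registered with env.getConfig().addDefaultKryoSerializer(MessageId.class, MessageIdKryoSerializer.class). Note that this only protects savepoints taken after the change; it does not by itself make the old snapshots readable.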
Many thanks for your help!

Jianyun8023
2021-03-12


Re: Flink savepoint migration issue

2021-03-11, posted by allanqinjy
Jianyun,
I also ran into a job failing to start from a savepoint, after we upgraded the Pulsar client from 2.2 to 2.5.2 
and I started the job with -s. Since the job wasn't very important and I had other tasks at hand at the time, I didn't follow up on it. Take a look at whether something changed on the pulsar source side.


allanqinjy
allanqi...@163.com



Re: Flink savepoint migration issue

2021-03-11, posted by Kezhu Wang
> Could it be that Flink already deserializes while reading the savepoint file, and the failure happens during that process?

That is indeed the case: the checkpoint snapshots the serializer as well.

I looked at the stack again; the error should occur while deserializing `MessageId`. Could you check
whether the Pulsar version changed as well? If so, did any `MessageId` implementation change its
fields between the two versions? I think you should use `MessageId.toByteArray`.



Re: Flink savepoint migration issue

2021-03-11, posted by 赵 建云
Hi, following StatefulSinkWriterOperator I rewrote the migration logic and tested it, but I still hit the error above. 
The error seems to occur before the source's initializeState. Could it be that Flink already deserializes while reading the savepoint file, and the failure happens during that process? Is there any way to help pin this down?

Thanks!



Re: Flink savepoint migration issue

2021-03-10, posted by 赵 建云
I'm the current maintainer of the pulsar-flink connector, and there is indeed an incompatible upgrade; 
it's quite a nasty change. When I try the approach of migrating the old fields onto the new ones, I get this error. I also modified the code supporting 1.11 to change the state's data structure back to the old form, and got the same error. I'll look into how to use the StatefulSinkWriterOperator you mentioned.

On March 11, 2021, at 11:36 AM, Kezhu Wang <kez...@gmail.com> wrote:

> StatefulSinkWriterOperator



Re: Flink savepoint migration issue

2021-03-10, posted by Kezhu Wang
Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code, and that update is breaking with respect to state.

+unionOffsetStates = stateStore.getUnionListState(
+        new ListStateDescriptor<>(
+                OFFSETS_STATE_NAME,
+                TypeInformation.of(new TypeHint<...>() {
+                })));

Possible fixes:
1. Try rewriting the state with the state-processor-api (see the sketch below)?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?
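As a rough sketch of option 1 (the uid, the state name, the paths, and the state types below are all assumptions, not the connector's actual values), a state-processor-api rewrite could look like this:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.pulsar.client.api.MessageId;

public class RewritePulsarSourceState {

    // Assumed values; the real uid and state name depend on the job and connector.
    private static final String SOURCE_UID = "pulsar-source";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///flink/savepoints/savepoint-old", new MemoryStateBackend());

        // Read the old union state with the 1.9-era TypeInformation.
        DataSet<Tuple2<String, MessageId>> offsets = savepoint.readUnionState(
                SOURCE_UID, OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {}));

        // Map `offsets` here into whatever shape the new connector expects,
        // then bootstrap a fresh copy of the operator state from it.
        BootstrapTransformation<Tuple2<String, MessageId>> rewritten =
                OperatorTransformation.bootstrapWith(offsets).transform(new OffsetsWriter());

        savepoint.removeOperator(SOURCE_UID)
                .withOperator(SOURCE_UID, rewritten)
                .write("hdfs:///flink/savepoints/savepoint-rewritten");
        env.execute("rewrite pulsar source state");
    }

    private static class OffsetsWriter extends StateBootstrapFunction<Tuple2<String, MessageId>> {
        private transient ListState<Tuple2<String, MessageId>> offsets;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            offsets = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>(OFFSETS_STATE_NAME,
                            TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));
        }

        @Override
        public void processElement(Tuple2<String, MessageId> value, Context ctx) throws Exception {
            offsets.add(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) {
        }
    }
}

Whether the rewritten savepoint then restores cleanly still depends on the new connector accepting that state shape.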

It looks like there are further incompatible updates after that:

 new ListStateDescriptor<>(
         OFFSETS_STATE_NAME,
-        TypeInformation.of(new TypeHint<...>() {
+        TypeInformation.of(new TypeHint<...>() {
         })));


For an incompatible state update, if a schema migration is hard to do, you can try the StatefulSinkWriterOperator 
approach: read both the old and the new state, but write only the new state (a sketch follows below).

It may be worth waiting for someone from StreamNative to confirm.


Flink savepoint migration issue

2021-03-10, posted by 赵 建云
Hi all, after a Flink version upgrade I'm hitting a problem where a savepoint does not restore correctly.

The version goes from 1.9.3 to 1.11.0 or 1.11.3.
The connector is the Pulsar connector. The checkpointed state in the source is structured as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
                })));
oldUnionSubscriptionNameStates =
        stateStore.getUnionListState(
                new ListStateDescriptor<>(
                        OFFSETS_STATE_NAME + "_subName",
                        TypeInformation.of(new TypeHint<String>() {
                        })));
Locally I saved the 1.9.3 state with the bin/flink savepoint command, then stopped the Flink 
1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After starting, the job hits the following error:


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

Could anyone suggest a way to troubleshoot this, or a possible fix?


Jianyun8023
2021-3-11


Re: flink-savepoint issue

2021-03-03, posted by Congxian Qiu
For keyed state you must guarantee that the same key always lands in the same key group. If a particular key is a hotspot, 
you can apply a map before the keyBy that appends a suffix to the key, then keyBy on the salted key, and finally aggregate the partial results again once processing is done (see the sketch below).
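A hedged sketch of that pattern; the Tuple2<String, Long> event shape and the fan-out of 10 buckets are illustrative assumptions:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class SaltedKeyBy {

    public static KeyedStream<Tuple3<String, Integer, Long>, String> spreadHotKeys(
            DataStream<Tuple2<String, Long>> events) {
        // Attach the salt to the record in a map BEFORE keyBy.
        DataStream<Tuple3<String, Integer, Long>> salted = events.map(
                new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                    private int rr = 0;

                    @Override
                    public Tuple3<String, Integer, Long> map(Tuple2<String, Long> e) {
                        rr = (rr + 1) % 10; // round-robin salt over 10 sub-keys
                        return Tuple3.of(e.f0, rr, e.f1);
                    }
                });
        // The key selector only reads materialized fields, so the same record always
        // maps to the same key group. A selector that calls Random instead is what
        // breaks the key-group check seen in the savepoint failure below.
        return salted.keyBy(t -> t.f0 + "#" + t.f1);
    }
}

Aggregate per salted key first, then keyBy the original key (t.f0) and aggregate once more to merge the sub-keys back together.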
Best,
Congxian




Re: flink-savepoint issue

2021-03-03, posted by guomuhua
I ran into a similar situation: to spread the data out, I added a random number in the keyBy. What is the correct way to spread the data?

Re: flink-savepoint issue

2021-03-03, posted by guomuhua
I hit the same problem: to spread the data out I appended a random number as a suffix in the keyBy. Without the random number the savepoint works fine; with it, the savepoint fails. So if I genuinely need to spread the data, how should I handle it?




Re: flink-savepoint issue

2021-03-03, posted by yidan zhao
Are you using a random key?



flink-savepoint issue

2021-03-03, posted by guaishushu1...@163.com
Checkpoints can be saved successfully, but the savepoint fails with this error:
java.lang.Exception: Could not materialize checkpoint 2404 for operator 
KeyedProcess (21/48).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more


guaishushu1...@163.com


Re: A question about Flink savepoint

2021-01-11, posted by 占英华
Got it, thanks for your help!





Re: A question about Flink savepoint

2021-01-11, posted by Yun Tang
Hi,

There is no ready-made API exposed for this, but you can follow the approach used in the test cases and set savepointRestoreSettings on the jobGraph [1].

[1] 
https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383
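For illustration, a minimal sketch of that approach (the helper class and the choice of allowNonRestoredState = true are assumptions):

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public final class SavepointResume {

    // Mirrors what `bin/flink run -s <savepointPath>` does on the client side.
    public static CompletableFuture<JobID> resumeFromSavepoint(
            ClusterClient<?> client, JobGraph jobGraph, String savepointPath) {
        // true = allowNonRestoredState: tolerate state that no operator claims.
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(savepointPath, true));
        return client.submitJob(jobGraph);
    }
}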

Best regards,
Yun Tang



A question about Flink savepoint

2021-01-11, posted by yinghua...@163.com
In the Flink API we stop a job from Java code through the client class ClusterClient, using:
CompletableFuture<String> stopWithSavepoint(JobID var1, boolean var2, @Nullable String var3);
We then saved the job's savepoint information, but I don't see a corresponding restore method. The material I found all restores through the command line. Is there a way to restore the job directly at the Java API level?



yinghua...@163.com


Re: Command: Flink savepoint -d reported an error.

2020-12-27, posted by 赢峰

Was this ever resolved? I also get: Missing required argument: savepoint path. Usage: bin/flink 
savepoint -d <savepointPath>
赢峰
si_ji_f...@163.com


On 09/30/2019 15:06, pengchengl...@163.com wrote:
Hi, thanks for the reply. My command follows the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#trigger-a-savepoint
$ bin/flink savepoint -d :savepointPath
This command deletes a savepoint; what you described is triggering a savepoint, which works fine.


pengchengl...@163.com

From: 狄玉坤
Sent: 2019-09-30 14:59
To: user-zh@flink.apache.org
Subject: Re: Re: Command: Flink savepoint -d reported an error.
The jobId argument is missing:
./bin/flink savepoint <jobId> [savepointDirectory]
This will trigger a savepoint for the job with ID jobId, and returns the path 
of the created savepoint. You need this path to restore and dispose savepoints.
diyukun2019
diyukun2...@163.com
On 9/30/2019 14:56, pengchengl...@163.com wrote:
Thanks for your reply. I tried it, and it still says a required argument is missing.
As follows:
$ bin/flink savepoint -d 
file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)




pengchengl...@163.com

From: 星沉
Sent: 2019-09-30 14:47
To: user-zh
Subject: Re: Command: Flink savepoint -d reported an error.
The savepoint path argument is wrong; there's probably an extra / after file.


pengchengl...@163.com wrote:
$ bin/flink savepoint 
-dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>


Re: flink savepoint exception

2020-11-09, posted by Congxian Qiu
Hi
The exception message contains "Failed to trigger savepoint. Failure reason: An Exception
occurred while triggering the checkpoint." Perhaps you can look at the JM log and check whether there is anything more detailed.
Best,
Congxian




Re: flink savepoint

2020-11-08, posted by 张锴
Found it: the JM log showed a missing write permission; after fixing that, it works.



Re: flink savepoint

2020-11-08, posted by 张锴
It is already specified.



flink savepoint exception

2020-11-07, posted by 张锴
Using Flink 1.10.1, I hit the error below when taking a savepoint. I don't yet know the cause, so I'm asking for help here.

I have already ruled out backpressure and restarts; the checkpoint timeout is set to ten minutes, and the conf increases the time the client has to connect to the master, but the exception still occurs.

Command

flink savepoint -yid application_1604456903594_2381
fb8131bcb78cbdf2bb9a705d8a4ceebc
hdfs:///hadoopnamenodeHA/flink/flink-savepoints

Exception

The program finished with the following exception:

org.apache.flink.util.FlinkException: Triggering a savepoint for the job
fb8131bcb78cbdf2bb9a705d8a4ceebc failed.
 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger
savepoint. Failure reason: An Exception occurred while triggering the
checkpoint.
 at
org.apache.flink.runtime.scheduler.SchedulerBase.lambda$triggerSavepoint$3(SchedulerBase.java:756)
 at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
 at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseState


Re: flink savepoint

2020-11-05, posted by admin
Hi,
Is your job running on YARN? If so, you need to specify -yid.




Re: flink savepoint

2020-11-05, posted by Congxian Qiu
Hi
Can you see any other exceptions in the client log or the JM log?
Best,
Congxian


张锴 wrote on Fri, Nov 6, 2020 at 11:42 AM:

> Restarts and backpressure are both normal.
> I also increased the client-to-master timeout, but the problem persists.
>
> hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:
>
> > Hi,
> >
> >
> > This error only means the savepoint did not complete within the configured time, so the client's connection to the Master timed out;
> > for the actual cause you need to look at the JobMaster logs.
> > PS: when a job keeps restarting or is under backpressure, savepoints generally fail.
> >
> >
> > Best,
> > Hailong Wang
> >
> >
> >
> >
> > On 2020-11-06 09:33:48, "张锴" wrote:
> > >While saving a snapshot with flink savepoint I hit an error, and I am not sure what is causing it. Could anyone take a look?
> > >
> > >Flink version: 1.10.1
> > >
> > >
> > >Ran:   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> > >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> > >
> > >
> > >The error message:
> > >
> > >
> > >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> > >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> > >
> > > at
> > org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> > >
> > > at java.security.AccessController.doPrivileged(Native Method)
> > >
> > > at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > > at
> >
> >
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> > >
> > > at
> >
> >
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> > >
> > >Caused by: java.util.concurrent.TimeoutException
> > >
> > > at
> >
> >
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> > >
> > > at
> > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> > >
> > > at
> >
> >
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
> >
>


Re: flink savepoint

2020-11-05 Post by 张锴
Restarts and backpressure are both normal.
I also increased the client-to-master timeout, but the problem persists.

hailongwang <18868816...@163.com> wrote on Fri, Nov 6, 2020 at 10:54:

> Hi,
>
>
> This error only means the savepoint did not complete within the configured time, so the client's connection to the Master timed out;
> for the actual cause you need to look at the JobMaster logs.
> PS: when a job keeps restarting or is under backpressure, savepoints generally fail.
>
>
> Best,
> Hailong Wang
>
>
>
>
> On 2020-11-06 09:33:48, "张锴" wrote:
> >While saving a snapshot with flink savepoint I hit an error, and I am not sure what is causing it. Could anyone take a look?
> >
> >Flink version: 1.10.1
> >
> >
> >Ran:   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
> >hdfs://hadoopnamenodeHA/flink/flink-savepoints
> >
> >
> >The error message:
> >
> >
> >org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> >a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
> >
> > at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
>
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> > at
>
> >org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
> >
> >Caused by: java.util.concurrent.TimeoutException
> >
> > at
>
> >java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >
> > at
>
> >org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>


flink savepoint

2020-11-05 Post by 张锴
While saving a snapshot with flink savepoint I hit an error, and I am not sure what is causing it. Could anyone take a look?

Flink version: 1.10.1


Ran:   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
hdfs://hadoopnamenodeHA/flink/flink-savepoints


The error message:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job
a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)

 at
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)

 at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)

 at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)

 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)

 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

 at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.TimeoutException

 at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)

 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)

 at
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)


Re: Reply: Re: flink savepoint and checkpoint questions

2020-10-09 Post by izual






My understanding is that it works the same way; the differences between the two are described in [1].
To restore, pass -s with the path to restore from when starting the job, e.g. -s
file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
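A full restore command would look roughly like this, as a sketch (the entry class and jar name are made up for illustration):

bin/flink run -s file:///tmp/test/db262ffab6b00db9820c54f25a3f956f/chk-61 -c com.example.MyJob my-job.jar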

On 2020-10-10 08:43:38, "zjfpla...@hotmail.com" wrote:
>Is it the same for savepoints?
>
>
>
>zjfpla...@hotmail.com
> 
>From: Yun Tang
>Sent: 2020-10-10 01:35
>To: user-zh
>Subject: Re: flink savepoint and checkpoint questions
>Hi
> 
>When the job has been modified, the key to restoring from a checkpoint is to make sure the relevant operators have uids assigned [1], and to allow non-restored
>state [2] when restoring.
> 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
> 
>Best
>唐云
>
>From: zjfpla...@hotmail.com 
>Sent: Friday, October 9, 2020 8:59
>To: user-zh 
>Subject: flink savepoint and checkpoint questions
> 
>Hi,
>Under what kinds of job changes can Flink savepoints/checkpoints still be reused, and what kinds of changes force re-consuming from scratch?
> 
> 
> 
>zjfpla...@hotmail.com


Reply: Re: flink savepoint and checkpoint questions

2020-10-09 Post by zjfpla...@hotmail.com
Is it the same for savepoints?



zjfpla...@hotmail.com
 
From: Yun Tang
Sent: 2020-10-10 01:35
To: user-zh
Subject: Re: flink savepoint and checkpoint questions
Hi

When the job has been modified, the key to restoring from a checkpoint is to make sure the relevant operators have uids assigned [1], and to allow non-restored
state [2] when restoring.
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
 
Best
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint and checkpoint questions
 
Hi,
Under what kinds of job changes can Flink savepoints/checkpoints still be reused, and what kinds of changes force re-consuming from scratch?
 
 
 
zjfpla...@hotmail.com


Re: flink savepoint and checkpoint questions

2020-10-09 Post by Yun Tang
Hi

When the job has been modified, the key to restoring from a checkpoint is to make sure the relevant operators have uids assigned [1], and to allow non-restored
state [2] when restoring.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
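As a sketch (the operator, uid string, savepoint path, and jar name below are illustrative, not from the original thread), assigning a uid in code and allowing non-restored state on restore look like this:

events
    .map(new MyStatefulMapper())   // MyStatefulMapper is a made-up example function
    .uid("stateful-mapper");       // stable id so state can be matched after the job changes

bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx --allowNonRestoredState my-job.jar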

Best
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint and checkpoint questions

Hi,
Under what kinds of job changes can Flink savepoints/checkpoints still be reused, and what kinds of changes force re-consuming from scratch?



zjfpla...@hotmail.com


flink savepoint and checkpoint questions

2020-10-08 Post by zjfpla...@hotmail.com
Hi,
Under what kinds of job changes can Flink savepoints/checkpoints still be reused, and what kinds of changes force re-consuming from scratch?



zjfpla...@hotmail.com



Re: Re:Re: flink savepoint questions

2020-03-30 Post by Yun Tang
Hi

First, if this problem reproduces easily, we need to pin down what is causing the OOMKilled.

  1.  Enable block-cache usage [1] and watch the block cache usage in the metrics.
  2.  Please answer a few questions to help narrow it down:
 *   How many slots does a single TM have?
 *   How much managed memory is configured for a single TM?
 *   How many keyed states are declared in total (using a window also counts as one state)? How many of them are map
states, and do you iterate over those map states often?
 *   How many RocksDB instances live in the killed container? You can count them by searching the logs for "Obtained shared RocksDB
cache of size".
 *   Did you configure a separate options factory or related options for RocksDB?

The semantics of state.backend.rocksdb.memory.managed is that RocksDB's memory comes from Flink, and the RocksDB instances within one slot share one
cache. If you set it to false, you fall back to the pre-1.9 behavior: RocksDB's memory is not managed by Flink at all, which in some ways makes the container even more likely to be
killed.

One quick mitigation is to increase taskmanager.memory.task.off-heap.size
[2], which provides some extra memory for RocksDB to overuse. Other mitigations depend on your answers to the questions above.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-block-cache-usage
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-task-off-heap-size
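As a flink-conf.yaml sketch for the two options above (the size is illustrative, not a recommendation):

state.backend.rocksdb.metrics.block-cache-usage: true
taskmanager.memory.task.off-heap.size: 512m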

Best
唐云


From: xyq 
Sent: Monday, March 30, 2020 10:41
To: user-zh@flink.apache.org 
Subject: Re:Re: flink savepoint questions

Hi:
I have a use case where a small stream left-joins a large stream. The small stream has basically no data at night,
sometimes going 4 to 5 hours without data. Right now the container keeps getting killed at night, reporting out-of-memory. I would like to set managed memory to false; what are the downsides, or how else should this be solved? It has bothered me for a long time. Please advise, thanks.
state.backend.rocksdb.memory.managed : false

On 2020-03-28 11:04:09, "Congxian Qiu" wrote:
>Hi
>
>For problem 1: under backpressure the savepoint may fail to complete and thus time out. There is no fix for this yet; issue [1] is working on unaligned
>checkpoints, which will address checkpointing under backpressure.
>For problem 3: the checkpoint timed out. Timeout means that within the configured time (e.g. your 5 minutes) some task did not finish its
>snapshot. Lengthening the timeout can mitigate this somewhat, but you had better find the cause of the timeout and optimize for it.
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
>
>
>大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:
>
>> Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance. Thanks!
>>
>> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
>> cannot stop the job while it is under backpressure
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.util.FlinkException: Could not cancel job
>> 1f768e4ca9ad5792a4844a5d12163b73.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
>> ... 9 more
>> stop flink job failed!!!
>>
>>
>>
>>
>> 2. After adding a field in a Flink SQL DDL, the job fails to start and can only start after the savepoint is deleted
>> (the DDL feeds a two-stream join, and one of the streams gained a field)
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContex

Re:Re: flink savepoint questions

2020-03-29 Post by xyq
Hi:
I have a use case where a small stream left-joins a large stream. The small stream has basically no data at night,
sometimes going 4 to 5 hours without data. Right now the container keeps getting killed at night, reporting out-of-memory. I would like to set managed memory to false; what are the downsides, or how else should this be solved? It has bothered me for a long time. Please advise, thanks.
state.backend.rocksdb.memory.managed : false

On 2020-03-28 11:04:09, "Congxian Qiu" wrote:
>Hi
>
>For problem 1: under backpressure the savepoint may fail to complete and thus time out. There is no fix for this yet; issue [1] is working on unaligned
>checkpoints, which will address checkpointing under backpressure.
>For problem 3: the checkpoint timed out. Timeout means that within the configured time (e.g. your 5 minutes) some task did not finish its
>snapshot. Lengthening the timeout can mitigate this somewhat, but you had better find the cause of the timeout and optimize for it.
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
>
>
>大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:
>
>> Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance. Thanks!
>>
>> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
>> cannot stop the job while it is under backpressure
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.util.FlinkException: Could not cancel job
>> 1f768e4ca9ad5792a4844a5d12163b73.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
>> ... 9 more
>> stop flink job failed!!!
>>
>>
>>
>>
>> 2. After adding a field in a Flink SQL DDL, the job fails to start and can only start after the savepoint is deleted
>> (the DDL feeds a two-stream join, and one of the streams gained a field)
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> at
>> com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 11 more
>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
>> Could not complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at
>> 

Re:Re: flink savepoint questions

2020-03-29 Post by xyq
Thanks a lot.
On 2020-03-28 11:04:09, "Congxian Qiu" wrote:
>Hi
>
>For problem 1: under backpressure the savepoint may fail to complete and thus time out. There is no fix for this yet; issue [1] is working on unaligned
>checkpoints, which will address checkpointing under backpressure.
>For problem 3: the checkpoint timed out. Timeout means that within the configured time (e.g. your 5 minutes) some task did not finish its
>snapshot. Lengthening the timeout can mitigate this somewhat, but you had better find the cause of the timeout and optimize for it.
>[1] https://issues.apache.org/jira/browse/FLINK-14551
>Best,
>Congxian
>
>
>大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:
>
>> Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance. Thanks!
>>
>> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
>> cannot stop the job while it is under backpressure
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.util.FlinkException: Could not cancel job
>> 1f768e4ca9ad5792a4844a5d12163b73.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
>> ... 9 more
>> stop flink job failed!!!
>>
>>
>>
>>
>> 2. After adding a field in a Flink SQL DDL, the job fails to start and can only start after the savepoint is deleted
>> (the DDL feeds a two-stream join, and one of the streams gained a field)
>>
>>
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> at
>> com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> ... 11 more
>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
>> Could not complete the operation. Number of retries has been exhausted.
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> at
>> 

Re: flink savepoint questions

2020-03-27 Post by Congxian Qiu
Hi

For problem 1: under backpressure the savepoint may fail to complete and thus time out. There is no fix for this yet; issue [1] is working on unaligned
checkpoints, which will address checkpointing under backpressure.
For problem 3: the checkpoint timed out. Timeout means that within the configured time (e.g. your 5 minutes) some task did not finish its
snapshot. Lengthening the timeout can mitigate this somewhat, but you had better find the cause of the timeout and optimize for it.
[1] https://issues.apache.org/jira/browse/FLINK-14551
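To lengthen the checkpoint timeout in code, a sketch (assuming env is your StreamExecutionEnvironment; the ten-minute value is illustrative):

env.getCheckpointConfig().setCheckpointTimeout(600_000); // 10 minutes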
Best,
Congxian


大数据开发面试_夏永权 wrote on Fri, Mar 27, 2020 at 4:19 PM:

> Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance. Thanks!
>
> 1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id
> cannot stop the job while it is under backpressure
>
>
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not cancel job
> 1f768e4ca9ad5792a4844a5d12163b73.
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
> ... 9 more
> stop flink job failed!!!
>
>
>
>
> 2. After adding a field in a Flink SQL DDL, the job fails to start and can only start after the savepoint is deleted
> (the DDL feeds a two-stream join, and one of the streams gained a field)
>
>
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> at
> com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 11 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
> at
> 

flink savepoint questions

2020-03-27 Post by 大数据开发面试_夏永权
Hi, I ran into the following problems while using Flink and could not solve them myself, so I am asking for your guidance. Thanks!

1. flink cancel -s $SAVEPOINT_DIR $job_id -yid $application_id  cannot stop the job while it is under backpressure


 The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not cancel job 
1f768e4ca9ad5792a4844a5d12163b73.
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:523)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:515)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:904)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.lambda$cancel$6(CliFrontend.java:521)
... 9 more
stop flink job failed!!!




2. After adding a field in a Flink SQL DDL, the job fails to start and can only start after the savepoint is deleted (the DDL feeds a two-stream join, and one of the streams gained a field)


 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at 
com.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:265)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at 

Re: Re: flink savepoint checkpoint

2020-01-10 Post by Px New
Hello, regarding your question I found something interesting.
After I start a job in YARN per-job mode, I can see my task in YARN's resource management UI ->
it has its own application-Id. Then, when I go through YARN's Tracking UI via the Application
Master link <http://node01:8088/proxy/application_1577499691717_0064/>
into the job's web UI (the Flink web UI) and kill the program with the cancel button there, the YARN management UI
still shows an empty shell. The program is only really killed after I run yarn application -kill Id in the terminal. So my initial take is that
it is a stop.

Best!
Px

amen...@163.com wrote on Fri, Jan 10, 2020 at 5:59 PM:

> Hi, I understand that stopping a job with stop triggers a savepoint, generates a max_watermark before stopping, and fires the registered event-time timers. May I ask: if I stop the job directly with yarn
> kill, does that count as a cancel, a stop, or something else?
>
>
>
> amen...@163.com
>
> From: Congxian Qiu
> Date: 2020-01-10 17:16
> To: user-zh
> Subject: Re: flink savepoint checkpoint
> Hi
> From Flink's point of view, checkpoints are for recovering when a job fails over during execution, while savepoints are for reusing state
> across jobs.
> Also, starting from 1.9 you can try StopWithSavepoint [1]; another community issue is working on
> StopWithCheckpoint [2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-11458
> [2] https://issues.apache.org/jira/browse/FLINK-12619
> Best,
> Congxian
>
>
> zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:
>
> > Hi, as I understand it this setting controls whether previous checkpoints are cleaned up when the job is canceled, but that checkpoint
> > is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete.
> >
> > Best!
> > zhisheng
> >
> > Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
> >
> > > Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
> > > -->
> > >
> > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > >
> > >
> > > lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> > >
> > > > Hi all:
> > > >
> > > >
> > >
> >
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?
> > >
> >
>


Re: Re: flink savepoint checkpoint

2020-01-10 Post by muyexm329
It mainly depends on your checkpoint interval. Like rewinding a video, they are two different rewind points: a savepoint can capture checkpoint data at the current moment, while the automatic checkpoint may have produced its data at an earlier point (because when you cancel
the job it may not yet be time for the next automatic checkpoint). Either works; one is just earlier and one later, which also determines how fast the job catches up after recovery. For jobs that
are modified often in production, savepoints are very practical.
Personally I think a failed job, no matter how it failed (unless restored from a savepoint), goes back to the last automatic checkpoint, not to the savepoint.


 Original message 
From: amen...@163.com
To: user-zh
Sent: Friday, Jan 10, 2020, 17:58
Subject: Re: Re: flink savepoint checkpoint

Hi, I understand that stopping a job with stop triggers a savepoint, generates a max_watermark before stopping, and fires the registered event-time timers. May I ask: if I stop the job directly with yarn kill, does that count as a cancel, a stop, or something else?

From: Congxian Qiu
Date: 2020-01-10 17:16
To: user-zh
Subject: Re: flink savepoint checkpoint
Hi, from Flink's point of view, checkpoints are for recovering when a job fails over during execution, while savepoints are for reusing state across jobs. Also, starting from 1.9 you can try StopWithSavepoint [1]; another community issue is working on StopWithCheckpoint [2].
[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best, Congxian

zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:
> Hi, as I understand it this setting controls whether previous checkpoints are cleaned up when the job is canceled, but that checkpoint is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete.
> Best!
> zhisheng

Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
> > Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
> > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> > > Hi all: a question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?

Re: Re: flink savepoint checkpoint

2020-01-10 Post by amen...@163.com
Hi, I understand that stopping a job with stop triggers a savepoint, generates a max_watermark before stopping, and fires the registered event-time timers. May I ask: if I stop the job
directly with yarn kill, does that count as a cancel, a stop, or something else?



amen...@163.com
 
From: Congxian Qiu
Date: 2020-01-10 17:16
To: user-zh
Subject: Re: flink savepoint checkpoint
Hi
From Flink's point of view, checkpoints are for recovering when a job fails over during execution, while savepoints are for reusing state
across jobs.
Also, starting from 1.9 you can try StopWithSavepoint [1]; another community issue is working on
StopWithCheckpoint [2]
 
[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
Best,
Congxian
 
 
zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:
 
> Hi, as I understand it this setting controls whether previous checkpoints are cleaned up when the job is canceled, but that checkpoint
> is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete.
>
> Best!
> zhisheng
>
> Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
>
> > Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
> > -->
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> > lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> >
> > > Hi all:
> > >
> > >
> >
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?
> >
>


Re: flink savepoint checkpoint

2020-01-10 Post by Congxian Qiu
Hi
From Flink's point of view, checkpoints are for recovering when a job fails over during execution, while savepoints are for reusing state
across jobs.
Also, starting from 1.9 you can try StopWithSavepoint [1]; another community issue is working on
StopWithCheckpoint [2]

[1] https://issues.apache.org/jira/browse/FLINK-11458
[2] https://issues.apache.org/jira/browse/FLINK-12619
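Since 1.9, stop-with-savepoint is also exposed on the CLI; a sketch (the target directory is illustrative):

bin/flink stop -p hdfs:///flink/savepoints <jobId>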
Best,
Congxian


zhisheng wrote on Fri, Jan 10, 2020 at 11:39 AM:

> Hi, as I understand it this setting controls whether previous checkpoints are cleaned up when the job is canceled, but that checkpoint
> is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete.
>
> Best!
> zhisheng
>
> Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:
>
> > Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
> > -->
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> > lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
> >
> > > Hi all:
> > >
> > >
> >
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?
> >
>


Re: flink savepoint checkpoint

2020-01-09 Post by zhisheng
Hi, as I understand it this setting controls whether previous checkpoints are cleaned up when the job is canceled, but that checkpoint
is not necessarily the job's latest state. If you trigger a savepoint with the cancel command, that state is the latest and most complete.

Best!
zhisheng

Px New <15701181132mr@gmail.com> wrote on Fri, Jan 10, 2020 at 10:58 AM:

> Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
> -->
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
> lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
>
> > Hi all:
> >
> >
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?
>


Re: flink savepoint checkpoint

2020-01-09 Post by Px New
Hello, for this question Flink does have a setting. When the program stops, an extra checkpoint is retained:
-->

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
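A minimal surrounding setup, as a sketch (the one-minute interval is illustrative):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // take a checkpoint every 60 s
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep the last checkpoint on cancel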


lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:

> Hi all:
>
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?


Re: flink savepoint checkpoint

2019-12-12 Post by Congxian Qiu
Flink also supports resuming from a retained checkpoint; see the docs [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
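Resuming looks the same as resuming from a savepoint; a sketch (the checkpoint path and jar name are illustrative):

bin/flink run -s hdfs:///flink/checkpoints/<jobId>/chk-42 my-job.jar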
Best,
Congxian


陈帅 wrote on Wed, Dec 11, 2019 at 9:34 PM:

> Flink 1.9 supports the cancel-job-with-savepoint feature
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
> Checkpoints may be incremental, while a savepoint is a full snapshot. For the differences see
>
> https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
>
>
> lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:
>
> > Hi all:
> >
> >
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?
>


Re: flink savepoint checkpoint

2019-12-11 Post by 陈帅
Flink 1.9 supports the cancel-job-with-savepoint feature
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
Checkpoints may be incremental, while a savepoint is a full snapshot. For the differences see
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
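A sketch of the cancel-with-savepoint command (the target directory is illustrative):

bin/flink cancel -s hdfs:///flink/savepoints <jobId>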


lucas.wu wrote on Wed, Dec 11, 2019 at 11:56 AM:

> Hi all:
>
> A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?


Re: flink savepoint checkpoint

2019-12-10 Post by Yun Tang
Hi

Checkpointing is automatic. You can configure retained checkpoints [1] and then restore from a
checkpoint [2], so you do not necessarily need to trigger a savepoint.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
 

Best
唐云

On 12/11/19, 11:56 AM, "lucas.wu"  wrote:

Hi all:

A question I'd like to discuss: why are Flink's savepoints manual? If no savepoint was taken when stopping the program, the previously saved state cannot be used on restart. Why not follow Spark's approach: checkpoint periodically, then pass the checkpoint path on startup to resume from where the last run left off?



Re: bin/flink savepoint jobId error

2019-12-03 Post by guoshuai

Solved.

At 2019-12-04 09:54:17, "guoshuai"  wrote:
>It's a test job running in per-job mode; I checked and the JobManager is healthy.
>Based on Flink 1.4.0; the Flink cluster has Kerberos enabled.
>The targetDirectory has already been configured in flink-conf.yaml.
>
>
> The program finished with the following exception:
>
>org.apache.flink.util.FlinkException: Could not connect to the leading 
>JobManager. Please check that the JobManager is running.
>at 
> org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:861)
>at 
> org.apache.flink.client.program.ClusterClient.listJobs(ClusterClient.java:699)
>at org.apache.flink.client.CliFrontend.list(CliFrontend.java:439)
>at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1082)
>at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1127)
>at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1124)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1781)
>at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1124)
>Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
>Could not retrieve the leader gateway.
>at 
> org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
>at 
> org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:856)
>... 10 more
>Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
>[1 milliseconds]
>at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>at scala.concurrent.Await$.result(package.scala:190)
>at scala.concurrent.Await.result(package.scala)
>at 
> org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
>... 11 more
>


bin/flink savepoint jobId error

2019-12-03 Post by guoshuai
It's a test job running in per-job mode; I checked and the JobManager is healthy.
Based on Flink 1.4.0; the Flink cluster has Kerberos enabled.
The targetDirectory has already been configured in flink-conf.yaml.
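For reference, a sketch of that flink-conf.yaml entry (the path is illustrative, and the exact key name may differ across old versions):

state.savepoints.dir: hdfs:///flink/savepoints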


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not connect to the leading 
JobManager. Please check that the JobManager is running.
at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:861)
at 
org.apache.flink.client.program.ClusterClient.listJobs(ClusterClient.java:699)
at org.apache.flink.client.CliFrontend.list(CliFrontend.java:439)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1082)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1127)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1124)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1781)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1124)
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: 
Could not retrieve the leader gateway.
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:856)
... 10 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
... 11 more



Re: Re: Command: Flink savepoint -d reported an error.

2019-09-30 Post by 狄玉坤
The jobId argument is missing:
./bin/flink savepoint <jobId> [savepointDirectory]
This will trigger a savepoint for the job with ID jobId, and returns the path 
of the created savepoint. You need this path to restore and dispose savepoints.
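For example, a sketch (the ids and paths are placeholders):

./bin/flink savepoint <jobId> hdfs:///flink/savepoints     (trigger a savepoint)
./bin/flink savepoint -d <savepointPath>                   (dispose a savepoint)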
diyukun2019
diyukun2...@163.com
On 9/30/2019 14:56,pengchengl...@163.com wrote:
Thanks for your reply. I tried it, but it still says a required argument is missing.
As follows:
$ bin/flink savepoint -d 
file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)




pengchengl...@163.com

From: 星沉
Sent: 2019-09-30 14:47
To: user-zh
Subject: Re: Command: Flink savepoint -d reported an error.
The savepoint path argument is wrong; isn't there one / too many after file?


pengchengl...@163.com wrote:
$ bin/flink savepoint 
-dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>


Re: Command: Flink savepoint -d reported an error.

2019-09-30 Post by 星沉

The savepoint path argument is wrong; isn't there one / too many after file?


pengchengl...@163.com wrote:

$ bin/flink savepoint 
-dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


  The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. Usage: 
bin/flink savepoint -d <savepointPath>


Re: Flink Savepoint timeout

2019-09-06 Post by SJMSTER
I looked around and did not see any other errors; there is only the exception I pasted above...
because this is the error reported when running the CLI...

On Fri, Sep 6, 2019 at 4:51 PM Wesley Peng  wrote:

>
>
> SJMSTER wrote:
> > Checkpoints一直都是成功的。
> > 今天重新尝试了一下cancle job with savepoint又成功了..
> > 不知道之前为什么试了几次都是超时的..
>
> are there any log items for diagnosis?
>
> regards.
>


Re: Flink Savepoint timeout

2019-09-06 Post by Wesley Peng




SJMSTER wrote:

Checkpoints have always succeeded.
I retried cancel job with savepoint today and it succeeded again...
No idea why the earlier attempts all timed out...


are there any log items for diagnosis?

regards.


Flink Savepoint timeout

2019-09-06 Post by Jimmy.Shao
Has anyone hit a timeout exception when manually triggering a Flink savepoint from the CLI?
Trying to cancel the job with savepoint gives the same timeout error.
Savepoints are already configured to be stored on HDFS,
and Flink itself runs on YARN.
I saw a parameter "akka.client.timeout" on the website that may be meant for this,
but for it to take effect it has to be set in flink-conf.yml,
and there is no way to pass it in via the CLI.
So the job cannot be canceled and the Flink cluster cannot be restarted: a deadlock.
Thanks!
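As a flink-conf.yaml sketch (the value is illustrative; the default is 60 s):

akka.client.timeout: 600 s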

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/flink-1.6.0-hdp/lib/phoenix-4.7.0.2.6.3.0-235-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/flink-1.6.0-hdp/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> 2019-09-05 10:45:41,807 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
> properties file under /tmp/.yarn-properties-hive.
> 2019-09-05 10:45:41,807 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
> properties file under /tmp/.yarn-properties-hive.
> 2019-09-05 10:45:42,056 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN
> properties set default parallelism to 1
> 2019-09-05 10:45:42,056 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN
> properties set default parallelism to 1
> YARN properties set default parallelism to 1
> 2019-09-05 10:45:42,269 INFO  org.apache.hadoop.yarn.client.AHSProxy
>  - Connecting to Application History server at
> ac13ghdpt2m01.lab-rot.saas.sap.corp/10.116.201.103:10200
> 2019-09-05 10:45:42,276 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-09-05 10:45:42,276 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-09-05 10:45:42,282 WARN
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Neither
> the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The
> Flink YARN Client needs one of these to be set to properly load the Hadoop
> configuration for accessing YARN.
> 2019-09-05 10:45:42,284 INFO
>  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  -
> Looking for the active RM in [rm1, rm2]...
> 2019-09-05 10:45:42,341 INFO
>  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  -
> Found active RM [rm1]
> 2019-09-05 10:45:42,345 INFO
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found
> application JobManager host name 'ac13ghdpt2dn01.lab-rot.saas.sap.corp' and
> port '40192' from supplied application id 'application_1559153472177_52202'
> 2019-09-05 10:45:42,689 WARN
>  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory   - The
> short-circuit local reads feature cannot be used because libhadoop cannot
> be loaded.
> Triggering savepoint for job 6399ec2e8fdf4cb7d8481890019554f6.
> Waiting for response...
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 6399ec2e8fdf4cb7d8481890019554f6 failed.
> at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Exception is not retryable.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> 
