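Flink 1.11.2 FileSystem connector issue

2021-03-11 Thread 史 正超
The symptoms are as follows:
1. Flink continuously writes data to the HDFS directory /warehouse/rt_ods/ed_cell_num_info/, partitioned by day.
2. We then run a scheduled job that creates the T-1 partition in Hive; for example, when it runs at 6:00 today, it creates yesterday's partition.
3. The Flink job then fails with an error saying an in-progress file cannot be found: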
```
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1091)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1222)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$16(StreamTask.java:1211)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /warehouse/rt_ods/ed_cell_num_info/pt=20210311/.part-80f4307d-b910-42fa-8500-2c1226c5a879-0-57.inprogress.94c24d6c-e6f7-4387-b2e2-e667a44b23f6 (inode 2792145632): File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_-1421245761_79, pending creates: 2]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3698)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3785)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3755)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:745)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:245)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:540)
--
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2211)

... 12 common frames omitted

```
As the trace shows, the in-progress file that cannot be found is in the pt=20210311 partition.

The SQL is as follows:
```
CREATE TABLE T_ED_CELL_NUM_INFO_SINK(

pt   STRING
) PARTITIONED BY (pt) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///warehouse/rt_ods/ed_cell_num_info',
'format' = 'csv',
'csv.field-delimiter' = U&'\0009',
'csv.disable-quote-character' = 'true',
'sink.rolling-policy.file-size' = '200m',
'sink.rolling-policy.rollover-interval' = '15min',
'sink.rolling-policy.check-interval' = '1min'
);
```



Inserting NULL values in Flink SQL fails

2021-03-11 Thread Jimmy Zhang
Does anyone know how to insert a NULL value in Flink SQL? At the moment, writing null directly in the VALUES clause of an INSERT statement raises an error in Zeppelin.

Best,
Jimmy
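The thread does not show the exact error, so the following is only a guess at one possible workaround: a bare NULL literal has no type, and giving it an explicit type with CAST may let the planner accept it. The sketch below merely builds such a statement as a string; the table name `my_sink`, the helper names, and the column type `STRING` are hypothetical and should be adapted to the real sink schema.

```java
// A minimal sketch, assuming the failure comes from an untyped NULL literal.
// All names here (TypedNullSketch, my_sink) are illustrative, not from the thread.
class TypedNullSketch {

    // Wraps NULL in a CAST so that the literal carries an explicit SQL type.
    static String typedNull(String sqlType) {
        return "CAST(NULL AS " + sqlType + ")";
    }

    // Builds a two-column INSERT whose second value is a typed NULL.
    static String insertWithNull(String table, String firstLiteral, String nullType) {
        return "INSERT INTO " + table + " VALUES (" + firstLiteral + ", " + typedNull(nullType) + ")";
    }

    public static void main(String[] args) {
        // Prints: INSERT INTO my_sink VALUES (1, CAST(NULL AS STRING))
        System.out.println(insertWithNull("my_sink", "1", "STRING"));
    }
}
```

The resulting statement can be pasted into a Zeppelin paragraph or passed to the table environment to check whether the typed form is accepted.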

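Re: Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread Jeff Zhang
Zeppelin supports loading UDF jars. You can refer to the code below, although its architecture may differ somewhat from your existing one.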

https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE
https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469


chenxyz wrote on Fri, Mar 12, 2021 at 9:42 AM:

> That approach does not work for us at the moment. We submit Flink jobs through our company's platform system, and the only thing we control is the code.
>
> At 2021-03-11 16:39:24, "silence" wrote:
> >Try adding it to the classpath with -C at startup
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best Regards

Jeff Zhang


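Re: Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread chenxyz
That approach does not work for us at the moment. We submit Flink jobs through our company's platform system, and the only thing we control is the code.

At 2021-03-11 16:39:24, "silence" wrote:
>Try adding it to the classpath with -C at startup
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/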


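flink-kinesis: shards are unevenly distributed after setParallelism

2021-03-11 Thread mo jia
The default shard assigner is:

public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
    (shard, subtasks) -> shard.hashCode();

If the number of shards is greater than the parallelism, this easily leads to an uneven distribution.

I am thinking of using the following approach, defined in the main class:

static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
static AtomicInteger counter = new AtomicInteger(0);

public static final KinesisShardAssigner origin_shard =
    (shard, subtasks) -> {
        String shardId = shard.getShard().getShardId();
        Integer index = map.get(shardId);
        if (index != null) {
            return index;
        } else {
            // Take the next index atomically instead of incrementing and reading in two steps.
            Integer new_index = counter.incrementAndGet();
            map.put(shardId, new_index);
            return new_index;
        }
    };


flinkKinesisConsumer.setShardAssigner(origin_shard);

I tried it once and it worked, but my understanding is that this code ultimately runs inside the task slots.

Does the correctness of this approach *depend on the Kinesis ListShards API returning shards in a fixed order*?

Is there a way to distribute shards evenly that does not depend on the order the API returns?


Although the question is about Kinesis, it probably resembles other scenarios where sources and slots read multiple partitions in parallel.


I am new to the community; suggestions are welcome.


Thanks.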
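One caveat about the counter-based assigner: a static map lives once per JVM, so each TaskManager would build its own numbering from whatever order it happens to discover shards in. A stateless alternative is to derive the index from the shard ID itself. The sketch below is only an illustration under two assumptions that are not confirmed in this thread: shard IDs have the form `shardId-` followed by a decimal sequence number, and the consumer maps the assigner's return value onto a subtask by taking it modulo the number of subtasks. The class and method names are hypothetical.

```java
import java.util.Arrays;

// A minimal sketch, assuming shard IDs look like "shardId-000000000012".
class ShardIndexSketch {

    // Parses the numeric suffix of the shard ID; the result is the same on every JVM.
    static int shardNumber(String shardId) {
        int dash = shardId.lastIndexOf('-');
        return (int) (Long.parseLong(shardId.substring(dash + 1)) % Integer.MAX_VALUE);
    }

    // Assumed mapping from assigner value to subtask index: abs(value % subtasks).
    static int subtaskFor(String shardId, int subtasks) {
        return Math.abs(shardNumber(shardId) % subtasks);
    }

    // Counts how many of the shards 0..shardCount-1 land on each subtask.
    static int[] loadPerSubtask(int shardCount, int subtasks) {
        int[] load = new int[subtasks];
        for (int i = 0; i < shardCount; i++) {
            load[subtaskFor(String.format("shardId-%012d", i), subtasks)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        // 10 consecutive shards over 4 subtasks: prints [3, 3, 2, 2]
        System.out.println(Arrays.toString(loadPerSubtask(10, 4)));
    }
}
```

Plugged into the consumer, this would read `(shard, subtasks) -> shardNumber(shard.getShard().getShardId())`. The distribution is only as even as the sequence numbers are dense: after resharding, closed parent shards leave gaps, so some skew can remain.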


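Re: Flink savepoint migration issue

2021-03-11 Thread allanqinjy
建云,
I also ran into a job failing to start from a savepoint before. It happened after we upgraded the Pulsar client from 2.2 to 2.5.2, when I started the job with -s. The job was not very important and I had other tasks at hand at the time, so I did not look into it. Check whether something changed in the Pulsar source.


allanqinjy
allanqi...@163.com
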


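Re: Flink savepoint migration issue

2021-03-11 Thread Kezhu Wang
> Is it possible that Flink already deserializes the state while reading the savepoint file and fails during that step?

That is indeed the case: the checkpoint also snapshots the serializer.

I looked at the stack again; the failure should be in deserializing `MessageId`. Check whether the Pulsar version also changed. If it did, did the fields of some `MessageId` implementation change between the two versions? I think you should use `MessageId.toByteArray`.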



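Re: Flink savepoint migration issue

2021-03-11 Thread 赵 建云
Hello, I rewrote the migration logic following StatefulSinkWriterOperator and tested it, but I still hit the error above.
The error seems to occur before the source's initializeState is called. Is it possible that Flink already deserializes the state while reading the savepoint file and fails during that step? Is there any way to help pinpoint this?

Thanks~


On March 11, 2021 at 11:36 AM, Kezhu Wang <kez...@gmail.com> wrote: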

Is the new cluster using the updated Pulsar connector? I looked at the pulsar-flink code, and this update is a breaking change for state.

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

Possible solutions?
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there are further incompatible updates later on:

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


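For incompatible state updates, if schema migration is hard to do, you can try the approach used by StatefulSinkWriterOperator: read both the old and the new state, and write only the new state.

You could wait for someone from StreamNative to confirm.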

On March 11, 2021 at 10:43:53, 赵 建云 
(zhaojianyu...@outlook.com) wrote:

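Hello everyone, I have run into a problem where a savepoint cannot be restored correctly after a Flink version upgrade.

The version was upgraded from 1.9.3 to 1.11.0 or 1.11.3.
The connector is the Pulsar connector. The checkpoint data structures in the source are as follows: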

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
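Locally, I saved the 1.9.3 state with the bin/flink savepoint command, stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After the job starts, it hits the following error: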


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchin

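Re: After reducing the number of slots per TM from 2 to 1, the job fails to start

2021-03-11 Thread Smile
I suggest checking the cluster's remaining memory to see whether there is not enough for 140 TaskManagers.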



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2021-03-11 Thread Lyon
Unsubscribe

Unsubscribe

2021-03-11 Thread hyangvv
Unsubscribe

Re: Two SQL jobs submitted, but one of them does not take effect

2021-03-11 Thread HunterXHunter
StatementSet inserts = tableEnv.createStatementSet();
inserts.addInsertSql("insert into xxx select * from xxx"); // job topic1 -> topic2
inserts.addInsertSql("insert into xxx select * from xxx"); // job topic2 -> Postgres
inserts.execute();



--
Sent from: http://apache-flink.147419.n8.nabble.com/


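Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread 邓从宝
user-zh, please stop sending me emails.


--
From: silence
Sent: Thursday, March 11, 2021, 16:39
To: user-zh
Subject: Re: How to load a UDF from a remote jar in Flink SQL

Try adding it to the classpath with -C at startup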



--
Sent from: http://apache-flink.147419.n8.nabble.com/



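When objects are passed between Flink operators through a base-class type, can they be cast to the subclass inside an operator?

2021-03-11 Thread Lei Wang
Values passed between Flink operators need to be serialized. If the DataStream<> generic type refers to a base class, do downstream operators still retain the complete subclass information, so that a cast is possible?


For example:

DataStream stream = source.from(SubClass);

stream.keyBy(   ) {

// Can the code here check the type and cast?

SubClass subObj = (SubClass) baseObj;

}


Thanks,

王磊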
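Whether the runtime class survives the hop between operators depends on the serializer Flink selects for the stream's declared type, which this thread does not settle. Independently of that, the cast inside the operator is safer when guarded by a type check. The plain-Java sketch below shows only that guard; `Base`, its fields, and `describe` are hypothetical names standing in for the real classes and for the operator body.

```java
// A minimal sketch of a guarded downcast; no Flink classes are involved.
class Base {
    final String id;
    Base(String id) { this.id = id; }
}

class SubClass extends Base {
    final int extra;
    SubClass(String id, int extra) { super(id); this.extra = extra; }
}

class DowncastSketch {

    // Stands in for the operator body: cast only when the runtime type allows it.
    static String describe(Base baseObj) {
        if (baseObj instanceof SubClass) {
            SubClass subObj = (SubClass) baseObj;
            return "SubClass:" + subObj.id + ":" + subObj.extra;
        }
        // Fallback when the element arrives as a plain Base instance.
        return "Base:" + baseObj.id;
    }

    public static void main(String[] args) {
        System.out.println(describe(new SubClass("a", 7))); // prints SubClass:a:7
        System.out.println(describe(new Base("b")));        // prints Base:b
    }
}
```

Logging which branch is taken in a small test job is a quick way to see what the downstream operator actually receives.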


Re: How to load a UDF from a remote jar in Flink SQL

2021-03-11 Thread silence
Try adding it to the classpath with -C at startup



--
Sent from: http://apache-flink.147419.n8.nabble.com/


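Question: with state.backend.incremental enabled, Checkpointed Data Size keeps growing

2021-03-11 Thread HunterXHunter
1. With state.backend.incremental enabled, Checkpointed Data Size keeps growing.
I checkpoint every 10 minutes and each checkpoint grows by about 2 MB; over two days it grew to 400 MB. My actual state should only be about 20 MB (the job does a single window computation), and a savepoint of the job is also only about 20 MB.
TTL is configured.
2. With state.backend.incremental disabled, each checkpoint is around 20 MB and no longer grows.

My understanding is that with state.backend.incremental enabled, Checkpointed Data Size should be the size of the increment, perhaps only a few KB. Why does it keep growing?

Why is this? Is it a bug?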



--
Sent from: http://apache-flink.147419.n8.nabble.com/