Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 Leonard Xu
Hello,
 
很有意思的话题,我理解这需要保证多个CDC数据源 的全局一致性, 多个业务表的 bin-log 通过 cdc接入flink后,得保证 
每个数据源的写入目标库的时候有一个全局一致性的保证,这个底层的APi应该也支持不了的。
一种可能的思路是 抽取cdc 记录 的metadata里的 committed ts (原始数据库中每次变更的时间, debezuim 
的source.ts_ms字段, canal的es 字段),通过这个时间来协调 多个 CDC 数据源的处理速度,这只是我的一个想法。

不过可以确定的是,目前的API应该拿不到这个信息,现在的 Flink 框架没法处理这个数据, 可以看下 一些CDC框架是否能做这个事情。

Best,
Leonard Xu


> 在 2020年7月8日,13:18,jindy_liu <286729...@qq.com> 写道:
> 
> 对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。
> 所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。
> 
> 求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢?  需要写再底层点的api吗?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 如何在Flink SQL中使用周期性水印?

2020-07-07 文章 Jark Wu
嗯, 可以在 JIRA 中开个 issue 描述下你的需求~

On Wed, 8 Jul 2020 at 12:01, 1193216154 <1193216...@qq.com> wrote:

>  Jark,flink有没有必要去支持这个特性?我感觉还是有一些应用场景
>
>
>
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年7月8日(星期三) 中午11:48
> 收件人:"user-zh"
> 主题:Re: 如何在Flink SQL中使用周期性水印?
>
>
>
> 如果所有 partition 都没有数据,还希望 watermark 往前走,那 idle source 确实解决不了这个问题。
> 目前确实没有太好的解决办法。
>
> Best,
> Jark
>
> On Wed, 8 Jul 2020 at 11:08, 1193216154 <1193216...@qq.com wrote:
>
>  hi Jark Wu.
> 
> 
> 我的理解是table.exec.source.idle-timeout只能解决watermark对齐的时候去忽略某个没有watermark的并行度。但是在每个并行度都没有watermark的时候,还是无法更新watermark。
> 
> 我觉得题主的意思应该是,在kafka的所有分区都没有数据的时候,最后一个窗口无法触发(因为没有watermark大于最后那个窗口结束时间了)。
>  有没有可以设置在eventTime情况下,周期性生成当前时间的一个waterMark(和数据无关),因为可能没有新数据到来了。
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Jark Wu"  发送时间:nbsp;2020年7月7日(星期二) 晚上6:09
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 如何在Flink SQL中使用周期性水印?
> 
> 
> 
>  Hi,
> 
>  这个问题我理解其实和周期性水印没有关系,是属于 idle source
>  的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]
> 
>  Best,
>  Jark
> 
>  [1]:
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
> 
> 
> ;
>  On Tue, 7 Jul 2020 at 17:35, noake  
>  gt; Dear All:
>  gt;
>  gt;
>  gt; 大佬们, 请教下如何在Flink SQL中使用周期性的水印。
>  gt; 我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。


Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
对的,只不过生产中,有些设计的时候外键没有显示声明,都是用流程保证更新表的顺序。
所以消费数据变化的时候,也是要按顺序消费。不然使用镜像数据的人,可能会出问题。

求教:除flink sql 的cdc功能外,flink的其它特性能否较好的支持这种场景呢?  需要写再底层点的api吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
https://github.com/apache/flink/tree/release-1.11.0我看github上tag下已经发布了release-1.11.0,我就编了下tag下的release-1.11.0。最近在做实时计算的一些调研,我们第一步就是要做数据的实时搬运(异构存储),看flink
1.11有cdc功能,我关注了下。看发布了就立即试用了下,看看能不能用你们这个做变化数据的实时同步。1、体验了下,若mysql的binlog按单表有序到kafka,单topic,单分区,flink
cdc的同步确实很方便,几条sql语句就搞定了。2、若mysql binlog按db实例,多表有序到kafka
单topic,单分区,感觉不知道要怎么样定义这个ddl,
同时怎么保证按序同步。(比如表与表之前的数据存在逻辑上的外键约束等等,具体来说test表的status字端就是个外键,如果关联记录都有更新,那更新顺序就比较重要了,要严格按binlog顺序来)。今天看了下,源码里canal-json的解析,好像只解析到了json里的feild
和 operate 类型。感觉这个多表有序的场景应该也是比较多的需求的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11 on kubernetes 构建失败

2020-07-07 文章 SmileSmile
hi

按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错


Starting Task Manager
sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only file 
system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only file 
system
/docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create 
/opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.


是否有遇到同样的问题,支个招



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

回复:作业升级到flink1.11,idea运行失败

2020-07-07 文章 SmileSmile
添加依赖后正常了。应该是这个导致的

https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090

thanks




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月08日 11:30,Yangze Guo 写道:
尝试加一下这个依赖
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

Best,
Yangze Guo

On Wed, Jul 8, 2020 at 11:27 AM SmileSmile  wrote:
>
>
> hi
>
> 作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
>
> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
>  at 
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
>
> 是哪里出问题了呢
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 Leonard Xu
Hello,
我理解下你场景:d1的 test 表 和 status 表两者之间有关联,比如外键,比如 test 更新一条数据后 status也需要级联地更新一条数据。
希望通过 Flink 的CDC功能同步这两张表到db2后,任意时刻,这两张表的状态是原子的(两张表对应 d1中两张表的一个快照版本), 是这种场景吗?

如果是这种场景,现在是还没有支持的。

Best,
Leonard Xu


> 在 2020年7月8日,11:59,Jark Wu  写道:
> 
> Hi,
> 
> 我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的 release-1.11 分支呢?
> 
> 另外,我理解下你的需求是  db1.test 同步到 db2.test,  db1.status 同步到 db2.status?
> 多表的*有序*同步是指?
> 我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
> into 到 db2.status mysql table就行了。
> 
> 感谢反馈使用体验。
> 
> Best,
> Jark
> 
> 
> On Wed, 8 Jul 2020 at 10:30, jindy_liu <286729...@qq.com> wrote:
> 
>> 场景:canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;
>> 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?
>> 例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL,
>> `name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
>> NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
>> TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT
>> NULL,
>> PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
>> 
>> sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
>> TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
>> INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
>> 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
>> 'properties.bootstrap.servers'='localhost:9092',
>> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
>> 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
>> INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
>> 'connector'='kafka', 'topic'='test',
>> 'properties.group.id'='c_mysql_binlog_postgres',
>> 'properties.bootstrap.servers'='localhost:9092',
>> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
>> 'canal-json.ignore-parse-errors'='true');
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jark Wu
估计是这个导致的:
https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.11.html#reversed-dependency-from-flink-streaming-java-to-flink-client-flink-15090

On Wed, 8 Jul 2020 at 09:21, sunfulin  wrote:

> hi, noake
> 感谢分享。我加了这个依赖后也OK了。周知下大家。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07 22:15:05,"noake"  写道:
> >我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就没解决了
> >dependency
> > groupIdorg.apache.flink/groupId
> > artifactIdflink-clients_${scala.binary.version}/artifactId
> > version${flink.version}/version
> >/dependency
> >
> >
> >原始邮件
> >发件人:Congxian qiuqcx978132...@gmail.com
> >收件人:user-zhuser...@flink.apache.org
> >抄送:Jark wuimj...@gmail.com; Jun zhangzhangjunemail...@gmail.com
> >发送时间:2020年7月7日(周二) 19:35
> >主题:Re: Re: Re: Re: flink 1.11 作业执行异常
> >
> >
> >Hi 从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的
> resources 文件下是否有对应的 resource 文件 Best, Congxian sunfulin
> sunfulin0...@163.com 于2020年7月7日周二 下午6:29写道: hi,
> 我的pom文件本地执行时,scope的provided都是去掉的。  dependency
> groupIdorg.apache.flink/groupId
>  artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId
> version${flink.version}/version  /dependency
> 确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> 这个异常在啥情况下会触发到。在 2020-07-07 18:10:58,"Jark Wu"
> imj...@gmail.com 写道:  如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被
> provided 掉了? 去掉  provided  再试试看?Best,  JarkOn Tue, 7 Jul 2020 at
> 18:01, sunfulin sunfulin0...@163.com wrote: hi,   @Jun Zhang
> 我一直使用的就是blink planner,这个jar包一直都有的。 @Jark Wu
> 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?   在
> 2020-07-07 15:40:17,"Jark Wu" imj...@gmail.com 写道:   Hi,
> 你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?  Best,   Jark  On Tue, 7 Jul 2020 at
> 15:31, Jun Zhang zhangjunemail...@gmail.com   wrote:   hi.sunfulin
> 你有没有导入blink的planner呢,加入这个试试   dependency
> groupIdorg.apache.flink/groupId
>  artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId
> version${flink.version}/version/dependency  sunfulin
> sunfulin0...@163.com 于2020年7月7日周二 下午3:21写道:hi, jark
> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> configuration里的DeployOptions.TARGET
> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
> //构建StreamExecutionEnvironmentpublic static final
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  //构建EnvironmentSettings 并指定Blink Plannerprivate static final
> EnvironmentSettings bsSettings =
>  
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>  //构建StreamTableEnvironmentpublic static final
> StreamTableEnvironment tEnv =StreamTableEnvironment.create(env,
> bsSettings);   tEnv.executeSql(“ddl sql”);
> //source注册成表   tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>   $("f1").as("first"), $("p").proctime());//join语句
>  Table table = tEnv.sqlQuery("select b.* from test a left  joinmy_dim
> FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> //输出   tEnv.toAppendStream(table,  Row.class).print("LookUpJoinJob");
>   env.execute("LookUpJoinJob");在
> 2020-07-06 14:59:17,"Jark Wu" imj...@gmail.com 写道:能分享下复现的作业代码不?
>   Best,JarkOn Mon, 6 Jul 2020 at 11:00, sunfulin
> sunfulin0...@163.com  wrote: Hi, 我使用目前最新的Flink 1.11
> rc4来测试我的作业。报了如下异常: org.apache.flink.table.api.TableExecution: Failed to
> execute sql caused by : java.lang.IlleagalStateException: No
> ExecutorFactory   foundto execute the application. at
>  
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>


?????? ??????Flink SQL?????????????????

2020-07-07 文章 1193216154
 Jark??flink??




----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

 On Tue, 7 Jul 2020 at 17:35, noake 

Re: flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 Jark Wu
Hi,

我想先问一下你使用的是刚发布的 1.11.0 版本吗? 还是自己 build 的 release-1.11 分支呢?

另外,我理解下你的需求是  db1.test 同步到 db2.test,  db1.status 同步到 db2.status?
多表的*有序*同步是指?
我理解你只需要像定义 db1.test -> db2.test 一样,定义好 db1.status binlog table 然后 insert
into 到 db2.status mysql table就行了。

感谢反馈使用体验。

Best,
Jark


On Wed, 8 Jul 2020 at 10:30, jindy_liu <286729...@qq.com> wrote:

> 场景:canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;
> 若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?
> 例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL,
> `name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
> NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
> TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT
> NULL,
> PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
>
> sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
> TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
> INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
> 'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
> 'properties.bootstrap.servers'='localhost:9092',
> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
> 'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
> INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
> 'connector'='kafka', 'topic'='test',
> 'properties.group.id'='c_mysql_binlog_postgres',
> 'properties.bootstrap.servers'='localhost:9092',
> 'scan.startup.mode'='earliest-offset', 'format'='canal-json',
> 'canal-json.ignore-parse-errors'='true');
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 如何在Flink SQL中使用周期性水印?

2020-07-07 文章 Jark Wu
如果所有 partition 都没有数据,还希望 watermark 往前走,那 idle source 确实解决不了这个问题。
目前确实没有太好的解决办法。

Best,
Jark

On Wed, 8 Jul 2020 at 11:08, 1193216154 <1193216...@qq.com> wrote:

> hi Jark Wu.
>
> 我的理解是table.exec.source.idle-timeout只能解决watermark对齐的时候去忽略某个没有watermark的并行度。但是在每个并行度都没有watermark的时候,还是无法更新watermark。
> 我觉得题主的意思应该是,在kafka的所有分区都没有数据的时候,最后一个窗口无法触发(因为没有watermark大于最后那个窗口结束时间了)。
> 有没有可以设置在eventTime情况下,周期性生成当前时间的一个waterMark(和数据无关),因为可能没有新数据到来了。
>
>
>
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年7月7日(星期二) 晚上6:09
> 收件人:"user-zh"
> 主题:Re: 如何在Flink SQL中使用周期性水印?
>
>
>
> Hi,
>
> 这个问题我理解其实和周期性水印没有关系,是属于 idle source
> 的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]
>
> Best,
> Jark
>
> [1]:
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
>
> On Tue, 7 Jul 2020 at 17:35, noake 
>  Dear All:
> 
> 
>  大佬们, 请教下如何在Flink SQL中使用周期性的水印。
>  我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。


Re: FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章 Leonard Xu
Hi,

夏帅的方案是ok的,因为Kafka 默认支持写入topic不存在时自动创建[1], 
这个配置是默认开启的,所以只用实现下自定义KafkaSerializationSchema就可以满足你的需求。

祝好,
Leonard Xu

[1] 
https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable
 


> 在 2020年7月8日,11:08,flink小猪 <18579099...@163.com> 写道:
> 
> 兄弟,感谢
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-08 11:04:40,"夏帅"  写道:
> 
> 你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
> class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
> override def serialize(element: DemoBean, timestamp: lang.Long): 
> ProducerRecord[Array[Byte], Array[Byte]] = {
> new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, 
> element.getValue)
>  }
> }
> --
> 发件人:18579099...@163.com <18579099...@163.com>
> 发送时间:2020年7月8日(星期三) 10:59
> 收件人:user-zh 
> 主 题:FlinkKafkaProducer没有写入多个topic的功能
> 
> 
> 我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
> 但是FlinkKafkaProducer好像只能写入一个主题里面?
> 
> 
> 
> 18579099...@163.com
> 
> 



Re: flink 1.10.1 入 hive 格式为parquet

2020-07-07 文章 Rui Li
只要你的hive目标表创建为Parquet格式就行了哈,INSERT语句上跟其他类型的表没有区别的

On Tue, Jul 7, 2020 at 10:05 AM lydata  wrote:

>  Hi,
>
> 可以提供一份flink1.10 入hive格式为parquet的例子吗?
>
> Best,
> lydata



-- 
Best regards!
Rui Li


Re: 作业升级到flink1.11,idea运行失败

2020-07-07 文章 Yangze Guo
尝试加一下这个依赖
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

Best,
Yangze Guo

On Wed, Jul 8, 2020 at 11:27 AM SmileSmile  wrote:
>
>
> hi
>
> 作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
>
> Exception in thread "main" java.lang.IllegalStateException: No 
> ExecutorFactory found to execute the application.
>  at 
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>  at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
>
> 是哪里出问题了呢
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制


作业升级到flink1.11,idea运行失败

2020-07-07 文章 SmileSmile

hi

作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory 
found to execute the application.
 at 
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()

是哪里出问题了呢
| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Re:回复:FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章 flink小猪
兄弟,感谢
















在 2020-07-08 11:04:40,"夏帅"  写道:

你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long): 
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
--
发件人:18579099...@163.com <18579099...@163.com>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh 
主 题:FlinkKafkaProducer没有写入多个topic的功能


我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



18579099...@163.com




回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-07 文章 x
您说的这种方式,V1.10.1 不支持吧,我看参数只有一个String类型的
void sqlUpdate(String stmt);




--原始邮件--
发件人:"seeksst"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
 gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一 上午11:15写道: gt; gt; amp;gt; 
版本是1.10.1,最后sink的时候确实是一个window里面做count gt; amp;gt; 
distinct操作。请问是只要计算过程中含有一个window里面做count gt; amp;gt; gt; 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupamp;amp;nbsp;DATE_FORMAT(rowtm,
 gt; amp;gt; '-MM-dd') 这个sql对应的状态很大。代码如下: gt; amp;gt; val rt_totaluv_view : 
Table = tabEnv.sqlQuery( gt; amp;gt;amp;nbsp;amp;nbsp; """ gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 
'-MM-dd gt; HH:mm:00')) gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM source gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 
'-MM-dd') gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) gt; amp;gt; gt; 
amp;gt; val totaluvTmp = gt; 
tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) gt; 
amp;gt;amp;nbsp;amp;nbsp; .filter( line =amp;amp;gt; line._1 == true ).map( 
line gt; =amp;amp;gt; line._2 ) gt; amp;gt; gt; amp;gt; val totaluvTabTmp = 
tabEnv.fromDataStream( totaluvTmp ) gt; amp;gt; gt; amp;gt; tabEnv.sqlUpdate( 
gt; amp;gt;amp;nbsp;amp;nbsp; s""" gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT INTO mysql_totaluv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT _1,MAX(_2) gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM $totaluvTabTmp gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY _1 gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
--amp;amp;amp;nbsp;原始邮件amp;amp;amp;nbsp;-- gt; 
amp;gt; amp;amp;gt; 发件人:amp;amp;amp;nbsp;"Jark 
Wu"imjark@gmail.comamp;amp;amp;gt;; gt; amp;gt; amp;amp;gt; 
发送时间:amp;amp;amp;nbsp;2020年6月18日(星期四) 中午12:16 gt; amp;gt; amp;amp;gt; 
收件人:amp;amp;amp;nbsp;"user-zh"user-zh@flink.apache.org gt; amp;amp;amp;gt;; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 主题:amp;amp;amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 是的,我觉得这样子是能绕过的。 gt; amp;gt; 
amp;amp;gt; gt; amp;gt; amp;amp;gt; On Thu, 18 Jun 2020 at 10:34, x 
35907418@qq.comamp;amp;amp;gt; gt; wrote: gt; amp;gt; amp;amp;gt; gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """ gt; amp;gt; 
amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 SELECT gt; amp;gt; MAX(DATE_FORMAT(ts, '-MM-dd gt; amp;gt; amp;amp;gt; 
HH:mm:00')) gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; time_str,COUNT(DISTINCT 
userkey) uv gt; amp;gt; amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 FROM gt; amp;gt; 
user_behavioramp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp; GROUP BY gt; 
amp;gt; amp;amp;gt; DATE_FORMAT(ts, gt; 
'-MM-dd')amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """) gt; 
amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; val 
gt; amp;gt; resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) gt; 
amp;gt; amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; gt; 
amp;gt; amp;amp;gt; gt; 
.filter(line=amp;amp;amp;amp;gt;line._1==true).map(line=amp;amp;amp;amp;gt;line._2)
 gt; amp;gt; amp;amp;gt; amp;amp;amp;gt; gt; amp;gt; amp;amp;gt; 
amp;amp;amp;gt; val res= tabEnv.fromDataStream(resTmpStream) gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; tabEnv.sqlUpdate( gt; amp;gt; amp;amp;gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; s""" gt; amp;gt; amp;amp;gt; 
gt; 

?????? ??????Flink SQL?????????????????

2020-07-07 文章 1193216154
hi Jark Wu.
??table.exec.source.idle-timeoutwatermarkwatermarkwatermarkwatermark??
??kafka??watermark??
eventTimewaterMark(??),??




----
??:"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

On Tue, 7 Jul 2020 at 17:35, noake 

回复:FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章 夏帅
你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
  override def serialize(element: DemoBean, timestamp: lang.Long): 
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, 
element.getValue)
  }
}
--
发件人:18579099...@163.com <18579099...@163.com>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh 
主 题:FlinkKafkaProducer没有写入多个topic的功能

我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



18579099...@163.com



Flink Hadoop????

2020-07-07 文章 Z-Z
Hi?? ??Flink 
1.10.0??jobmanager??libflink-shaded-hadoop-2-uber-2.7.5-10.0.jar??webuicli??Could
 not find a file system implementation for scheme 'hdfs'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.??

FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章 18579099...@163.com
我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



18579099...@163.com


Re: 嵌套 json 中string 数组的解析异常

2020-07-07 文章 Jun Zou
Hi,
感谢您的指导!

祝好!

Leonard Xu  于2020年7月7日周二 下午9:49写道:

> Hi,
>
> 看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的
> type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。
>
> 祝好,
> Leonard Xu
> [1]https://issues.apache.org/jira/browse/FLINK-16622 <
> https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790
> >
>
>


State??????guava Cache

2020-07-07 文章 op
 ValueState[Cache]??value 



map??cacheputupdatestate??cache??1

flink 1.11 中cdc功能中,使用flink sql来操作一个kafka topic中的多张表,并保证有序?

2020-07-07 文章 jindy_liu
场景:canal解析binlog后,将db1实例内的多张表(表数据有关联)的变化发送到kafka的单topic,单分区中,从而保证有序;   
若我想做数据同步至另一个mysql实例db2中,怎么用flink sql操作多张表,同时保证表与表之间有序呢?  
例如mysql实例db1中有表test, statusCREATE TABLE `test` (  `id` int(11) NOT NULL, 
`name` varchar(255) NOT NULL,  `time` datetime NOT NULL,  `status` int(11)
NOT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8CREATE
TABLE `status` (  `status` int(11) NOT NULL,  `name` varchar(255) NOT NULL, 
PRIMARY KEY (`status`)) ENGINE=InnoDB DEFAULT CHARSET=utf8比如,我用flink
sql,可以设置对应的一张test表,然后sink到mysql镜像实例db2的镜像表test,和表status做同步,但status表要怎么操作呢?如何保证有序?我目前能实现单表,确实方便,求助,多表的怎么做有序同步?CREATE
TABLE test (`id` INT,`name` VARCHAR(255),`time` TIMESTAMP(3),`status`
INT,PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka',
'topic'='test', 'properties.group.id'='c_mysql_binlog_postgres',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='earliest-offset', 'format'='canal-json',
'canal-json.ignore-parse-errors'='true');CREATE TABLE status (`status`
INT,`name` VARCHAR(255),PRIMARY KEY(name) NOT ENFORCED ) WITH (
'connector'='kafka', 'topic'='test',
'properties.group.id'='c_mysql_binlog_postgres',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='earliest-offset', 'format'='canal-json',
'canal-json.ignore-parse-errors'='true');



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 connector jdbc 依赖解析失败

2020-07-07 文章 Leonard Xu
Hello,

我看下了maven仓库里有的[1], 官网文档里也有下载链接[2],是不是pom里的依赖没有写对?1.11 jdbc connector 的module名从 
flink-jdbc 规范到了 flink-connector-jdbc。

祝好,
Leonard Xu

[1] 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.11.0/
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
 



> 在 2020年7月8日,08:15,Zhou Zach  写道:
> 
> hi all,
> flink升级到1.11,flink-connector-jdbc 
> idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的
> 



回复:【Flink的shuffle mode】

2020-07-07 文章 夏帅
补充: 1.11的shuffle-mode配置的默认值为ALL_EDGES_BLOCKING
共有
ALL_EDGES_BLOCKING(等同于batch)
FORWARD_EDGES_PIPELINEDPOINTWISE_EDGES_PIPELINED
ALL_EDGES_PIPELINED(等同于pipelined)对于pipelined多出了两种选择


--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月7日(星期二) 23:37
收件人:user-zh 
主 题:回复: 【Flink的shuffle mode】

如果是批的模式,怎么在应用程序里面指定shuffle_mode呢?
另外,下面提到如果是流的计算,一定是pipeline模式.
那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢?
谢谢.




--原始邮件--
发件人:"Jingsong Li"

回复:【Flink的shuffle mode】

2020-07-07 文章 夏帅
你好:
问题1,指定shuffle_mode
tEnv.getConfig.getConfiguration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
 "pipeline")
问题2,mode是UNDEFINED的概念
使用UNDEFINED并不是说模式没有定义,而是由框架自己决定
The shuffle mode is undefined. It leaves it up to the framework to decide the 
shuffle mode.



--
发件人:忝忝向仧 <153488...@qq.com>
发送时间:2020年7月7日(星期二) 23:37
收件人:user-zh 
主 题:回复: 【Flink的shuffle mode】

如果是批的模式,怎么在应用程序里面指定shuffle_mode呢?
另外,下面提到如果是流的计算,一定是pipeline模式.
那为什么我使用datastream做keyby流操作后,跟踪源码它的mode是UNDEFINED呢?
谢谢.




--原始邮件--
发件人:"Jingsong Li"

Re:Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin
hi, noake
感谢分享。我加了这个依赖后也OK了。周知下大家。

















在 2020-07-07 22:15:05,"noake"  写道:
>我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就没解决了
>dependency
> groupIdorg.apache.flink/groupId
> artifactIdflink-clients_${scala.binary.version}/artifactId
> version${flink.version}/version
>/dependency
>
>
>原始邮件
>发件人:Congxian qiuqcx978132...@gmail.com
>收件人:user-zhuser...@flink.apache.org
>抄送:Jark wuimj...@gmail.com; Jun zhangzhangjunemail...@gmail.com
>发送时间:2020年7月7日(周二) 19:35
>主题:Re: Re: Re: Re: flink 1.11 作业执行异常
>
>
>Hi 从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的 
>resources 文件下是否有对应的 resource 文件 Best, Congxian sunfulin sunfulin0...@163.com 
>于2020年7月7日周二 下午6:29写道: hi,  我的pom文件本地执行时,scope的provided都是去掉的。  dependency  
>groupIdorg.apache.flink/groupId   
>artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId  
>version${flink.version}/version  /dependency
>确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>  这个异常在啥情况下会触发到。在 2020-07-07 18:10:58,"Jark Wu" 
>imj...@gmail.com 写道:  如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 
>provided 掉了? 去掉  provided  再试试看?Best,  JarkOn Tue, 7 Jul 2020 at 
>18:01, sunfulin sunfulin0...@163.com wrote: hi,   @Jun Zhang 我一直使用的就是blink 
>planner,这个jar包一直都有的。 @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?   
>在 2020-07-07 15:40:17,"Jark Wu" imj...@gmail.com 
>写道:   Hi,  你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?  Best,   Jark  On Tue, 7 
>Jul 2020 at 15:31, Jun Zhang zhangjunemail...@gmail.com   wrote:   
>hi.sunfulin你有没有导入blink的planner呢,加入这个试试   dependency
>groupIdorg.apache.flink/groupId   
>artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId
>version${flink.version}/version/dependency  sunfulin 
>sunfulin0...@163.com 于2020年7月7日周二 下午3:21写道:hi, jark
>我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>configuration里的DeployOptions.TARGET
>(execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。  
>//构建StreamExecutionEnvironmentpublic static final 
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();   
>//构建EnvironmentSettings 并指定Blink Plannerprivate static final 
>EnvironmentSettings bsSettings =   
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); 
>  //构建StreamTableEnvironmentpublic static final StreamTableEnvironment 
>tEnv =StreamTableEnvironment.create(env, bsSettings);   
>tEnv.executeSql(“ddl sql”);//source注册成表   
>tEnv.createTemporaryView("test", ds, $("f0").as("id"),$("f1").as("first"), 
>$("p").proctime());//join语句   Table table = 
>tEnv.sqlQuery("select b.* from test a left  joinmy_dim FOR SYSTEM_TIME AS 
>OF a.p AS b on a.first = b.userId");//输出   
>tEnv.toAppendStream(table,  Row.class).print("LookUpJoinJob");
>env.execute("LookUpJoinJob");在 2020-07-06 
>14:59:17,"Jark Wu" imj...@gmail.com 写道:能分享下复现的作业代码不?Best,Jark  
>  On Mon, 6 Jul 2020 at 11:00, sunfulin sunfulin0...@163.com  wrote:   
>  Hi, 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常: 
>org.apache.flink.table.api.TableExecution: Failed to execute sql 
>caused by : java.lang.IlleagalStateException: No ExecutorFactory   foundto 
>execute the application. at   
>org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。


flink 1.11 connector jdbc 依赖解析失败

2020-07-07 文章 Zhou Zach
hi all,
flink升级到1.11,flink-connector-jdbc 
idea解析失败,去maven仓库查也没查到,请问是不是要手动编译1.11的源码的方式安装依赖的



回复:Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常

2020-07-07 文章 邹云鹤
好的




| |
邹云鹤
|
|
邮箱:kevinyu...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月07日 23:27,Benchao Li 写道:
我感觉这应该是新版本的udf的bug,我在本地也可以复现。
已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520

邹云鹤  于2020年7月7日周二 下午9:43写道:

>
>
> hi all
> 本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:
>
>
> @FunctionHint(
> input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
> output = @DataTypeHint("STRING")
> )
> public class Split extends TableFunction {
>   public Split(){}
>   public void eval(String str, String ch) {
> if (str == null || str.isEmpty()) {
>   return;
> } else {
>   String[] ss = str.split(ch);
>   for (String s : ss) {
> collect(s);
>   }
> }
>   }
> }
>
>
> 在flink sql中通过 create function splitByChar as '**.**.Split'
> 来创建这个function,在tableEnv 中调用executeSql() 来完成对这个 function的注册,在sql
> 后面的计算逻辑中 通过以下方式来调用这个UDTF
> create view view_source_1 as select `dateTime,`itime`,
> lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page
> from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as
> T(s) on true;
>
>
> 结果一直出现以下错误信息:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 25 to line 3, column 47: No match found for function
> signature splitByChar(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)
> 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3, column 25 to line 3, column 47: r(, )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> 

?????? ??Flink??shuffle mode??

2020-07-07 文章 ????????
??shuffle_mode???
pipeline.
??datastream??keyby??mode??UNDEFINED???
.




----
??:"Jingsong Li"

Re: Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常

2020-07-07 文章 Benchao Li
我感觉这应该是新版本的udf的bug,我在本地也可以复现。
已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520

邹云鹤  于2020年7月7日周二 下午9:43写道:

>
>
> hi all
> 本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:
>
>
> @FunctionHint(
> input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
> output = @DataTypeHint("STRING")
> )
> public class Split extends TableFunction {
>   public Split(){}
>   public void eval(String str, String ch) {
> if (str == null || str.isEmpty()) {
>   return;
> } else {
>   String[] ss = str.split(ch);
>   for (String s : ss) {
> collect(s);
>   }
> }
>   }
> }
>
>
> 在flink sql中通过 create function splitByChar as '**.**.Split'
> 来创建这个function,在tableEnv 中调用executeSql() 来完成对这个 function的注册,在sql
> 后面的计算逻辑中 通过以下方式来调用这个UDTF
> create view view_source_1 as select `dateTime,`itime`,
> lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page
> from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as
> T(s) on true;
>
>
> 结果一直出现以下错误信息:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 25 to line 3, column 47: No match found for function
> signature splitByChar(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)
> 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3, column 25 to line 3, column 47: r(, )
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
> at
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
> at
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> 

DataStream????uv????

2020-07-07 文章 ?g???U?[????



  
DataStream??apiUV??2
  
1Tumbling??1Time.days(1)??uv
 
trigger
  
2stateTTL


DataStream

Flink DataStream ????UV????

2020-07-07 文章 ?g???U?[????



  
DataStream??apiUV??2
1Tumbling??1Time.days(1)??uv
trigger
2stateTTL

 
??
Jiazhi

Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 noake
我在1.11.0中遇到了同样的问题, pom中加了下面的依赖就没解决了
dependency
 groupIdorg.apache.flink/groupId
 artifactIdflink-clients_${scala.binary.version}/artifactId
 version${flink.version}/version
/dependency


原始邮件
发件人:Congxian qiuqcx978132...@gmail.com
收件人:user-zhuser...@flink.apache.org
抄送:Jark wuimj...@gmail.com; Jun zhangzhangjunemail...@gmail.com
发送时间:2020年7月7日(周二) 19:35
主题:Re: Re: Re: Re: flink 1.11 作业执行异常


Hi 从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的 
resources 文件下是否有对应的 resource 文件 Best, Congxian sunfulin sunfulin0...@163.com 
于2020年7月7日周二 下午6:29写道: hi,  我的pom文件本地执行时,scope的provided都是去掉的。  dependency  
groupIdorg.apache.flink/groupId   
artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId  
version${flink.version}/version  /dependency
确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
  这个异常在啥情况下会触发到。在 2020-07-07 18:10:58,"Jark Wu" 
imj...@gmail.com 写道:  如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 
provided 掉了? 去掉  provided  再试试看?Best,  JarkOn Tue, 7 Jul 2020 at 18:01, 
sunfulin sunfulin0...@163.com wrote: hi,   @Jun Zhang 我一直使用的就是blink 
planner,这个jar包一直都有的。 @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
   在 2020-07-07 15:40:17,"Jark Wu" imj...@gmail.com 写道: 
  Hi,  你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?  Best,   Jark  On Tue, 7 Jul 
2020 at 15:31, Jun Zhang zhangjunemail...@gmail.com   wrote:   hi.sunfulin  
  你有没有导入blink的planner呢,加入这个试试   dependency
groupIdorg.apache.flink/groupId   
artifactIdflink-table-planner-blink_${scala.binary.version}/artifactId
version${flink.version}/version/dependency  sunfulin 
sunfulin0...@163.com 于2020年7月7日周二 下午3:21写道:hi, jark
我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
configuration里的DeployOptions.TARGET
(execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。  
//构建StreamExecutionEnvironmentpublic static final 
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();   
//构建EnvironmentSettings 并指定Blink Plannerprivate static final 
EnvironmentSettings bsSettings =   
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();  
 //构建StreamTableEnvironmentpublic static final StreamTableEnvironment 
tEnv =StreamTableEnvironment.create(env, bsSettings);   
tEnv.executeSql(“ddl sql”);//source注册成表   
tEnv.createTemporaryView("test", ds, $("f0").as("id"),$("f1").as("first"), 
$("p").proctime());//join语句   Table table = 
tEnv.sqlQuery("select b.* from test a left  joinmy_dim FOR SYSTEM_TIME AS 
OF a.p AS b on a.first = b.userId");//输出   
tEnv.toAppendStream(table,  Row.class).print("LookUpJoinJob");
env.execute("LookUpJoinJob");在 2020-07-06 
14:59:17,"Jark Wu" imj...@gmail.com 写道:能分享下复现的作业代码不?Best,Jark   
 On Mon, 6 Jul 2020 at 11:00, sunfulin sunfulin0...@163.com  wrote: 
Hi, 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常: 
org.apache.flink.table.api.TableExecution: Failed to execute sql 
caused by : java.lang.IlleagalStateException: No ExecutorFactory   foundto  
   execute the application. at   
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。

Re: 嵌套 json 中string 数组的解析异常

2020-07-07 文章 Leonard Xu
Hi, 

看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的 
type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。

祝好,
Leonard Xu
[1]https://issues.apache.org/jira/browse/FLINK-16622 




Flink 1.11 SQL作业中调用UDTF 出现“No match found for function signature ”异常

2020-07-07 文章 邹云鹤


hi all
本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:


@FunctionHint(
input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")},
output = @DataTypeHint("STRING")
)
public class Split extends TableFunction {
  public Split(){}
  public void eval(String str, String ch) {
if (str == null || str.isEmpty()) {
  return;
} else {
  String[] ss = str.split(ch);
  for (String s : ss) {
collect(s);
  }
}
  }
} 


在flink sql中通过 create function splitByChar as '**.**.Split' 
来创建这个function,在tableEnv 中调用executeSql() 来完成对这个 function的注册,在sql 后面的计算逻辑中 
通过以下方式来调用这个UDTF 
create view view_source_1 as select `dateTime,`itime`,  lng,lat,net,event_info, 
cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left 
join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;


结果一直出现以下错误信息:
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 3, column 25 to line 3, column 47: No match found for function signature 
splitByChar(, )
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629)

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, 
column 25 to line 3, column 47: r(, )
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at 
org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match 
found for function signature splitByChar(, )
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 

Re: Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Congxian Qiu
Hi

从这个报错看上去是尝试通过 serviceLoader 加载一些 factory 的时候出错了(找不到),可以看看对应的 module 的
resources 文件下是否有对应的 resource 文件

Best,
Congxian


sunfulin  于2020年7月7日周二 下午6:29写道:

>
>
>
> hi,
> 我的pom文件本地执行时,scope的provided都是去掉的。
> 
> org.apache.flink
>
>  flink-table-planner-blink_${scala.binary.version}
>${flink.version}
> 
>
>
> 确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> 这个异常在啥情况下会触发到。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07 18:10:58,"Jark Wu"  写道:
> >如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉
> provided
> >再试试看?
> >
> >Best,
> >Jark
> >
> >On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:
> >
> >> hi,
> >>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
> >>
> >>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
> >> >Hi,
> >> >
> >> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
> >> wrote:
> >> >
> >> >> hi.sunfulin
> >> >> 你有没有导入blink的planner呢,加入这个试试
> >> >>
> >> >> 
> >> >> org.apache.flink
> >> >>
> >>
> flink-table-planner-blink_${scala.binary.version}
> >> >> ${flink.version}
> >> >> 
> >> >>
> >> >>
> >> >> sunfulin  于2020年7月7日周二 下午3:21写道:
> >> >>
> >> >>>
> >> >>>
> >> >>>
> >> >>> hi, jark
> >> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> >> >>> configuration里的DeployOptions.TARGET
> >> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
> >> >>>
> >> >>>
> >> >>> //构建StreamExecutionEnvironment
> >> >>> public static final StreamExecutionEnvironment env =
> >> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >> >>>
> >> >>> //构建EnvironmentSettings 并指定Blink Planner
> >> >>> private static final EnvironmentSettings bsSettings =
> >> >>>
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> >>>
> >> >>> //构建StreamTableEnvironment
> >> >>> public static final StreamTableEnvironment tEnv =
> >> >>> StreamTableEnvironment.create(env, bsSettings);
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>tEnv.executeSql(“ddl sql”);
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //source注册成表
> >> >>>
> >> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> >> >>> $("f1").as("first"), $("p").proctime());
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //join语句
> >> >>>
> >> >>> Table table = tEnv.sqlQuery("select b.* from test a left
> join
> >> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> //输出
> >> >>>
> >> >>> tEnv.toAppendStream(table,
> Row.class).print("LookUpJoinJob");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> env.execute("LookUpJoinJob");
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >> >>> >能分享下复现的作业代码不?
> >> >>> >
> >> >>> >Best,
> >> >>> >Jark
> >> >>> >
> >> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin 
> wrote:
> >> >>> >
> >> >>> >> Hi,
> >> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >> >>> >>
> >> >>> >>
> >> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
> >> found
> >> >>> to
> >> >>> >> execute the application.
> >> >>> >>   at
> >> >>> >>
> >> >>>
> >>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >> >>> >>
> >> >>> >>
> >> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
> >> >>>
> >> >>
> >>
>


Re: 嵌套 json 中string 数组的解析异常

2020-07-07 文章 Jun Zou
Hi, Leonard Xu:

我使用的 sql 如下,

> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit


从调试日志来看,应该是一开始就挂掉了,我贴一下相关的日志

INFO - Initializing heap keyed state backend with stream factory.

INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be
> cast to [Ljava.lang.String;
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
> ... 10 more
>

另外,如果我把string 数组的类型从  BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
改为 ObjectArrayTypeInfo.getInfoFor(Types.STRING), 即schema 从

> root
>  |-- parsedResponse: LEGACY(BasicArrayTypeInfo)
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
变为

> root
>  |-- parsedResponse: ARRAY
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>

也仍然会发生相同的错误,但日志执行有些不同

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from:
> (parsedResponse, rowtime) -> correlate:
> table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0
> -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1)
> (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> 

Re: 一个source多个sink的同步问题

2020-07-07 文章 lgs
是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble
window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Flink从SavePoint启动任务,修改的代码不生效

2020-07-07 文章 milan183sansiro
好的,感谢。


在2020年7月7日 10:28,Paul Lam 写道:
估计你是用同一个 Kafka Source 消费 A B 两个 Topic? 如果是,看起来像是 Kafka Connector 早期的一个问题。

作业停止的时候,Topic B 的 partition offset 被存储到 Savepoint 中,然后在恢复的时候,尽管代码中 Topic B 
已经被移除,但它的 partition offset 还是被恢复了。

这个问题在后来的版本,估计是 1.8 或 1.9,被修复了。

Best,
Paul Lam

2020年7月6日 20:55,milan183sansiro  写道:

你好:
1.没有给算子手动设置id
2.设置savepoint恢复的路径是正确的


在2020年7月6日 20:32,wujunxi<462329...@qq.com> 写道:
你好,确认以下两个点
1.是否给每个算子设置了id
2.设置savepoint恢复的路径是否正确



--原始邮件--
发件人:"milan183sansiro"

Re:Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin



hi,
我的pom文件本地执行时,scope的provided都是去掉的。

org.apache.flink
   flink-table-planner-blink_${scala.binary.version}
   ${flink.version}



确实比较诡异。org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
 这个异常在啥情况下会触发到。














在 2020-07-07 18:10:58,"Jark Wu"  写道:
>如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
>再试试看?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:
>
>> hi,
>>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>>
>>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
>> >Hi,
>> >
>> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>> >
>> >Best,
>> >Jark
>> >
>> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
>> wrote:
>> >
>> >> hi.sunfulin
>> >> 你有没有导入blink的planner呢,加入这个试试
>> >>
>> >> 
>> >> org.apache.flink
>> >>
>>  flink-table-planner-blink_${scala.binary.version}
>> >> ${flink.version}
>> >> 
>> >>
>> >>
>> >> sunfulin  于2020年7月7日周二 下午3:21写道:
>> >>
>> >>>
>> >>>
>> >>>
>> >>> hi, jark
>> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>> >>> configuration里的DeployOptions.TARGET
>> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>> >>>
>> >>>
>> >>> //构建StreamExecutionEnvironment
>> >>> public static final StreamExecutionEnvironment env =
>> >>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >>>
>> >>> //构建EnvironmentSettings 并指定Blink Planner
>> >>> private static final EnvironmentSettings bsSettings =
>> >>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>>
>> >>> //构建StreamTableEnvironment
>> >>> public static final StreamTableEnvironment tEnv =
>> >>> StreamTableEnvironment.create(env, bsSettings);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>tEnv.executeSql(“ddl sql”);
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //source注册成表
>> >>>
>> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>> >>> $("f1").as("first"), $("p").proctime());
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //join语句
>> >>>
>> >>> Table table = tEnv.sqlQuery("select b.* from test a left join
>> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> //输出
>> >>>
>> >>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> env.execute("LookUpJoinJob");
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>> >>> >能分享下复现的作业代码不?
>> >>> >
>> >>> >Best,
>> >>> >Jark
>> >>> >
>> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>> >>> >>
>> >>> >>
>> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
>> found
>> >>> to
>> >>> >> execute the application.
>> >>> >>   at
>> >>> >>
>> >>>
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>> >>> >>
>> >>> >>
>> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>> >>>
>> >>
>>


回复:rocksdb的block cache usage应该如何使用

2020-07-07 文章 SmileSmile
hi yun tang!

下午通过配置yaml的方式修改env成功生成内存文件,目前在重新复现和获取文件ing! tanks!具体内存dump在获取ing



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月07日 17:47,Yun Tang 写道:
Hi

你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug --enable-fill
make

其次最好指定dump文件的输出地址,例如在 MALLOC_CONF中加上前缀的配置  prof_prefix:/tmp/jeprof.out 
,以确保文件位置可写。

最后,由于你是在容器中跑,在容器退出前要保证相关文件能上传或者退出时候hang住一段时间,否则相关dump的文件无法看到了

祝好
唐云

From: SmileSmile 
Sent: Monday, July 6, 2020 14:15
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

我在容器内加入了libjemalloc.so.2并且在配置中加上了
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

请问要如何可以得到内存文件?试着kill一个tm,找不到对应的heap文件。求助



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jark Wu
如果是在 IDEA 中运行的话,你看看 blink planner 这个依赖的 scope 是不是被 provided 掉了? 去掉 provided
再试试看?

Best,
Jark

On Tue, 7 Jul 2020 at 18:01, sunfulin  wrote:

> hi,
>  @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。
>
>  @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-07 15:40:17,"Jark Wu"  写道:
> >Hi,
> >
> >你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
> >
> >Best,
> >Jark
> >
> >On Tue, 7 Jul 2020 at 15:31, Jun Zhang 
> wrote:
> >
> >> hi.sunfulin
> >> 你有没有导入blink的planner呢,加入这个试试
> >>
> >> 
> >> org.apache.flink
> >>
>  flink-table-planner-blink_${scala.binary.version}
> >> ${flink.version}
> >> 
> >>
> >>
> >> sunfulin  于2020年7月7日周二 下午3:21写道:
> >>
> >>>
> >>>
> >>>
> >>> hi, jark
> >>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> >>> configuration里的DeployOptions.TARGET
> >>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
> >>>
> >>>
> >>> //构建StreamExecutionEnvironment
> >>> public static final StreamExecutionEnvironment env =
> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >>>
> >>> //构建EnvironmentSettings 并指定Blink Planner
> >>> private static final EnvironmentSettings bsSettings =
> >>>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >>>
> >>> //构建StreamTableEnvironment
> >>> public static final StreamTableEnvironment tEnv =
> >>> StreamTableEnvironment.create(env, bsSettings);
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>tEnv.executeSql(“ddl sql”);
> >>>
> >>>
> >>>
> >>>
> >>> //source注册成表
> >>>
> >>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> >>> $("f1").as("first"), $("p").proctime());
> >>>
> >>>
> >>>
> >>>
> >>> //join语句
> >>>
> >>> Table table = tEnv.sqlQuery("select b.* from test a left join
> >>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
> >>>
> >>>
> >>>
> >>>
> >>> //输出
> >>>
> >>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
> >>>
> >>>
> >>>
> >>>
> >>> env.execute("LookUpJoinJob");
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >>> >能分享下复现的作业代码不?
> >>> >
> >>> >Best,
> >>> >Jark
> >>> >
> >>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
> >>> >
> >>> >> Hi,
> >>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >>> >>
> >>> >>
> >>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory
> found
> >>> to
> >>> >> execute the application.
> >>> >>   at
> >>> >>
> >>>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >>> >>
> >>> >>
> >>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
> >>>
> >>
>


Re: 如何在Flink SQL中使用周期性水印?

2020-07-07 文章 Jark Wu
Hi,

这个问题我理解其实和周期性水印没有关系,是属于 idle source
的问题,你可以尝试下加上配置 table.exec.source.idle-timeout = 10s 能不能解决你的问题。[1]

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

On Tue, 7 Jul 2020 at 17:35, noake  wrote:

> Dear All:
>
>
> 大佬们, 请教下如何在Flink SQL中使用周期性的水印。
> 我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。


Re: 一个source多个sink的同步问题

2020-07-07 文章 Jark Wu
watermark 的计算是跟数据上的 event-time 相关的。你的数据是不是间隔一小时来一波的呢?
比如 10:00 的数据之后,就是 11:00 的数据,但是要1小时后才到来?


Best,
Jark

On Tue, 7 Jul 2020 at 17:20, lgs <9925...@qq.com> wrote:

> source是kafka,有一个rowtime定义:
>
> .field("rowtime", DataTypes.TIMESTAMP(0))
> .rowtime(Rowtime()
> .timestamps_from_field("actionTime")
> .watermarks_periodic_bounded(6)
> )
>
> 有两个sink,第一个sink是直接把kafa的数据保存到postgres。
> 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
> st_env.scan("source") \
>  .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
> \
>  .group_by("hourlywindow") \
>  .select("udf(...)")
>  ...
>
>
> 现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。
>
> 有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin
hi, 
 @Jun Zhang 我一直使用的就是blink planner,这个jar包一直都有的。

 @Jark Wu 我是在本地idea中直接运行的,还没有打包到集群跑。跟这个有关系么?


















在 2020-07-07 15:40:17,"Jark Wu"  写道:
>Hi,
>
>你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?
>
>Best,
>Jark
>
>On Tue, 7 Jul 2020 at 15:31, Jun Zhang  wrote:
>
>> hi.sunfulin
>> 你有没有导入blink的planner呢,加入这个试试
>>
>> 
>> org.apache.flink
>> 
>> flink-table-planner-blink_${scala.binary.version}
>> ${flink.version}
>> 
>>
>>
>> sunfulin  于2020年7月7日周二 下午3:21写道:
>>
>>>
>>>
>>>
>>> hi, jark
>>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>>> configuration里的DeployOptions.TARGET
>>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>>>
>>>
>>> //构建StreamExecutionEnvironment
>>> public static final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> //构建EnvironmentSettings 并指定Blink Planner
>>> private static final EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>
>>> //构建StreamTableEnvironment
>>> public static final StreamTableEnvironment tEnv =
>>> StreamTableEnvironment.create(env, bsSettings);
>>>
>>>
>>>
>>>
>>>
>>>tEnv.executeSql(“ddl sql”);
>>>
>>>
>>>
>>>
>>> //source注册成表
>>>
>>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>>> $("f1").as("first"), $("p").proctime());
>>>
>>>
>>>
>>>
>>> //join语句
>>>
>>> Table table = tEnv.sqlQuery("select b.* from test a left join
>>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>>>
>>>
>>>
>>>
>>> //输出
>>>
>>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>> env.execute("LookUpJoinJob");
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>>> >能分享下复现的作业代码不?
>>> >
>>> >Best,
>>> >Jark
>>> >
>>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>>> >
>>> >> Hi,
>>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>>> >>
>>> >>
>>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
>>> to
>>> >> execute the application.
>>> >>   at
>>> >>
>>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>>> >>
>>> >>
>>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>>>
>>


Re: flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 文章 Jark Wu
Hi,

可以描述下你的业务场景么? 为什么一定要去获取 key 的信息呢,因为按照我的理解,一般来说 key 的信息一般在 value 中也有。

Best,
Jark

On Tue, 7 Jul 2020 at 17:17, op <520075...@qq.com> wrote:

> 感谢
>
>
>
>
> --原始邮件--
> 发件人:"Leonard Xu" 发送时间:2020年7月7日(星期二) 下午5:15
> 收件人:"user-zh"
> 主题:Re: flink sql 读写写kafka表的时候可以指定消息的key吗
>
>
>
> Hi,
>
> 目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。
>
> 祝好,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
> ;
>
>  在 2020年7月7日,17:01,op <520075...@qq.com 写道:
> 
>  hi:
>  nbsp; flink sql 写kafka表的时候可以指定消息的key吗?
>  看官网的kafka connector没有找到消息key相关的说明
>  如果可以的话,如何指定?
>  nbsp;谢谢


Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-07 文章 Yun Tang
Hi

你的jemalloc有带debug的重新编译么? 例如用下面的命令重新编译jemalloc得到相关的so文件
./configure --enable-prof --enable-stats --enable-debug --enable-fill
make

其次最好指定dump文件的输出地址,例如在 MALLOC_CONF中加上前缀的配置  prof_prefix:/tmp/jeprof.out 
,以确保文件位置可写。

最后,由于你是在容器中跑,在容器退出前要保证相关文件能上传或者退出时候hang住一段时间,否则相关dump的文件无法看到了

祝好
唐云

From: SmileSmile 
Sent: Monday, July 6, 2020 14:15
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

我在容器内加入了libjemalloc.so.2并且在配置中加上了
containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc.so.2"
containerized.master.env.MALLOC_CONF: 
"prof:true,lg_prof_interval:25,lg_prof_sample:17"

请问要如何可以得到内存文件?试着kill一个tm,找不到对应的heap文件。求助



| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 19:13,Yun Tang 写道:
hi

有采集过内存使用情况么,推荐使用jemalloc的预先加载方式[1][2]来sample 
JVM的内存使用,观察是否有malloc的内存存在超用的场景。需要配置相关参数 
containerized.taskmanager.env.MALLOC_CONF 和 
containerized.taskmanager.env.LD_PRELOAD


[1] https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Heap-Profiling
[2] https://www.evanjones.ca/java-native-leak-bug.html

祝好
唐云



From: SmileSmile 
Sent: Friday, July 3, 2020 15:22
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

Hi

作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。

【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】
我目前遇到的情况是作业fail重启,pod就很容易被os kill,只能重构集群解决。

详情可见
http://apache-flink.147419.n8.nabble.com/Checkpoint-td4406.html


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 15:13,Yun Tang 写道:
Hi

如果是没有开启Checkpoint,作业是如何做到failover的?failover的时候一定需要从Checkpoint加载数据的。还是说你其实开启了Checkpoint,但是Checkpoint的interval设置的很大,所以每次failover相当于作业重新运行。
如果都没有从checkpoint加载数据,哪里来的历史数据呢?作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 15:07
To: Yun Tang 
Cc: user-zh@flink.apache.org 
Subject: 回复:rocksdb的block cache usage应该如何使用

hi yun tang!

因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os 
kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。

从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了?




 写道:
Hi

观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / 
slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os 
kill,使用的是从savepoint恢复数据么?

祝好
唐云

From: SmileSmile 
Sent: Friday, July 3, 2020 14:20
To: Yun Tang 
Cc: Flink user-zh mailing list 
Subject: 回复:rocksdb的block cache usage应该如何使用

thanks yun tang!

那如果想通过block cache usage判断是否超过managed memory,该如何配置呢? 最近遇到作业只要重启后很容易被os 
kill的情况,想对比下




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月03日 14:10,Yun Tang 写道:
Hi

默认Flink启用了rocksDB 的managed 
memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block 
cache均是一个,这样你可以根据taskmanager和subtask_index 
作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block 
cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。

祝好
唐云

From: SmileSmile 
Sent: Thursday, July 2, 2020 18:05
To: Flink user-zh mailing list 
Subject: rocksdb的block cache usage应该如何使用


通过 state.backend.rocksdb.metrics.block-cache-usage: true开启 
rocksdb_block_cache_usage监控,上报到prometheus,对应的指标名称是
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage。


我们的作业一个TM的内存设置如下:

taskmanager.memory.process.size: 23000m
taskmanager.memory.managed.fraction: 0.4

ui上显示的Flink Managed MEM是8.48G。


通过grafana配置出来的图,如果group by的维度是host,得出来的每个TM在作业稳定后是45G,超过8.48G了。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host)



如果维度是host,operator_name,每个operator_name维度是22G。

sum(flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_usage{reportName=~"$reportName"})
 by (host,operator_name)


请问这个指标应该如何使用?

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制


Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-底限
好的

On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu  wrote:

> 嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月7日,17:26,Dream-底限  写道:
> >
> > hi
> > 是的,想以下面这种方式获取
> >
> > CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> > ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> >
> >
> > On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> >
> >> Hi,
> >> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> >> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >>>
> >>
> >>> 在 2020年7月7日,17:12,Dream-底限  写道:
> >>>
> >>> kafka元数据
> >>
> >>
>
>


嵌套 json 中string 数组的解析异常

2020-07-07 文章 Jun Zou
Hi all:
我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:
Row(parsedResponse: BasicArrayTypeInfo, timestamp: Long)
执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast
to [Ljava.lang.String;
at
org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

大佬们知道该怎么修改么?

我的json 的结构如下:
{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}
 P.S:
如果把 string 数组改为 long 数组或者 double 数组执行对应的操作可以正确运行,目前来看只有 string 数组出现问题。


如何在Flink SQL中使用周期性水印?

2020-07-07 文章 noake
Dear All:


大佬们, 请教下如何在Flink SQL中使用周期性的水印。
我们在消费kafka时, 想设置在没有数据时水印时间也能继续向前走, 用的是Flink SQL。

Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Leonard Xu
嗯,这个在FLIP-107里会支持,目前没法拿到这些meta数据,可以关注下FLIP-107的进展。

Best,
Leonard Xu

> 在 2020年7月7日,17:26,Dream-底限  写道:
> 
> hi
> 是的,想以下面这种方式获取
> 
> CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
> ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> 
> 
> On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> 
>> Hi,
>> kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
>> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
>> 
>> 祝好,
>> Leonard Xu
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
>>> 
>> 
>>> 在 2020年7月7日,17:12,Dream-底限  写道:
>>> 
>>> kafka元数据
>> 
>> 



Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-底限
hi
是的,想以下面这种方式获取

CREATE TABLE MyUserTable (key string,topic string,,以及其他的数据字段) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)


On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:

> Hi,
>  kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
> 如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。
>
> 祝好,
> Leonard Xu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >
>
> > 在 2020年7月7日,17:12,Dream-底限  写道:
> >
> > kafka元数据
>
>


一个source多个sink的同步问题

2020-07-07 文章 lgs
source是kafka,有一个rowtime定义:

.field("rowtime", DataTypes.TIMESTAMP(0))
.rowtime(Rowtime()
.timestamps_from_field("actionTime")
.watermarks_periodic_bounded(6)
)

有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
st_env.scan("source") \
 .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
\
 .group_by("hourlywindow") \
 .select("udf(...)")
 ...


现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。

有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink kafka connector中获取kafka元数据

2020-07-07 文章 Leonard Xu
Hi,
 kafka元数据 是指kafka记录里的 meta数据吗? 比如kafka自带的timestamp,kafka的key信息。
如果是这些信息的话, Table/SQL API 目前还没办法拿到, FLIP-107[1] 会支持这个事情。

祝好,
Leonard Xu

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
 


> 在 2020年7月7日,17:12,Dream-底限  写道:
> 
> kafka元数据



?????? flink sql ??????kafka??????????????????????key??

2020-07-07 文章 op





----
??:"Leonard Xu"https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
 


Re: flink sql 读写写kafka表的时候可以指定消息的key吗

2020-07-07 文章 Leonard Xu
Hi,

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。

祝好,
Leonard Xu
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.
 


> 在 2020年7月7日,17:01,op <520075...@qq.com> 写道:
> 
> hi:
>  flink sql 写kafka表的时候可以指定消息的key吗?
> 看官网的kafka connector没有找到消息key相关的说明
> 如果可以的话,如何指定?
> 谢谢



flink kafka connector中获取kafka元数据

2020-07-07 文章 Dream-底限
hi、
flink table/sql api中,有办法获取kafka元数据吗?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))


flink sql ??????kafka??????????????????????key??

2020-07-07 文章 op
hi??
 flink sql ??kafka??key
kafka connectorkey??



Re: 作业从flink1.9.0迁移到1.10.1,LogicalTypeRoot变更后无法从CP恢复:No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY

2020-07-07 文章 Jark Wu
Hi,

问一下,你是指用1.10去恢复 1.9 作业的 savepoint/checkpoint 吗?还是指迁移到 1.10 后,无法从 failover
中恢复?
如果是前者的话,Flink SQL 目前没有保证跨大版本的 state 兼容性。所以当你从 1.9 升级到 1.10 时,作业需要放弃状态重跑。

Best,
Jark

On Tue, 7 Jul 2020 at 15:54, 吴磊  wrote:

> 各位好:
> 当我把作业从flink1.9.0迁移到1.10.1,且作业中使用了'group by'形式的语法时,会导致无法从cp/sp恢复,
> 代码:
>
>
> 报错如下:
>
> switched from RUNNING to FAILED.switched from RUNNING to 
> FAILED.java.lang.Exception: Exception while creating 
> StreamOperatorStateContext. at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
> java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.util.FlinkException: Could not restore keyed state backend 
> for KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of 
> the 1 provided restore options. at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>  ... 9 moreCaused by: 
> org.apache.flink.runtime.state.BackendBuildingException: Failed when trying 
> to restore heap backend at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>  at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>  at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>  at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>  ... 11 moreCaused by: java.io.InvalidObjectException: enum constant ANY does 
> not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot at 
> java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569) at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166) at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>  at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
>  at 
> org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>  at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>  at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>  at 
> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169)
>  at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133)
>  at 
> 

??????flink1.9.0??????1.10.1??LogicalTypeRoot????????????CP??????No enum constant org.apache.flink.table.types.logical.LogicalTypeRoot.ANY

2020-07-07 文章 ????
  flink1.9.0??1.10.1'group 
by'??cp/sp??
??




??
switched from RUNNING to FAILED.switched from RUNNING to 
FAILED.java.lang.Exception: Exception while creating 
StreamOperatorStateContext. at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at 
java.lang.Thread.run(Thread.java:748)Caused by: 
org.apache.flink.util.FlinkException: Could not restore keyed state backend for 
KeyedProcessOperator_d26553d858836b90d15e66f459fbcb50_(2/3) from any of the 1 
provided restore options. at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
 ... 9 moreCaused by: org.apache.flink.runtime.state.BackendBuildingException: 
Failed when trying to restore heap backend at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
 at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ... 11 moreCaused by: java.io.InvalidObjectException: enum constant ANY does 
not exist in class org.apache.flink.table.types.logical.LogicalTypeRoot at 
java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2013) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1569) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:430) at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
 at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
 at 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot.readSnapshot(BaseRowSerializer.java:306)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 at 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
 at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169)
 at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:133)
 at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
 ... 15 moreCaused by: java.lang.IllegalArgumentException: No enum constant 
org.apache.flink.table.types.logical.LogicalTypeRoot.ANY at 
java.lang.Enum.valueOf(Enum.java:238) 




 

Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jark Wu
Hi,

你是作业打包后在集群执行的,还是在 IDEA 中运行的呢?

Best,
Jark

On Tue, 7 Jul 2020 at 15:31, Jun Zhang  wrote:

> hi.sunfulin
> 你有没有导入blink的planner呢,加入这个试试
>
> 
> org.apache.flink
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
>
>
> sunfulin  于2020年7月7日周二 下午3:21写道:
>
>>
>>
>>
>> hi, jark
>> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
>> configuration里的DeployOptions.TARGET
>> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>>
>>
>> //构建StreamExecutionEnvironment
>> public static final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> //构建EnvironmentSettings 并指定Blink Planner
>> private static final EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>
>> //构建StreamTableEnvironment
>> public static final StreamTableEnvironment tEnv =
>> StreamTableEnvironment.create(env, bsSettings);
>>
>>
>>
>>
>>
>>tEnv.executeSql(“ddl sql”);
>>
>>
>>
>>
>> //source注册成表
>>
>> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
>> $("f1").as("first"), $("p").proctime());
>>
>>
>>
>>
>> //join语句
>>
>> Table table = tEnv.sqlQuery("select b.* from test a left join
>> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>>
>>
>>
>>
>> //输出
>>
>> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>>
>>
>>
>>
>> env.execute("LookUpJoinJob");
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
>> >能分享下复现的作业代码不?
>> >
>> >Best,
>> >Jark
>> >
>> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>> >
>> >> Hi,
>> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
>> >>
>> >>
>> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
>> to
>> >> execute the application.
>> >>   at
>> >>
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>> >>
>> >>
>> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>>
>


Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jun Zhang
hi.sunfulin
你有没有导入blink的planner呢,加入这个试试


org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}



sunfulin  于2020年7月7日周二 下午3:21写道:

>
>
>
> hi, jark
> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> configuration里的DeployOptions.TARGET
> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>
>
> //构建StreamExecutionEnvironment
> public static final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //构建EnvironmentSettings 并指定Blink Planner
> private static final EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>
> //构建StreamTableEnvironment
> public static final StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, bsSettings);
>
>
>
>
>
>tEnv.executeSql(“ddl sql”);
>
>
>
>
> //source注册成表
>
> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> $("f1").as("first"), $("p").proctime());
>
>
>
>
> //join语句
>
> Table table = tEnv.sqlQuery("select b.* from test a left join
> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>
>
>
>
> //输出
>
> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>
>
>
>
> env.execute("LookUpJoinJob");
>
>
>
>
>
>
>
>
> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >能分享下复现的作业代码不?
> >
> >Best,
> >Jark
> >
> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
> >
> >> Hi,
> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >>
> >>
> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
> to
> >> execute the application.
> >>   at
> >>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >>
> >>
> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>


Re:Re: flink 1.11 作业执行异常

2020-07-07 文章 sunfulin



hi, jark
我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink 
configuration里的DeployOptions.TARGET (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。


//构建StreamExecutionEnvironment
public static final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

//构建EnvironmentSettings 并指定Blink Planner
private static final EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

//构建StreamTableEnvironment
public static final StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, bsSettings);





   tEnv.executeSql(“ddl sql”);




//source注册成表

tEnv.createTemporaryView("test", ds, $("f0").as("id"), 
$("f1").as("first"), $("p").proctime());




//join语句

Table table = tEnv.sqlQuery("select b.* from test a left join my_dim 
FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");




//输出

tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");




env.execute("LookUpJoinJob");








在 2020-07-06 14:59:17,"Jark Wu"  写道:
>能分享下复现的作业代码不?
>
>Best,
>Jark
>
>On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
>
>> Hi,
>> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
>> org.apache.flink.table.api.TableExecution: Failed to execute sql
>>
>>
>> caused by : java.lang.IlleagalStateException: No ExecutorFactory found to
>> execute the application.
>>   at
>> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>>
>>
>> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。