回复:Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-13 文章
有没有可能是没分配uid,然后dag发生了变化,导致的恢复不了状态



---原始邮件---
发件人: "Yang Peng"

Re: 请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-13 文章 yulu yang
好的,谢谢,我试一下!

魏子涵  于2020年8月14日周五 下午1:35写道:

> 建议先不使用flink的Kafka来消费,先自己编码写一个kafka消费看是否还是有这个问题,作个对比,看是否是flink提供的kafka接口的问题。
>
>
> | |
> 魏子涵
> |
> |
> 邮箱:wzh1007181...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年08月14日 13:27,yulu yang 写道:
>  我这个flink作业和和分组都是新创建的,没有抽取历史
> group是新的
>
> 魏子涵  于2020年8月14日周五 下午1:20写道:
>
> > Kafka客户端的group. id参数有改吗?
> >
> >
> > | |
> > 魏子涵
> > |
> > |
> > 邮箱:wzh1007181...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年08月14日 12:44,yulu yang 写道:
> > 在flink作业中从kafka数据源获取数据,将 参数设置为'scan.startup.mode' = 'earliest-offset',
> > 检测flink运行结果时,发现只抽取了kafka中的newest数据,没有获取到oldest数据。
> > 不知道是不是我这里'scan.startup.mode' 参数用的不对。
> > Flink 版本1.11.1 kafka版本 2.6.0
> >
>


回复:请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-13 文章 魏子涵
建议先不使用flink的Kafka来消费,先自己编码写一个kafka消费看是否还是有这个问题,作个对比,看是否是flink提供的kafka接口的问题。


| |
魏子涵
|
|
邮箱:wzh1007181...@163.com
|

签名由 网易邮箱大师 定制

在2020年08月14日 13:27,yulu yang 写道:
 我这个flink作业和和分组都是新创建的,没有抽取历史
group是新的

魏子涵  于2020年8月14日周五 下午1:20写道:

> Kafka客户端的group. id参数有改吗?
>
>
> | |
> 魏子涵
> |
> |
> 邮箱:wzh1007181...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年08月14日 12:44,yulu yang 写道:
> 在flink作业中从kafka数据源获取数据,将 参数设置为'scan.startup.mode' = 'earliest-offset',
> 检测flink运行结果时,发现只抽取了kafka中的newest数据,没有获取到oldest数据。
> 不知道是不是我这里'scan.startup.mode' 参数用的不对。
> Flink 版本1.11.1 kafka版本 2.6.0
>


Re: 请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-13 文章 yulu yang
  我这个flink作业和和分组都是新创建的,没有抽取历史
group是新的

魏子涵  于2020年8月14日周五 下午1:20写道:

> Kafka客户端的group. id参数有改吗?
>
>
> | |
> 魏子涵
> |
> |
> 邮箱:wzh1007181...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年08月14日 12:44,yulu yang 写道:
> 在flink作业中从kafka数据源获取数据,将 参数设置为'scan.startup.mode' = 'earliest-offset',
> 检测flink运行结果时,发现只抽取了kafka中的newest数据,没有获取到oldest数据。
> 不知道是不是我这里'scan.startup.mode' 参数用的不对。
> Flink 版本1.11.1 kafka版本 2.6.0
>


回复:请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-13 文章 魏子涵
Kafka客户端的group. id参数有改吗?


| |
魏子涵
|
|
邮箱:wzh1007181...@163.com
|

签名由 网易邮箱大师 定制

在2020年08月14日 12:44,yulu yang 写道:
在flink作业中从kafka数据源获取数据,将 参数设置为'scan.startup.mode' = 'earliest-offset',
检测flink运行结果时,发现只抽取了kafka中的newest数据,没有获取到oldest数据。
不知道是不是我这里'scan.startup.mode' 参数用的不对。
Flink 版本1.11.1 kafka版本 2.6.0


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 yulu yang
对了,我这个flink作业和和分组都是新创建,不存在抽取历史。

杨豫鲁  于2020年8月13日周四 下午3:33写道:

> 请教大家一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>
>
>
>
>


Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-13 文章 Yang Peng
Hi,咨询各位一个问题我们有个任务,statebackend为rocksdb
增量执行cp,flink读取kafka经过处理然后写入到kafka,producer开启了EOS,最近发现任务有反压,source端日志量有积压,然后准备改一下资源分配多加一些资源(没有修改并行度,代码未做修改)从cp恢复任务,任务被cancel之后然后从cp恢复发现起不来了连续两次都不行,由于客户端日志保存时间太短当时没来得及去查看客户端日志,所以没有找到客户端日志,


请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-13 文章 yulu yang
在flink作业中从kafka数据源获取数据,将 参数设置为'scan.startup.mode' = 'earliest-offset',
检测flink运行结果时,发现只抽取了kafka中的newest数据,没有获取到oldest数据。
不知道是不是我这里'scan.startup.mode' 参数用的不对。
Flink 版本1.11.1 kafka版本 2.6.0


Re: 如何设置FlinkSQL并行度

2020-08-13 文章 赵一旦
检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗?

Xingbo Huang  于2020年8月14日周五 下午12:01写道:

> Hi,
>
> 关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度
>
> Best,
> Xingbo
>
> Zhao,Yi(SEC)  于2020年8月14日周五 上午10:49写道:
>
> > 并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
> >
> > 发件人: "Zhao,Yi(SEC)" 
> > 日期: 2020年8月13日 星期四 上午11:44
> > 收件人: "user-zh@flink.apache.org" 
> > 主题: 如何设置FlinkSQL并行度
> >
> > 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
> > 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
> >
> > 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
> >
> >
>


Re: flink 1.11 SQL idea调试无数据也无报错

2020-08-13 文章 Xingbo Huang
Hi,
这是因为flink
1.11里面executeSql是一个异步的接口,在idea里面跑的话,直接就结束了,你需要手动拿到那个executeSql的返回的TableResult,然后去

tableResult.getJobClient.get()
  .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
  .get()

进行wait job finished

Best,
Xingbo

DanielGu <610493...@qq.com> 于2020年8月14日周五 上午11:45写道:

> 我遇到个问题,请教一下:
> 环境 1.11 idea
> 参考的wuchong大神的demo想把client变成java的,第一个例子 统计每小时的成交量
> 数据可以读到但是没有输出结果,写入es没反应,后改为print sink 还是没反应
> https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
> 求助,各位
>
>
>
> 下面是pom 和代码,以及运行结果
>
> // 创建执行环境
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
> //设置StateBackend
> bsEnv.setStateBackend(new
> FsStateBackend("file:///tmp/flink/chkdir"));
> EnvironmentSettings bsSettings = EnvironmentSettings
> .newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);
>
> // Kafka
> String sourceDDL ="CREATE TABLE user_behavior (" +
> "user_id BIGINT," +
> "item_id BIGINT," +
> "category_id BIGINT," +
> "behavior STRING," +
> "ts TIMESTAMP (3)," +
> "proctime AS PROCTIME ()," +
> "WATERMARK FOR ts AS ts-INTERVAL '5' SECOND) " +
> "WITH (" +
> "'connector'='kafka'," +
> "'topic'='user_behavior'," +
> "'scan.startup.mode'='earliest-offset'," +
> "'properties.bootstrap.servers'='localhost:9092'," +
> "'format'='json'" +
> ")";
>
>
> //写入es 改为print
> /*String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
> "hour_of_day BIGINT," +
> "buy_cnt BIGINT" +
> ") WITH (" +
> "'connector'='elasticsearch-7'," +
> "'hosts'='http://localhost:9200'," +
> "'index'='buy_cnt_per_hour')";*/
> String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
>  "hour_of_day BIGINT," +
>  "buy_cnt BIGINT" +
> ") WITH (\n" +
> " 'connector' = 'print'\n" +
> ")";
>
>
> String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
> "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as
> hour_of_day , COUNT(*) as buy_cnt\n" +
> "FROM user_behavior\n" +
> "WHERE behavior = 'buy'\n" +
> "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
>
>
> //注册source和sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> //tableResult.print();
>
>tEnv.executeSql(transformationDDL);
>
> pom
> 
>
> 
> org.apache.flink
>
> flink-table-api-java-bridge_${scala.version}
> ${flink.version}
>
> 
>
> 
> org.apache.flink
>
> flink-table-planner-blink_${scala.version}
> ${flink.version}
>
> 
>
>
> 
> org.apache.flink
> flink-table-common
> ${flink.version}
> provided
> 
>
>
>
>
> 
> org.apache.flink
> flink-clients_${scala.version}
> ${flink.version}
>
> 
>
>
>
> 
> org.apache.flink
> flink-json
> ${flink.version}
>
> 
>
>
> 
> org.apache.flink
>
> flink-connector-elasticsearch7_${scala.version}
> ${flink.version}
> 
>
>
> 
> org.apache.flink
>
> flink-sql-connector-kafka_${scala.version}
> ${flink.version}
> 
>
>
>
>
> 
> org.apache.flink
> flink-connector-jdbc_${scala.version}
> ${flink.version}
> 
>
>
> 
> mysql
> mysql-connector-java
> ${mysql.version}
> 
>
>
> 
> org.apache.flink
> flink-runtime-web_${scala.version}
> ${flink.version}
> provided
> 
> 
>
> 运行结果
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka version: unknown
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka commitId: unknown
> 01:15:12,358 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
> -
> Kafka startTimeMs: 1597338912355
> 01:15:12,361 INFO
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
>
> - [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s):
> user_behavior-0
> 01:15:12,365 INFO
> 

Re: Flink sql TUMBLE window 不支持offset吗

2020-08-13 文章 nobel一旦
所以总结下,实际不仅仅是 https://issues.apache.org/jira/browse/FLINK-17767
 这一个问题,这个正式我讲到的UTC+8时区下天级别窗口正确划分的解决方案。
但FlinkSQL本身的eventtime的类型问题反而感觉更严重,造成各种误解等。时间戳是最准确的信息,既然采用了日期这种不准确的东西,就应该明确其时区信息。即使时区信息是被隐藏了,那么就正确考虑时区,而不是在将日期翻译回时间戳的时候默认采用了UTC+0的时区转回去,毕竟日期可能是UTC+8时区的表示。

nobel一旦  于2020年8月14日周五 上午11:49写道:

> 窗口周期实际需求是UTC+8时区的(8月)14日0点~14日24点,实际对应UTC+0时区的(8月)*13日*16点~14日16点。
> 1 解释下为什么在FlinkSQL场景下时区设置正确情况下,窗口没划分错误。
> *这个原因比较绕,这也是我想不通的点,作为疑问,希望有人解答(即为什么FlinkSQL使用TIMESTAMP(3)这种日期作为event
> timed定义,以及watermark计算的依据,而不是bigint的UTC+0的时间戳作为eventtime,和datastream
> api保持统一呢)*。
>
> 如上疑问,当然也正是这个疑问导致实际上如果时区设置正确,就不存在窗口划分错误问题了。原因如下:
>
> 记录A,event
> time时间设置为UTC+8的14日0点,这个时间点FlinkSQL估计是按照UTC+0理解的,即将其理解为UTC+0时区的14日0点。
> 记录B,event
> time时间设置为UTC+8的14日24点,这个时间点FlinkSQL估计是按照UTC+0理解的,即将其理解为UTC+0时区的14日24点。
> 如上2句,导致UTC+8时区的0到24点刚好对应到UTC+0时区的0到24点,*因此窗口划分不会出错*。
>
> *但是,这种做法实际很不好,为什么呢?*因为这种做法会导致event
> time实际是错误的,比如你打开Flink的WEB-UI去看watermark,会发现watermark时间戳转成日期(UTC+8)后发现超前了8小时。即现在是14日12点,则watermark大概会是14日20点了已经。
>
>
> 回过头来,那什么做法才最好呢(个人观点)。
> 首先,event time必须按照utc+0的时间戳给出,这意味者要么(1)FlinkSQL使用Timestamp类型作为event
> time这一点要么改变,改为使用时间戳。要么(2)仍然使用Timestamp类型作为event
> time,但是需要用户主动将其设置为UTC+0时间,当然这会导致其他问题,等会讲解决。再或者(3)仍然使用Timestamp类型作为event
> time,同时也使用UTC+8的表示,但为其保留时区信息,即告诉Flink这是UTC+8的时区,然后Flink将其转会时间戳时候可以按照正确的时区信息去转。
>
>
> 关于第(2),(3)种方式下,都会导致窗口划分出现问题(UTC+8情况下的天级别窗口划分问题),但我想说,这个问题本身是应该通过窗口的offset实现的,本身不不应该基于这种错误的设计碰巧去解决,而且还附带一个错误的效果,即WEB-UI的watermark超前8小时。
>
> *希望小伙伴们看看,不管是否有道理,对源码有了解的可以确认下我的说法对不对也。*
>
> Benchao Li  于2020年8月13日周四 上午10:35写道:
>
>> 不管是SQL还是DataStream,底层最终用来划分窗口的时候的时间,都是unix timestamp。
>> SQL里面的Timestamp类型,是可以带着时区的,但是转成unix timestamp表示的时候,时区信息就丢掉了。
>>
>> Zhao,Yi(SEC)  于2020年8月13日周四 上午10:12写道:
>>
>> > 大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd
>> >
>> HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。
>> > 其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。
>> >
>> > 在 2020/8/12 下午9:21,“Benchao Li” 写入:
>> >
>> > Hi,
>> >
>> > 目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
>> > 但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
>> > 所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
>> > 有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-17767
>> >
>> > Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:
>> >
>> > > 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
>> > > 但是看了文档没发现添加offset的语法。
>> > >
>> > >
>> > > 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
>> > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>> >
>> >
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>


Re: 如何设置FlinkSQL并行度

2020-08-13 文章 Xingbo Huang
Hi,

关于并行度的问题,据我所知,目前Table API上还没法对每一个算子单独设置并行度

Best,
Xingbo

Zhao,Yi(SEC)  于2020年8月14日周五 上午10:49写道:

> 并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。
>
> 发件人: "Zhao,Yi(SEC)" 
> 日期: 2020年8月13日 星期四 上午11:44
> 收件人: "user-zh@flink.apache.org" 
> 主题: 如何设置FlinkSQL并行度
>
> 看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
> 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?
>
> 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。
>
>


Re: Flink sql TUMBLE window 不支持offset吗

2020-08-13 文章 nobel一旦
窗口周期实际需求是UTC+8时区的(8月)14日0点~14日24点,实际对应UTC+0时区的(8月)*13日*16点~14日16点。
1 解释下为什么在FlinkSQL场景下时区设置正确情况下,窗口没划分错误。
*这个原因比较绕,这也是我想不通的点,作为疑问,希望有人解答(即为什么FlinkSQL使用TIMESTAMP(3)这种日期作为event
timed定义,以及watermark计算的依据,而不是bigint的UTC+0的时间戳作为eventtime,和datastream
api保持统一呢)*。

如上疑问,当然也正是这个疑问导致实际上如果时区设置正确,就不存在窗口划分错误问题了。原因如下:

记录A,event
time时间设置为UTC+8的14日0点,这个时间点FlinkSQL估计是按照UTC+0理解的,即将其理解为UTC+0时区的14日0点。
记录B,event
time时间设置为UTC+8的14日24点,这个时间点FlinkSQL估计是按照UTC+0理解的,即将其理解为UTC+0时区的14日24点。
如上2句,导致UTC+8时区的0到24点刚好对应到UTC+0时区的0到24点,*因此窗口划分不会出错*。

*但是,这种做法实际很不好,为什么呢?*因为这种做法会导致event
time实际是错误的,比如你打开Flink的WEB-UI去看watermark,会发现watermark时间戳转成日期(UTC+8)后发现超前了8小时。即现在是14日12点,则watermark大概会是14日20点了已经。


回过头来,那什么做法才最好呢(个人观点)。
首先,event time必须按照utc+0的时间戳给出,这意味者要么(1)FlinkSQL使用Timestamp类型作为event
time这一点要么改变,改为使用时间戳。要么(2)仍然使用Timestamp类型作为event
time,但是需要用户主动将其设置为UTC+0时间,当然这会导致其他问题,等会讲解决。再或者(3)仍然使用Timestamp类型作为event
time,同时也使用UTC+8的表示,但为其保留时区信息,即告诉Flink这是UTC+8的时区,然后Flink将其转会时间戳时候可以按照正确的时区信息去转。

关于第(2),(3)种方式下,都会导致窗口划分出现问题(UTC+8情况下的天级别窗口划分问题),但我想说,这个问题本身是应该通过窗口的offset实现的,本身不不应该基于这种错误的设计碰巧去解决,而且还附带一个错误的效果,即WEB-UI的watermark超前8小时。

*希望小伙伴们看看,不管是否有道理,对源码有了解的可以确认下我的说法对不对也。*

Benchao Li  于2020年8月13日周四 上午10:35写道:

> 不管是SQL还是DataStream,底层最终用来划分窗口的时候的时间,都是unix timestamp。
> SQL里面的Timestamp类型,是可以带着时区的,但是转成unix timestamp表示的时候,时区信息就丢掉了。
>
> Zhao,Yi(SEC)  于2020年8月13日周四 上午10:12写道:
>
> > 大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd
> >
> HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。
> > 其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。
> >
> > 在 2020/8/12 下午9:21,“Benchao Li” 写入:
> >
> > Hi,
> >
> > 目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
> > 但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
> > 所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
> > 有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17767
> >
> > Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:
> >
> > > 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
> > > 但是看了文档没发现添加offset的语法。
> > >
> > >
> > > 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
> >
> >
>
> --
>
> Best,
> Benchao Li
>


flink 1.11 SQL idea调试无数据也无报错

2020-08-13 文章 DanielGu
我遇到个问题,请教一下:
环境 1.11 idea
参考的wuchong大神的demo想把client变成java的,第一个例子 统计每小时的成交量
数据可以读到但是没有输出结果,写入es没反应,后改为print sink 还是没反应
https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN
求助,各位



下面是pom 和代码,以及运行结果

// 创建执行环境
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//设置StateBackend
bsEnv.setStateBackend(new
FsStateBackend("file:///tmp/flink/chkdir"));
EnvironmentSettings bsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);

// Kafka
String sourceDDL ="CREATE TABLE user_behavior (" +
"user_id BIGINT," +
"item_id BIGINT," +
"category_id BIGINT," +
"behavior STRING," +
"ts TIMESTAMP (3)," +
"proctime AS PROCTIME ()," +
"WATERMARK FOR ts AS ts-INTERVAL '5' SECOND) " +
"WITH (" +
"'connector'='kafka'," +
"'topic'='user_behavior'," +
"'scan.startup.mode'='earliest-offset'," +
"'properties.bootstrap.servers'='localhost:9092'," +
"'format'='json'" +
")";


//写入es 改为print
/*String sinkDDL = "CREATE TABLE buy_cnt_per_hour (" +
"hour_of_day BIGINT," +
"buy_cnt BIGINT" +
") WITH (" +
"'connector'='elasticsearch-7'," +
"'hosts'='http://localhost:9200'," +
"'index'='buy_cnt_per_hour')";*/
String sinkDDL = "CREATE TABLE buy_cnt_per_hour (\n" +
 "hour_of_day BIGINT," +
 "buy_cnt BIGINT" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";


String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
"SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as
hour_of_day , COUNT(*) as buy_cnt\n" +
"FROM user_behavior\n" +
"WHERE behavior = 'buy'\n" +
"GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";



//注册source和sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
//tableResult.print();

   tEnv.executeSql(transformationDDL);

pom



org.apache.flink
   
flink-table-api-java-bridge_${scala.version}
${flink.version}




org.apache.flink
   
flink-table-planner-blink_${scala.version}
${flink.version}





org.apache.flink
flink-table-common
${flink.version}
provided






org.apache.flink
flink-clients_${scala.version}
${flink.version}






org.apache.flink
flink-json
${flink.version}





org.apache.flink
   
flink-connector-elasticsearch7_${scala.version}
${flink.version}




org.apache.flink
   
flink-sql-connector-kafka_${scala.version}
${flink.version}






org.apache.flink
flink-connector-jdbc_${scala.version}
${flink.version}




mysql
mysql-connector-java
${mysql.version}




org.apache.flink
flink-runtime-web_${scala.version}
${flink.version}
provided



运行结果
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka version: unknown
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka commitId: unknown
01:15:12,358 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  -
Kafka startTimeMs: 1597338912355
01:15:12,361 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer 
- [Consumer clientId=consumer-20, groupId=null] Subscribed to partition(s):
user_behavior-0
01:15:12,365 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
 
- [Consumer clientId=consumer-20, groupId=null] Seeking to EARLIEST offset
of partition user_behavior-0
01:15:12,377 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - [Consumer
clientId=consumer-20, groupId=null] Cluster ID: txkqox8yRL6aWBNsOcS67g
01:15:12,387 INFO 
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState
 
- [Consumer clientId=consumer-20, groupId=null] 

回复: 关于flink升级

2020-08-13 文章 引领


谢谢各位大佬。我测测整体环境吧!!!
| |
引领
|
|
yrx73...@163.com
|
签名由网易邮箱大师定制


在2020年08月14日 10:51,caozhen 写道:

我升级1.7到1.11过程遇到主要下面的问题,可以参考下

1、编译mainjar阶段:flink api的变化,例如env创建有变化,某些类的包名有变化
2、编译mainjar阶段:flink、flinksql环境相关的依赖改为provided,不打进fat包,否则引发NoClassDefFoundError问题
3、测试运行阶段:资源设置:TM、JM 有重大变化
4、测试运行阶段:解决flink和hadoop的依赖问题(1.11开始不再提供hadoop-shade依赖)
---



Zhao,Yi(SEC) wrote
经历了1.7到1.8,1.8到1.9,1.9到1.10;前2还好,最后一个有些坑,jdk8版本不要太旧,某个版本和1.10配合会有bug。

在 2020/8/14 上午9:25,“蒋佳成(Jiacheng Jiang)”<

920334586@

写入:


1.10有了新的内存模型,没弄清楚这些内存配置前,可能跑不起job!建议先弄清楚,在测试环境上先搞搞--原始邮件--
发件人:引领<

yrx73513@


发送时间:2020年8月14日(星期五) 上午9:15
收件人:

user-zh@.apache

<

user-zh@.apache

;
主题:关于flink升级





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
补充:
刚刚翻了下源码,kafka那个直到原因了,见FlinkKafkaConsumer的288行,限定了必须是ByteArrayDeserializer,而且引用到了ByteArrayDeserializer类,这个是在new
 KafkaConsumer的过程就执行到的,所以这个依赖是提交端需要的。

按照  
的讲法,flink-sql按照-j或-l指定的包会被上传,这个倒也合理,毕竟有些任务特定需要一些包,提供这个功能肯定有用。
但像connector,json,csv这种非常通用的包感觉应该统一放入集群就好,但实际按照这个情况来看无法做到。
因为即使我把这些包统一放到了集群,实际提交段还是需要这些包,因为没有这些包提交sql时就直接报错了,于是还是需要通过-j或-l指定,然后进一步游会被上传?所以说,此处又涉及到一个flink集群上的包和sql-client提交的包重复的问题,一致还好,不一致情况下哪个优先呢?
___

在 2020/8/14 上午10:46,“Zhao,Yi(SEC)” 写入:

分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find 
a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。

所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?

还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select
 * from xxx提交sql之后马上报错了,报错为:
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。


___
在 2020/8/14 上午9:44,“godfrey he” 写入:

sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> 
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 
总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>






Re:Re:Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 forideal
Hi Zhou Zach:
  
“但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
 ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的”
  
  关于这个问题,我昨天也和李本超进行了线下沟通,大概的结论是:
   >1.如果不直接看每个operator的metrics,只看 flink ui 那个 graph 图,不进行 disable chain 
的话,是看不出来问题的。如果打开了disable chain 有可能能看出来问题。在我们这个场景下能看出来问题。其他场景可能会不那么直接。
   >2.我们打开了disable chain 看 flink ui,其实也是看的相关的 metric。


   总体来看,就是要用 metrics 来诊断问题。不过,在某些场景下,一个用户开发了一个 Flink SQL 然后,去看监控,也增加了对应的 
cost。一个是说这个用户之前不怎么看metrics,一个是说平台metrics也做的不好。所以如果能在 Flink ui 
上面解决一定量的问题,将能减少用户的成本。


Best forideal


  

















在 2020-08-13 16:33:29,"Zhou Zach"  写道:
>
>
>
>Hi forideal, Shengkai Fang,
>   
>加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
>Source: TableSourceScan(table=[[default_catalog, default_database, user]], 
>fields=[uid, sex, age, created_time]) -> 
>
>Calc(select=[uid, sex, age, created_time, () AS procTime, 
>TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd 
>HH:mm:ss')) AS eventTime]) -> 
>
>WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL 
>SECOND)]) -> 
>
>Calc(select=[uid, sex, age, created_time]) -> 
>
>Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, 
>sex, age, created_time])
>但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-13 15:39:44,"forideal"  写道:
>>Hi Zhou Zach:
>>你可以试试 env.disableOperatorChaining();
>>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>>> 我是怎么设置参数的
>>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>>tableEnv.getConfig().getConfiguration() .setString(key, 
>>configs.getString(key, null));
>>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' 
>>SECOND
>>
>>Best forideal
>>
>>
>>在 2020-08-13 15:20:13,"Zhou Zach"  写道:
>>>
>>>
>>>
>>>Hi forideal,
>>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>>>
>>>
>>>val streamExecutionEnv = 
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>streamExecutionEnv.setStateBackend(new 
>>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>>>
>>>val blinkEnvSettings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
>>> blinkEnvSettings)
>>>
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>>>
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>>>
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>>>
>>>
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>>>
>>>
>>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-13 14:02:58,"forideal"  写道:
大家好

   问题的原因定位到了。
   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 
 op chain 在一起,不能确定到底是那个环节存在问题)
   发现在  WatermarkAssigner(rowtime=[event_time], 
 watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source 
 chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 
 watermark 为 min(parent task output watermark),所以下游是 No 
 watermark。导致在查问题的时候,比较困难。
   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
 table.exec.source.idle-timeout = 10s 参数即可。
   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。


Best forideal








在 2020-08-13 12:56:57,"forideal"  写道:
>大家好
>
>
>关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
> StreamExecWatermarkAssigner
>在translateToPlanInternal 中生成了如下一个 class 代码,
>public final class WatermarkGenerator$2 extends 
>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>public void open(org.apache.flink.configuration.Configuration parameters) 
>throws Exception { } @Override public Long 
>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; 
>boolean isNull$3; boolean isNull$4; 

Re: 回复:关于flink升级

2020-08-13 文章 caozhen

我升级1.7到1.11过程遇到主要下面的问题,可以参考下

1、编译mainjar阶段:flink api的变化,例如env创建有变化,某些类的包名有变化
2、编译mainjar阶段:flink、flinksql环境相关的依赖改为provided,不打进fat包,否则引发NoClassDefFoundError问题
3、测试运行阶段:资源设置:TM、JM 有重大变化
4、测试运行阶段:解决flink和hadoop的依赖问题(1.11开始不再提供hadoop-shade依赖)
---



Zhao,Yi(SEC) wrote
> 经历了1.7到1.8,1.8到1.9,1.9到1.10;前2还好,最后一个有些坑,jdk8版本不要太旧,某个版本和1.10配合会有bug。
> 
> 在 2020/8/14 上午9:25,“蒋佳成(Jiacheng Jiang)”<

> 920334586@

>> 写入:
> 
>
> 1.10有了新的内存模型,没弄清楚这些内存配置前,可能跑不起job!建议先弄清楚,在测试环境上先搞搞--原始邮件--
> 发件人:引领<

> yrx73513@

> 
> 发送时间:2020年8月14日(星期五) 上午9:15
> 收件人:

> user-zh@.apache

> <

> user-zh@.apache

> ;
> 主题:关于flink升级





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 caozhen
我觉得可以看下用什么命令启动的,进到启动脚本里,跟踪下log的设置。

例如我用的standalone-job.sh start-forground启动, 跟踪到flink-console.sh 中,能看到具体log设置




zilong xiao wrote
> 我也是用的properties配置文件,可是日志貌似没收集到,有什么方法可以判断配置文件是否生效吗 ?
> 
> caozhen 

> caozhen1937@

>  于2020年8月14日周五 上午10:23写道:
> 
>> log4j2的配置:我是直接用的flink1.11.1客户端提供的log4j-console.properties。
>>
>> 如果你是用的xml、yaml文件,在客户端提交作业时可能要指定一下日志文件,也可以改下flink启动脚本的日志设置
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何设置FlinkSQL并行度

2020-08-13 文章 Zhao,Yi(SEC)
并行度问题有人帮忙解答下吗,此外补充个相关问题,除了并行度,flink-sql情况下,能做检查点/保存点,并基于检查点/保存点重启sql任务吗。

发件人: "Zhao,Yi(SEC)" 
日期: 2020年8月13日 星期四 上午11:44
收件人: "user-zh@flink.apache.org" 
主题: 如何设置FlinkSQL并行度

看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?

比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。



Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
分析个报错,报错如下:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
从报错来看,是需要json的format,但实际只有csv。因为缺少json的format。
这个实验,是我将相关所有jar都放到集群的flink的lib目录并重启了集群。
但是提交sql(即执行sql-client.sh命令)的机器上没有这些依赖,报错如上。

所以,这个根据我的表定义去找对应的format,以及connector等的过程是在提交端做的吗?
还有一个更奇怪的,就算format,connector相关是提交端做的,但是我kafka的ssl证书路径的读取理论上肯定应该是在任务执行时候才会做,但当我执行select
 * from xxx提交sql之后马上报错了,报错为:
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
Kafka的序列化类理论上是作为kafkasource被创建时候的properties传入,然后kafkaConsumer执行期间才会发现这个class不存在吧。

___
在 2020/8/14 上午9:44,“godfrey he” 写入:

sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> 
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>




Re: 回复:关于flink升级

2020-08-13 文章 Zhao,Yi(SEC)
经历了1.7到1.8,1.8到1.9,1.9到1.10;前2还好,最后一个有些坑,jdk8版本不要太旧,某个版本和1.10配合会有bug。

在 2020/8/14 上午9:25,“蒋佳成(Jiacheng Jiang)”<920334...@qq.com> 写入:


1.10有了新的内存模型,没弄清楚这些内存配置前,可能跑不起job!建议先弄清楚,在测试环境上先搞搞--原始邮件--
发件人:引领

Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 zilong xiao
我也是用的properties配置文件,可是日志貌似没收集到,有什么方法可以判断配置文件是否生效吗 ?

caozhen  于2020年8月14日周五 上午10:23写道:

> log4j2的配置:我是直接用的flink1.11.1客户端提供的log4j-console.properties。
>
> 如果你是用的xml、yaml文件,在客户端提交作业时可能要指定一下日志文件,也可以改下flink启动脚本的日志设置
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-13 文章 Leonard Xu
Hi,
我贴的链接里有对应的PR[1], 你可以看看这个PR里的代码,代码入口可以从 Elasticsearch6DynamicSink.java 

 开始
比如你自己实现了Elasticsearch5DynamicSink 

 一套后,再打一个 es5 的sql jar 就好了。

祝好
Leonard 
[1] https://github.com/apache/flink/pull/12184 



> 在 2020年8月14日,10:14,kcz <573693...@qq.com> 写道:
> 
> 查看您说的[1]的url之后,发现里面并没有跟 es sql jar有关的。
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 caozhen
log4j2的配置:我是直接用的flink1.11.1客户端提供的log4j-console.properties。

如果你是用的xml、yaml文件,在客户端提交作业时可能要指定一下日志文件,也可以改下flink启动脚本的日志设置



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 zilong xiao
想问下你是用的什么类型的配置 xml、yaml还是properties呢?

caozhen  于2020年8月14日周五 上午9:58写道:

> 我最后用的是log4j2。
>
>
> 之前mainjar中有很多log4j的依赖(slf4j-log4j12),而flink客户端lib下是log4j2的依赖(log4j-slf4j-impl),导致了冲突,不能打印日志。
>
> 改动:把mainjar中的log4j的依赖改成provided,使用了客户端提供的log4j2依赖
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 求助:flink 1.11.1 yarn perjob 模式配置zookeeper的HA后application提交超时,1.10时正常的

2020-08-13 文章 Yang Wang
perjob模式在1.10到1.11是没有变化的,只是1.11新增了application模式,Zookeeper的HA也没有变化

还是得你分享一下提交失败时候的Client端和JM端的log,这样才能方便查问题


Best,
Yang

x2009438  于2020年8月13日周四 下午5:35写道:

> 各位,
>
> 今天从1.10.0升级到1.11.1,结果yarn per job 提交作业,配置zookeeper的HA之后作业提交超时,有人碰到过吗?
> 看日志也没记录什么。
>
> 配置是从1.10.0上粘贴过来的,1.10.0是正常可用的。
>
>
>
>
> 发自我的iPhone


Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-13 文章 kcz
查看您说的[1]的url之后,发现里面并没有跟 es sql jar有关的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.1 sql client 流式 join 出现预期之外的含 null 行

2020-08-13 文章 godfrey he
可以把原始的计算结果打印出来,执行 set execution.result-mode=changelog
(如果source有delete消息,可能会出现null值)


LittleFall <1578166...@qq.com> 于2020年8月13日周四 下午3:33写道:

> mysql 的建表语句
> use test;
> create table base (
> id int primary key,
> location varchar(20)
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> );
>
> flink sql client 的建表语句
> create table base (
> id int primary key,
> location varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
>
> flink 查询语句
> select distinct stuff.id s_id, base.id b_id, base.location, stuff.name
> from stuff inner join base
> on stuff.b_id = base.id;
>
> mysql 插入语句
> insert into base values (1, 'beijing');
> insert into stuff values (1, 1, 'zz');
>
> flink 结果
> <
> http://apache-flink.147419.n8.nabble.com/file/t858/2020-08-13_15-12-36_%E7%9A%84%E5%B1%8F%E5%B9%95%E6%88%AA%E5%9B%BE.png>
>
>
> mysql 执行同样的查询的结果:
> +--+--+--+--+
> | s_id | b_id | location | name |
> +--+--+--+--+
> |1 |1 | beijing  | zz   |
> +--+--+--+--+
> 1 row in set (0.01 sec)
>
>
> 而且有时候连结果正确的行都不会出现,只会出现含 null 的行。
>
> 求助大家。。。
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 caozhen
我最后用的是log4j2。

之前mainjar中有很多log4j的依赖(slf4j-log4j12),而flink客户端lib下是log4j2的依赖(log4j-slf4j-impl),导致了冲突,不能打印日志。

改动:把mainjar中的log4j的依赖改成provided,使用了客户端提供的log4j2依赖



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 godfrey he
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: 请问在 flink sql 中建立的多张表应当怎样分辨接收 kafka 传来的 canal-json?

2020-08-13 文章 Leonard Xu
Hello
现在只支持一个topic里包含单张表的changelog, 
你这个case相当于用了一个topic里包含多张表的changelog,只是twocol在解析binlog时 a,b 
字段找不到,你配置ignore-parse-errors就会返回(null,null)
建议每张chagnelog表和一个topic对应就可以了

祝好
Leonard

> 在 2020年8月13日,19:55,LittleFall <1578166...@qq.com> 写道:
> 
> 这是我在 flink sql 中建立的两张表:
> create table base ( 
>id int, 
>location varchar(20) 
> )WITH ( 
>'connector' = 'kafka', 
>'topic' = 'example', 
>'properties.group.id' = 'testGroup', 
>'scan.startup.mode' = 'latest-offset', 
>'properties.bootstrap.servers' = 'localhost:9092', 
>'format' = 'canal-json', 
>'canal-json.ignore-parse-errors'='true'
> );
> 
> create table twocol ( 
>a int, 
>b varchar(20) 
> ) -- WITH 部分和上面一样
> 
> 此时我在 mysql 执行了以下插入:
> insert into base values (1, 'beijing');
> 
> canal 通过 kafka 将日志传给了 flink,于是 flink 的 base 表里多了一条记录 (1, 'beijing'),然而
> twocol 表里也多了一条 (null, null).
> 
> 请问大家,有什么方法可以指定哪张表接收对应的 catalog 吗?如果不能的话,大家是怎样解决这样的问题的,谢谢!
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 caozhen
恩,是这样,把mainjar中log4j的依赖挨个都provided了。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:关于flink升级

2020-08-13 文章 USERNAME
官网有升级建议

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/upgrading.html#compatibility-table














在 2020-08-14 09:15:53,"引领"  写道:
>我们的flink是在1.7版本的,所以这次想对flink进行升级,但升级建议直接升级flink1.11.1么?有木有大佬在生产环境部署的么?
>
>
>| |
>引领
>|
>|
>yrx73...@163.com
>|
>签名由网易邮箱大师定制
>


关于flink升级

2020-08-13 文章 引领
我们的flink是在1.7版本的,所以这次想对flink进行升级,但升级建议直接升级flink1.11.1么?有木有大佬在生产环境部署的么?


| |
引领
|
|
yrx73...@163.com
|
签名由网易邮箱大师定制



Re: flink 1.11 日志不能正常打印问题

2020-08-13 文章 shizk233
flink框架里用的slf4j吧,log4j2只是一种具体实现,应该是可以直接替换掉的。
就是把flink发行包下log4j2相关的jar替换成log4j的jar,当然,相应的配置文件也要改成log4j支持的配置。

caozhen  于2020年8月13日周四 下午3:39写道:

> flink1.11好像是用的log4j2,我的mainjar用到了log4j,  两者类有冲突,导致JM、TM日志为空。
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/opt/flink-1.11.1-k8s/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
>
> 目前
> 1、mainjar中的log4j不能完全去掉,毕竟有很多地方在用,而且后期新增依赖也有可能会用,不太可行
>
>
> 大家又啥好办法嘛
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


请问在 flink sql 中建立的多张表应当怎样分辨接收 kafka 传来的 canal-json?

2020-08-13 文章 LittleFall
这是我在 flink sql 中建立的两张表:
create table base ( 
id int, 
location varchar(20) 
)WITH ( 
'connector' = 'kafka', 
'topic' = 'example', 
'properties.group.id' = 'testGroup', 
'scan.startup.mode' = 'latest-offset', 
'properties.bootstrap.servers' = 'localhost:9092', 
'format' = 'canal-json', 
'canal-json.ignore-parse-errors'='true'
);

create table twocol ( 
a int, 
b varchar(20) 
) -- WITH 部分和上面一样

此时我在 mysql 执行了以下插入:
insert into base values (1, 'beijing');

canal 通过 kafka 将日志传给了 flink,于是 flink 的 base 表里多了一条记录 (1, 'beijing'),然而
twocol 表里也多了一条 (null, null).

请问大家,有什么方法可以指定哪张表接收对应的 catalog 吗?如果不能的话,大家是怎样解决这样的问题的,谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/


求助:flink 1.11.1 yarn perjob 模式配置zookeeper的HA后application提交超时,1.10时正常的

2020-08-13 文章 x2009438
各位,

今天从1.10.0升级到1.11.1,结果yarn per job 提交作业,配置zookeeper的HA之后作业提交超时,有人碰到过吗?
看日志也没记录什么。

配置是从1.10.0上粘贴过来的,1.10.0是正常可用的。




发自我的iPhone

Re:Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-13 文章 USERNAME


感谢您的回复,您说的这个方法类似于 “采用通过表结构”如下结构,屏蔽掉 
用table分区,每个表的data部分弱化到一个字段中,使用的时候再通过解析json方式来从 "before"或者"after"中获取对应表的字段及数据,
这种方式确实拓展跟灵活性强很多,牺牲掉部分易用性。
看到很多大公司 美团 字节等 都有基于flink的实时数仓,不知道他们是怎么解决这种大量表入仓的 拓展灵活易用性的
create table TABLENAME
 (
 table   STRING,
 op_type  STRING,
 op_ts  STRING,
 current_ts   STRING,
 pos STRING,
 "before"  STRING,
 "after" STRING

 ) partitioned by (pt_d table)
。





在 2020-08-13 16:35:19,"Rui Li"  写道:
>你提到的这三个难点现在的hive
>connector确实是支持不了的。前两个也许可以通过把写不同的表变成写同一个表的不同分区来解决。第三个可能可以通过检查数据跟目标schema是不是匹配,来判断是不是需要去跟HMS同步新的schema。
>
>On Thu, Aug 13, 2020 at 3:27 PM USERNAME  wrote:
>
>>
>>
>> 任务流程:
>> OGG->KAFKA->FLINK->HIVE
>>
>>
>> KAFKA数据样例:
>> 其中会有多个
>> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
>> {
>> "table": "SCOOT.TABLENAME",
>> "op_type": "U",
>> "op_ts": "2020-08-11 07:53:40.008001",
>> "current_ts": "2020-08-11T15:56:41.233000",
>> "pos": "980119769930",
>> "before": {
>> "C1": 4499000,
>> "C2": null,
>> "C3": null,
>> "C4": null,
>> "C5": null
>> },
>> "after": {
>> "C1": 4499000,
>> "C2": null,
>> "C3": "",
>> "C4": "",
>> "C5": "通过"
>> }
>> }
>> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
>> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>>
>>
>> 例如 样例数据在hive中建表
>> create table TABLENAME
>> (
>> op_type  STRING,
>> op_ts  STRING,
>> current_ts   STRING,
>> pos STRING,
>> "C1" STRING,
>> "C2" STRING,
>> "C3" STRING,
>> "C4" STRING,
>> "C5" STRING
>> )
>> 理解的难点,
>> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
>> 2.同一FLINK任务会有新增的表,需自动适配
>> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>>
>>
>> 或者只能采用通过表结构
>> create table TABLENAME
>> (
>> table   STRING,
>> op_type  STRING,
>> op_ts  STRING,
>> current_ts   STRING,
>> pos STRING,
>> "before"  STRING,
>> "after" STRING
>> )
>> 然后剩下的在HIVE中解决。
>>
>>
>> 或者有其他更好的方案?
>>
>>
>
>-- 
>Best regards!
>Rui Li


Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Zhao,Yi(SEC)
A是10机器集群(HA模式,独立集群),B作为提交机器。
从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar  
flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar  
flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?


在 2020/8/13 下午3:10,“Jeff Zhang” 写入:

你的10台机器是flink standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。

另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524


Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:

> 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> 现在比较混乱,哪些jar需要放到A,哪些放到B。
>
>
> (1) kafka ssl
> 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>
> (2)
>  
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>
>
>
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>
> 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>
>
>

-- 
Best Regards

Jeff Zhang




Re:Re: Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach



Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧














在 2020-08-13 16:57:25,"Shengkai Fang"  写道:
>hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
>我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。
>
>Zhou Zach  于2020年8月13日周四 下午4:33写道:
>
>>
>>
>>
>> Hi forideal, Shengkai Fang,
>>
>> 加上env.disableOperatorChaining()之后,发现5个算子,
>>
>>
>>
>>
>> Source: TableSourceScan(table=[[default_catalog, default_database, user]],
>> fields=[uid, sex, age, created_time]) ->
>>
>> Calc(select=[uid, sex, age, created_time, () AS procTime,
>> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd
>> HH:mm:ss')) AS eventTime]) ->
>>
>> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime -
>> 3000:INTERVAL SECOND)]) ->
>>
>> Calc(select=[uid, sex, age, created_time]) ->
>>
>> Sink: Sink(table=[default_catalog.default_database.user_mysql],
>> fields=[uid, sex, age, created_time])
>> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
>> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-08-13 15:39:44,"forideal"  写道:
>> >Hi Zhou Zach:
>> >你可以试试 env.disableOperatorChaining();
>> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>> >> 我是怎么设置参数的
>> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>> >tableEnv.getConfig().getConfiguration() .setString(key,
>> configs.getString(key, null));
>> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL
>> '10' SECOND
>> >
>> >Best forideal
>> >
>> >
>> >在 2020-08-13 15:20:13,"Zhou Zach"  写道:
>> >>
>> >>
>> >>
>> >>Hi forideal,
>> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>> >>
>> >>
>> >>val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >>streamExecutionEnv.setStateBackend(new
>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>> >>
>> >>val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >>val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>> >>
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>> >>
>> >>
>> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>在 2020-08-13 14:02:58,"forideal"  写道:
>> >>>大家好
>> >>>
>> >>>   问题的原因定位到了。
>> >>>   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>> >>>   这个时候,我进行了 disable chain,观察 watermark
>> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>> >>>   发现在  WatermarkAssigner(rowtime=[event_time],
>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
>> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>> >>>   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
>> table.exec.source.idle-timeout = 10s 参数即可。
>> >>>   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
>> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
>> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>> >>>
>> >>>
>> >>>Best forideal
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>在 2020-08-13 12:56:57,"forideal"  写道:
>> 大家好
>> 
>> 
>> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
>> StreamExecWatermarkAssigner
>> 在translateToPlanInternal 中生成了如下一个 class 代码,
>> public final class WatermarkGenerator$2 extends
>> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
>> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
>> public void open(org.apache.flink.configuration.Configuration parameters)
>> throws Exception { } @Override public Long
>> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
>> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
>> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
>> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
>> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
>> = null; if (!isNull$4) { result$5 =
>> 

Re: Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Shengkai Fang
hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。

Zhou Zach  于2020年8月13日周四 下午4:33写道:

>
>
>
> Hi forideal, Shengkai Fang,
>
> 加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
> Source: TableSourceScan(table=[[default_catalog, default_database, user]],
> fields=[uid, sex, age, created_time]) ->
>
> Calc(select=[uid, sex, age, created_time, () AS procTime,
> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd
> HH:mm:ss')) AS eventTime]) ->
>
> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime -
> 3000:INTERVAL SECOND)]) ->
>
> Calc(select=[uid, sex, age, created_time]) ->
>
> Sink: Sink(table=[default_catalog.default_database.user_mysql],
> fields=[uid, sex, age, created_time])
> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-13 15:39:44,"forideal"  写道:
> >Hi Zhou Zach:
> >你可以试试 env.disableOperatorChaining();
> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
> >> 我是怎么设置参数的
> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
> >tableEnv.getConfig().getConfiguration() .setString(key,
> configs.getString(key, null));
> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL
> '10' SECOND
> >
> >Best forideal
> >
> >
> >在 2020-08-13 15:20:13,"Zhou Zach"  写道:
> >>
> >>
> >>
> >>Hi forideal,
> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
> >>
> >>
> >>val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> >>streamExecutionEnv.setStateBackend(new
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
> >>
> >>val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>val streamTableEnv =
> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
> >>
> >>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
> >>
> >>
> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2020-08-13 14:02:58,"forideal"  写道:
> >>>大家好
> >>>
> >>>   问题的原因定位到了。
> >>>   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
> >>>   这个时候,我进行了 disable chain,观察 watermark
> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
> >>>   发现在  WatermarkAssigner(rowtime=[event_time],
> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
> >>>   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
> table.exec.source.idle-timeout = 10s 参数即可。
> >>>   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
> >>>
> >>>
> >>>Best forideal
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>在 2020-08-13 12:56:57,"forideal"  写道:
> 大家好
> 
> 
> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> 在translateToPlanInternal 中生成了如下一个 class 代码,
> public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
> 
> 
> 
>    其中关键的信息是 result$5 =
> 

Re: user-zh

2020-08-13 文章 Xingbo Huang
Hi,

退订请发邮件到  user-zh-unsubscr...@flink.apache.org

详细的可以参考 [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

15037433...@163.com <15037433...@163.com> 于2020年8月13日周四 下午3:40写道:

>
> 退订
>
>
> 15037433...@163.com
>


Re: 退订

2020-08-13 文章 Xingbo Huang
Hi,

退订请发邮件到  user-zh-unsubscr...@flink.apache.org

详细的可以参考 [1]

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

李强  于2020年8月13日周四 下午4:35写道:

> 退订


Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-13 文章 Rui Li
你提到的这三个难点现在的hive
connector确实是支持不了的。前两个也许可以通过把写不同的表变成写同一个表的不同分区来解决。第三个可能可以通过检查数据跟目标schema是不是匹配,来判断是不是需要去跟HMS同步新的schema。

On Thu, Aug 13, 2020 at 3:27 PM USERNAME  wrote:

>
>
> 任务流程:
> OGG->KAFKA->FLINK->HIVE
>
>
> KAFKA数据样例:
> 其中会有多个
> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
> {
> "table": "SCOOT.TABLENAME",
> "op_type": "U",
> "op_ts": "2020-08-11 07:53:40.008001",
> "current_ts": "2020-08-11T15:56:41.233000",
> "pos": "980119769930",
> "before": {
> "C1": 4499000,
> "C2": null,
> "C3": null,
> "C4": null,
> "C5": null
> },
> "after": {
> "C1": 4499000,
> "C2": null,
> "C3": "",
> "C4": "",
> "C5": "通过"
> }
> }
> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>
>
> 例如 样例数据在hive中建表
> create table TABLENAME
> (
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "C1" STRING,
> "C2" STRING,
> "C3" STRING,
> "C4" STRING,
> "C5" STRING
> )
> 理解的难点,
> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
> 2.同一FLINK任务会有新增的表,需自动适配
> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>
>
> 或者只能采用通过表结构
> create table TABLENAME
> (
> table   STRING,
> op_type  STRING,
> op_ts  STRING,
> current_ts   STRING,
> pos STRING,
> "before"  STRING,
> "after" STRING
> )
> 然后剩下的在HIVE中解决。
>
>
> 或者有其他更好的方案?
>
>

-- 
Best regards!
Rui Li


退订

2020-08-13 文章 李强
退订

Re:Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach



Hi forideal, Shengkai Fang,
   
加上env.disableOperatorChaining()之后,发现5个算子,




Source: TableSourceScan(table=[[default_catalog, default_database, user]], 
fields=[uid, sex, age, created_time]) -> 

Calc(select=[uid, sex, age, created_time, () AS procTime, 
TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd 
HH:mm:ss')) AS eventTime]) -> 

WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL 
SECOND)]) -> 

Calc(select=[uid, sex, age, created_time]) -> 

Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, 
sex, age, created_time])
但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
 ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的














在 2020-08-13 15:39:44,"forideal"  写道:
>Hi Zhou Zach:
>你可以试试 env.disableOperatorChaining();
>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>> 我是怎么设置参数的
>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, 
>null));
>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' 
>SECOND
>
>Best forideal
>
>
>在 2020-08-13 15:20:13,"Zhou Zach"  写道:
>>
>>
>>
>>Hi forideal,
>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>>
>>
>>val streamExecutionEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment
>>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>streamExecutionEnv.setStateBackend(new 
>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>>
>>val blinkEnvSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
>> blinkEnvSettings)
>>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>>
>>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>>
>>
>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-13 14:02:58,"forideal"  写道:
>>>大家好
>>>
>>>   问题的原因定位到了。
>>>   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>>   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op 
>>> chain 在一起,不能确定到底是那个环节存在问题)
>>>   发现在  WatermarkAssigner(rowtime=[event_time], 
>>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source 
>>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 
>>> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>>>   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
>>> table.exec.source.idle-timeout = 10s 参数即可。
>>>   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
>>> watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
>>> 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>>
>>>
>>>Best forideal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-13 12:56:57,"forideal"  写道:
大家好


关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
 StreamExecWatermarkAssigner
在translateToPlanInternal 中生成了如下一个 class 代码,
public final class WatermarkGenerator$2 extends 
org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
public void open(org.apache.flink.configuration.Configuration parameters) 
throws Exception { } @Override public Long 
currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { 
field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 
= null; if (!isNull$4) { result$5 = 
org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
 - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) { 
return null; } else { return result$5.getMillisecond(); } } @Override 
public void close() throws Exception { } } 
 


   其中关键的信息是 result$5 = 
 org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
  - ((long) 1L), field$3.getNanoOfMillisecond());
确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
watermark。
在 

Re: Re:Re: 用hive streaming写 orc文件的问题

2020-08-13 文章 Rui Li
如果是IDE里执行的话,tableEnv.executeSql是马上返回的,然后就退出了,可以用类似这种写法等作业结束:

val tableResult = tEnv.executeSql(insert)
// wait to finish
tableResult.getJobClient.get
  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
  .get

> 为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要?

这里其实是缺少orc的依赖,按说只有table.exec.hive.fallback-mapred-writer设置为false的时候才会发生,我后面修复一下

> sql client 我想要设置checkpoint生成间隔我应该在哪里设置?

可以在flink-conf.yaml里设置execution.checkpointing.interval


On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:

> 添加不了附件,我就直接贴代码了
>
> import java.time.Duration
>
>
> import org.apache.flink.streaming.api.{CheckpointingMode,
> TimeCharacteristic}
> import
> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
> TableResult}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
>
>
>
> /**
>   * author dinghh
>   * time 2020-08-11 17:03
>   */
> object WriteHiveStreaming {
> def main(args: Array[String]): Unit = {
>
>
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build()
> val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
>
>
>
>
>
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
> catalogName,  // catalog name
> "default",// default database
>
> "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",
> // Hive config (hive-site.xml) directory
> "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
>
>
>
> //删除流表
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
> """.stripMargin)
>
>
> //创建流表
> tableEnv.executeSql(
> """
>   |CREATE TABLE `stream_db`.`datagen_user` (
>   | id INT,
>   | name STRING,
>   | dt AS localtimestamp,
>   | WATERMARK FOR dt AS dt
>   |) WITH (
>   | 'connector' = 'datagen',
>   | 'rows-per-second'='10',
>   | 'fields.id.kind'='random',
>   | 'fields.id.min'='1',
>   | 'fields.id.max'='1000',
>   | 'fields.name.length'='5'
>   |)
> """.stripMargin)
>
>
> //切换hive方言
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>
>
> //删除hive orc表
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `default`.`hive_user_orc`
>   |
> """.stripMargin)
>
>
> //创建hive orc表
> tableEnv.executeSql(
> """
>   |CREATE TABLE `default`.`hive_user_orc` (
>   |  id INT,
>   |  name STRING
>   |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute
> STRING ) STORED AS ORC TBLPROPERTIES (
>   |  'partition.time-extractor.timestamp-pattern'='$ts_dt
> $ts_hour:$ts_minute:00.000',
>   |  'sink.partition-commit.trigger'='partition-time',
>   |  'sink.partition-commit.delay'='1 min',
>   |
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   |)
> """.stripMargin)
>
>
> //删除hive parquet表
> tableEnv.executeSql(
> """
>   |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
> """.stripMargin)
> //创建hive parquet表
> tableEnv.executeSql(
> """
>   |CREATE TABLE `default`.`hive_user_parquet` (
>   |  id INT,
>   |  name STRING
>   |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute
> STRING) STORED AS PARQUET TBLPROPERTIES (
>   |  'partition.time-extractor.timestamp-pattern'='$ts_dt
> $ts_hour:$ts_minute:00.000',
>   |  'sink.partition-commit.trigger'='partition-time',
>   |  'sink.partition-commit.delay'='1 min',
>   |
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   |)
> """.stripMargin)
> 

Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 forideal
Hi Zhou Zach:
你可以试试 env.disableOperatorChaining();
然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
> 我是怎么设置参数的
我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
tableEnv.getConfig().getConfiguration() .setString(key, configs.getString(key, 
null));
同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' 
SECOND

Best forideal


在 2020-08-13 15:20:13,"Zhou Zach"  写道:
>
>
>
>Hi forideal,
>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
>val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>streamExecutionEnv.setStateBackend(new 
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>
>val blinkEnvSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
> blinkEnvSettings)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>
>
>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-13 14:02:58,"forideal"  写道:
>>大家好
>>
>>   问题的原因定位到了。
>>   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op 
>> chain 在一起,不能确定到底是那个环节存在问题)
>>   发现在  WatermarkAssigner(rowtime=[event_time], 
>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source 
>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 
>> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>>   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
>> table.exec.source.idle-timeout = 10s 参数即可。
>>   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
>> watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
>> 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>
>>
>>Best forideal
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-13 12:56:57,"forideal"  写道:
>>>大家好
>>>
>>>
>>>关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
>>> StreamExecWatermarkAssigner
>>>在translateToPlanInternal 中生成了如下一个 class 代码,
>>>public final class WatermarkGenerator$2 extends 
>>>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>>>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>>>public void open(org.apache.flink.configuration.Configuration parameters) 
>>>throws Exception { } @Override public Long 
>>>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>>>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
>>>isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
>>>result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { 
>>>field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 
>>>= null; if (!isNull$4) { result$5 = 
>>>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>>> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) { 
>>>return null; } else { return result$5.getMillisecond(); } } @Override public 
>>>void close() throws Exception { } } 
>>> 
>>>
>>>
>>>   其中关键的信息是 result$5 = 
>>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>>>  - ((long) 1L), field$3.getNanoOfMillisecond());
>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
>>>watermark。
>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
>>>这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>>
>>>  Best forideal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-11 17:13:01,"forideal"  写道:
大家好,请教一个问题


   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
 watermark。消费大量的数据的时候,就无法生成watermark。
   一直是No Watermark。 暂时找不到排查问题的思路。
  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
 EventTime mode 模式,Blink Planner。
|
No Watermark |
   SQL如下


  DDL:
   create table test(
   user_id varchar,
   action varchar,
   event_time TIMESTAMP(3),
   WATERMARK FOR event_time AS event_time - INTERVAL 
 '10' SECOND
   ) with();


  

user-zh

2020-08-13 文章 15037433...@163.com

退订


15037433...@163.com


flink 1.11 日志不能正常打印问题

2020-08-13 文章 caozhen
flink1.11好像是用的log4j2,我的mainjar用到了log4j,  两者类有冲突,导致JM、TM日志为空。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/flink-1.11.1-k8s/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.

目前
1、mainjar中的log4j不能完全去掉,毕竟有很多地方在用,而且后期新增依赖也有可能会用,不太可行


大家又啥好办法嘛



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.11.1 sql client 流式 join 出现预期之外的含 null 行

2020-08-13 文章 LittleFall
mysql 的建表语句
use test;
create table base (
id int primary key,
location varchar(20)
);
create table stuff(
id int primary key,
b_id int,
name varchar(20)
);

flink sql client 的建表语句
create table base (
id int primary key,
location varchar(20)
)WITH (
   'connector' = 'kafka',
   'topic' = 'example',
   'properties.group.id' = 'testGroup',
   'scan.startup.mode' = 'latest-offset',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'canal-json'
);
create table stuff(
id int primary key,
b_id int,
name varchar(20)
)WITH (
   'connector' = 'kafka',
   'topic' = 'example',
   'properties.group.id' = 'testGroup',
   'scan.startup.mode' = 'latest-offset',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'canal-json'
);

flink 查询语句
select distinct stuff.id s_id, base.id b_id, base.location, stuff.name
from stuff inner join base
on stuff.b_id = base.id;

mysql 插入语句
insert into base values (1, 'beijing');
insert into stuff values (1, 1, 'zz');

flink 结果

 

mysql 执行同样的查询的结果:
+--+--+--+--+
| s_id | b_id | location | name |
+--+--+--+--+
|1 |1 | beijing  | zz   |
+--+--+--+--+
1 row in set (0.01 sec)


而且有时候连结果正确的行都不会出现,只会出现含 null 的行。

求助大家。。。





--
Sent from: http://apache-flink.147419.n8.nabble.com/


请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 杨豫鲁

请教大家一个我最近在配置Flink流的过程中遇到问题,

flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。









flink 1.11 SQL Client 流式 join 出现了不应有的含 null 的行

2020-08-13 文章 LittleFall
在 mysql 上的建表语句:
use test;
create table base (
id int primary key,
location varchar(20)
);
create table stuff(
id int primary key,
b_id int,
name varchar(20)
);

在 flink sql client 中的建表语句:
create table base (
id int primary key,
location varchar(20)
)WITH (
   'connector' = 'kafka',
   'topic' = 'example',
   'properties.group.id' = 'testGroup',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'canal-json'
);
create table stuff(
id int primary key,
b_id int,
name varchar(20)
)WITH (
   'connector' = 'kafka',
   'topic' = 'example',
   'properties.group.id' = 'testGroup',
   'properties.bootstrap.servers' = 'localhost:9092',
   'format' = 'canal-json'
);

在 flink sql client 中执行查询:
select distinct stuff.id s_id, base.id b_id, base.location, stuff.name
from stuff inner join base
on stuff.b_id = base.id;

在 mysql 中插入数据:
mysql> insert into base values (1, "beijing");
Query OK, 1 row affected (0.02 sec)

mysql> insert into stuff values (1, 1, "zz");
Query OK, 1 row affected (0.01 sec)


结果如附图所示。

而在 mysql 中执行查询:
mysql> select distinct stuff.id s_id, base.id b_id, base.location,
stuff.name
-> from stuff inner join base
-> on stuff.b_id = base.id;
+--+--+--+--+
| s_id | b_id | location | name |
+--+--+--+--+
|1 |1 | beijing  | zz   |
+--+--+--+--+
1 row in set (0.00 sec)

请问各位,为什么会导致结果不正确呢?而且有时连正确的那行都没有,只有一行含 null 的。







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:Re:Flink SQL No Watermark

2020-08-13 文章 Shengkai Fang
hi 那你有没有试过将并行度设置为partition的数量

Zhou Zach 于2020年8月13日 周四下午3:21写道:

>
>
>
> Hi forideal,
> 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.setStateBackend(new
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>
> val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>
>
> 并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
> 在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-13 14:02:58,"forideal"  写道:
> >大家好
> >
> >   问题的原因定位到了。
> >   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
> >   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个
> op chain 在一起,不能确定到底是那个环节存在问题)
> >   发现在  WatermarkAssigner(rowtime=[event_time],
> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
> >   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
> table.exec.source.idle-timeout = 10s 参数即可。
> >   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
> >
> >
> >Best forideal
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-08-13 12:56:57,"forideal"  写道:
> >>大家好
> >>
> >>
> >>关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> >>在translateToPlanInternal 中生成了如下一个 class 代码,
> >>public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
> >>
> >>
> >>
> >>   其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond());
> >>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> 的定义获取的 watermark。
> >>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> >>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
> >>
> >>  Best forideal
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2020-08-11 17:13:01,"forideal"  写道:
> >>>大家好,请教一个问题
> >>>
> >>>
> >>>   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> >>>   一直是No Watermark。 暂时找不到排查问题的思路。
> >>>  Flink 版本号是 1.10,kafka
> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
> >>>|
> >>>No Watermark |
> >>>   SQL如下
> >>>
> >>>
> >>>  DDL:
> >>>   create table test(
> >>>   user_id varchar,
> >>>   action varchar,
> >>>   event_time TIMESTAMP(3),
> >>>   WATERMARK FOR event_time AS event_time -
> INTERVAL '10' SECOND
> >>>   ) with();
> >>>
> >>>
> >>>  DML:
> >>>insert into
> >>>  console
> >>>select
> >>>  user_id,
> >>>  f_get_str(bind_id) as id_list
> >>>from
> >>>  (
> >>>select
> >>>  action as bind_id,
> >>>  user_id,
> >>>  event_time
> >>>from
> >>>  (
> >>>SELECT
> >>>  user_id,
> >>>  action,
> >>>  PROCTIME() as 

FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-13 文章 USERNAME


任务流程:
OGG->KAFKA->FLINK->HIVE


KAFKA数据样例:
其中会有多个 
"table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
{
"table": "SCOOT.TABLENAME",
"op_type": "U",
"op_ts": "2020-08-11 07:53:40.008001",
"current_ts": "2020-08-11T15:56:41.233000",
"pos": "980119769930",
"before": {
"C1": 4499000,
"C2": null,
"C3": null,
"C4": null,
"C5": null
},
"after": {
"C1": 4499000,
"C2": null,
"C3": "",
"C4": "",
"C5": "通过"
}
}
问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?


例如 样例数据在hive中建表
create table TABLENAME
(
op_type  STRING,
op_ts  STRING,
current_ts   STRING,
pos STRING,
"C1" STRING,
"C2" STRING,
"C3" STRING,
"C4" STRING,
"C5" STRING
)
理解的难点,
1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
2.同一FLINK任务会有新增的表,需自动适配
3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等


或者只能采用通过表结构
create table TABLENAME
(
table   STRING,
op_type  STRING,
op_ts  STRING,
current_ts   STRING,
pos STRING,
"before"  STRING,
"after" STRING
)
然后剩下的在HIVE中解决。


或者有其他更好的方案?



Re:Re:Re:Flink SQL No Watermark

2020-08-13 文章 Zhou Zach



Hi forideal,
我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:


val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.setStateBackend(new 
RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))

val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
blinkEnvSettings)

streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)

streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))

streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))


streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")


并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的














在 2020-08-13 14:02:58,"forideal"  写道:
>大家好
>
>   问题的原因定位到了。
>   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op 
> chain 在一起,不能确定到底是那个环节存在问题)
>   发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 
> op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 
> 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task 
> output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
> table.exec.source.idle-timeout = 10s 参数即可。
>   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
> watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
> 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>
>
>Best forideal
>
>
>
>
>
>
>
>
>在 2020-08-13 12:56:57,"forideal"  写道:
>>大家好
>>
>>
>>关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
>> StreamExecWatermarkAssigner
>>在translateToPlanInternal 中生成了如下一个 class 代码,
>>public final class WatermarkGenerator$2 extends 
>>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>>public void open(org.apache.flink.configuration.Configuration parameters) 
>>throws Exception { } @Override public Long 
>>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
>>isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
>>result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { 
>>field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = 
>>null; if (!isNull$4) { result$5 = 
>>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return 
>>null; } else { return result$5.getMillisecond(); } } @Override public void 
>>close() throws Exception { } } 
>> 
>>
>>
>>   其中关键的信息是 result$5 = 
>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>>  - ((long) 1L), field$3.getNanoOfMillisecond());
>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
>>watermark。
>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
>>这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>
>>  Best forideal
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-08-11 17:13:01,"forideal"  写道:
>>>大家好,请教一个问题
>>>
>>>
>>>   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
>>> watermark。消费大量的数据的时候,就无法生成watermark。
>>>   一直是No Watermark。 暂时找不到排查问题的思路。
>>>  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
>>> EventTime mode 模式,Blink Planner。
>>>|
>>>No Watermark |
>>>   SQL如下
>>>
>>>
>>>  DDL:
>>>   create table test(
>>>   user_id varchar,
>>>   action varchar,
>>>   event_time TIMESTAMP(3),
>>>   WATERMARK FOR event_time AS event_time - INTERVAL 
>>> '10' SECOND
>>>   ) with();
>>>
>>>
>>>  DML:
>>>insert into
>>>  console
>>>select
>>>  user_id,
>>>  f_get_str(bind_id) as id_list
>>>from
>>>  (
>>>select
>>>  action as bind_id,
>>>  user_id,
>>>  event_time
>>>from
>>>  (
>>>SELECT
>>>  user_id,
>>>  action,
>>>  PROCTIME() as proc_time,
>>>  event_time
>>>FROM
>>>  test
>>>  ) T
>>>where
>>>  user_id is not null
>>>  and user_id <> ''
>>>  and CHARACTER_LENGTH(user_id) = 24
>>>  ) T
>>>group by
>>>  

Re:Re: flink1.11.1 flink on yarn 任务启动报错

2020-08-13 文章 郭华威



好的,谢谢!














在 2020-08-13 14:08:07,"Congxian Qiu"  写道:
>Hi
>
>   这应该是个已知问题[1] 在 1.11.2 和 1.12 中已经修复
>
>[1] https://issues.apache.org/jira/browse/FLINK-18710
>Best,
>Congxian
>
>
>郭华威  于2020年8月13日周四 上午11:05写道:
>
>> 你好,请教下:
>> flink1.11.1 flink on yarn 任务启动报错:
>>
>>
>> 启动命令:
>> /opt/flink-1.11.1/bin/flink  run  -p 4 -ys 2 -m yarn-cluster -c
>> yueworld.PVUV.MyPvUv  /mywork/flink/flink_1.11.1-1.0.jar
>>
>>
>> 报错信息:
>>
>>
>> 2020-08-13 10:53:08,160 ERROR
>> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler [] -
>> Unhandled exception.
>> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
>> serialize the result for RPC call : requestTaskManagerInfo.
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:368)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:335)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>> ~[?:1.8.0_221]
>> at
>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>> ~[?:1.8.0_221]
>> at
>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>> ~[?:1.8.0_221]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:329)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:298)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.11-1.11.1.jar:1.11.1]
>> Caused by: java.io.NotSerializableException:
>> org.apache.flink.runtime.rest.messages.ResourceProfileInfo
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> ~[?:1.8.0_221]
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> ~[?:1.8.0_221]
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> ~[?:1.8.0_221]
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> ~[?:1.8.0_221]
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> ~[?:1.8.0_221]
>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> ~[?:1.8.0_221]
>> at java.util.ArrayList.writeObject(ArrayList.java:766) ~[?:1.8.0_221]
>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_221]
>> at java.lang.reflect.Method.invoke(Method.java:498) 

Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 Jeff Zhang
你的10台机器是flink standalone还是 yarn集群 ?
如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。

另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
或者加入钉钉群讨论,钉钉群号: 32803524


Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:

> 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> 现在比较混乱,哪些jar需要放到A,哪些放到B。
>
>
> (1) kafka ssl
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
>
> (2)
>  
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
>
>
>
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
>
> 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
>
>
>

-- 
Best Regards

Jeff Zhang


????????????????flink

2020-08-13 文章 ??????
kafka0.10??flink1.10.flinkkafka

HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys

2020-08-13 文章 xiao cai
Hi All:
使用flink-sql写入hbase sink时报错:
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.


我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表
kafka source表与hbase 维表left join后的结果insert到hbase sink表中:
sql如下:
create table user_click_source(
`id` bigint,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint,
`catalog_id` int,
`device_id` int,
`user_id` int,
`proc_time` timestamp(3)
PRIMARY KEY (id) NOT ENFORCED
)with(
'connector.type' = 'kafka',
……
)
;
create table dim_user(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>,
ts bigint
)with(
'connector.type'='hbase',
……
)
;


create table dim_device(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;


create table dim_catalog(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;
create table hbase_full_user_click_case1_sink(
`rowkey` bigint,
cf ROW<
`click_id` bigint,
`click_name` varchar,
`click_partition` int,
`click_event_time` bigint,
`click_write_time` bigint,
`click_snapshot_time` bigint,
`click_max_snapshot_time` bigint,
`catalog_id` int,
`catalog_name` varchar,
`catalog_partition` int,
`catalog_event_time` bigint,
`catalog_write_time` bigint,
`catalog_snapshot_time` bigint,
`catalog_max_snapshot_time` bigint,
`device_id` int,
`device_name` varchar,
`device_partition` int,
`device_event_time` bigint,
`device_write_time` bigint,
`device_snapshot_time` bigint,
`device_max_snapshot_time` bigint,
`user_id` int,
`user_name` varchar,
`user_partition` int,
`user_event_time` bigint,
`user_write_time` bigint,
`user_snapshot_time` bigint,
`user_max_snapshot_time` bigint
>,
PRIMARY KEY (rowkey) NOT ENFORCED
)with(
'connector.type'='hbase',
……
)
;
insert into hbase_full_user_click_case1_sink
select
`click_id`,
ROW(
`click_id`,
`click_name`,
`click_partition`,
`click_event_time`,
`click_write_time`,
`click_snapshot_time`,
`click_max_snapshot_time`,
`catalog_id`,
`catalog_name`,
`catalog_partition`,
`catalog_event_time`,
`catalog_write_time`,
`catalog_snapshot_time`,
`catalog_max_snapshot_time`,
`device_id`,
`device_name`,
`device_partition`,
`device_event_time`,
`device_write_time`,
`device_snapshot_time`,
`device_max_snapshot_time`,
`user_id`,
`user_name`,
`user_partition`,
`user_event_time`,
`user_write_time`,
`user_snapshot_time`,
`user_max_snapshot_time`
)
from (select
click.id as `click_id`,
click.name as `click_name`,
click.kafka_partition as `click_partition`,
click.event_time as `click_event_time`,
click.write_time as `click_write_time`,
click.snapshot_time as `click_snapshot_time`,
click.max_snapshot_time as `click_max_snapshot_time`,
cat.cf.id as `catalog_id`,
cat.cf.name as `catalog_name`,
cat.cf.kafka_partition as `catalog_partition`,
cat.cf.event_time as `catalog_event_time`,
cat.cf.write_time as `catalog_write_time`,
cat.cf.snapshot_time as `catalog_snapshot_time`,
cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
dev.cf.id as `device_id`,
dev.cf.name as `device_name`,
dev.cf.kafka_partition as `device_partition`,
dev.cf.event_time as `device_event_time`,
dev.cf.write_time as `device_write_time`,
dev.cf.snapshot_time as `device_snapshot_time`,
dev.cf.max_snapshot_time as `device_max_snapshot_time`,
u.cf.id as `user_id`,
u.cf.name as `user_name`,
u.cf.kafka_partition as `user_partition`,
u.cf.event_time as `user_event_time`,
u.cf.write_time as `user_write_time`,
u.cf.snapshot_time as `user_snapshot_time`,
u.cf.max_snapshot_time as `user_max_snapshot_time`


from (select
id,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
cast(catalog_id as varchar) as catalog_key,
cast(device_id as varchar) as device_key,
cast(user_id as varchar) as user_key,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`,
`event_time`,
FROM user_click_source
GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND),
`id`,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`) click


left join dim_catalog cat on click.catalog_key = cat.rowkey
left join dim_device dev on click.device_key = dev.rowkey
left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts
) t

Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 yulu yang
收到,十分感谢,我学习一下!

Congxian Qiu  于2020年8月13日周四 下午2:04写道:

> Hi
>不好意思,上一份邮件没有说完就发送出去了。
>如果你希望把从其他地方读入 字典表,然后在 flink 中使用,或许可以看看 broadcast state[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
> Best,
> Congxian
>
>
> Congxian Qiu  于2020年8月13日周四 下午2:00写道:
>
> > Hi
> > 现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
> > 希望能够优化这个问题[1][2]
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-2491
> > [2] https://issues.apache.org/jira/browse/FLINK-18263
> > Best,
> > Congxian
> >
> >
> > yulu yang  于2020年8月13日周四 下午1:49写道:
> >
> >> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
> >>
> >>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
> >>
> >
>


-- 
致
   礼!

   北京理正人信息技术有限公司

综合软件开发部: 杨豫鲁

TEL:13488824529


Re: flink1.11????

2020-08-13 文章 ??????


Re: flink1.11.1 flink on yarn 任务启动报错

2020-08-13 文章 Congxian Qiu
Hi

   这应该是个已知问题[1] 在 1.11.2 和 1.12 中已经修复

[1] https://issues.apache.org/jira/browse/FLINK-18710
Best,
Congxian


郭华威  于2020年8月13日周四 上午11:05写道:

> 你好,请教下:
> flink1.11.1 flink on yarn 任务启动报错:
>
>
> 启动命令:
> /opt/flink-1.11.1/bin/flink  run  -p 4 -ys 2 -m yarn-cluster -c
> yueworld.PVUV.MyPvUv  /mywork/flink/flink_1.11.1-1.0.jar
>
>
> 报错信息:
>
>
> 2020-08-13 10:53:08,160 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler [] -
> Unhandled exception.
> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed to
> serialize the result for RPC call : requestTaskManagerInfo.
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:368)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:335)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> ~[?:1.8.0_221]
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> ~[?:1.8.0_221]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:329)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:298)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.rest.messages.ResourceProfileInfo
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> ~[?:1.8.0_221]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> ~[?:1.8.0_221]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[?:1.8.0_221]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[?:1.8.0_221]
> at java.util.ArrayList.writeObject(ArrayList.java:766) ~[?:1.8.0_221]
> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_221]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221]
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
> ~[?:1.8.0_221]
> at
> 

Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 Congxian Qiu
Hi
   不好意思,上一份邮件没有说完就发送出去了。
   如果你希望把从其他地方读入 字典表,然后在 flink 中使用,或许可以看看 broadcast state[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
Best,
Congxian


Congxian Qiu  于2020年8月13日周四 下午2:00写道:

> Hi
> 现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
> 希望能够优化这个问题[1][2]
>
> [1] https://issues.apache.org/jira/browse/FLINK-2491
> [2] https://issues.apache.org/jira/browse/FLINK-18263
> Best,
> Congxian
>
>
> yulu yang  于2020年8月13日周四 下午1:49写道:
>
>> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>>
>> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>>
>


Re:Re:Flink SQL No Watermark

2020-08-13 文章 forideal
大家好

   问题的原因定位到了。
   由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
   这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op 
chain 在一起,不能确定到底是那个环节存在问题)
   发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 
op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 
对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task 
output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
   定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
table.exec.source.idle-timeout = 10s 参数即可。
   当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。


Best forideal








在 2020-08-13 12:56:57,"forideal"  写道:
>大家好
>
>
>关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
> StreamExecWatermarkAssigner
>在translateToPlanInternal 中生成了如下一个 class 代码,
>public final class WatermarkGenerator$2 extends 
>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>public void open(org.apache.flink.configuration.Configuration parameters) 
>throws Exception { } @Override public Long 
>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
>isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
>result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { 
>field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = 
>null; if (!isNull$4) { result$5 = 
>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return 
>null; } else { return result$5.getMillisecond(); } } @Override public void 
>close() throws Exception { } } 
> 
>
>
>   其中关键的信息是 result$5 = 
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>  - ((long) 1L), field$3.getNanoOfMillisecond());
>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
>watermark。
>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
>这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>
>  Best forideal
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-11 17:13:01,"forideal"  写道:
>>大家好,请教一个问题
>>
>>
>>   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
>> watermark。消费大量的数据的时候,就无法生成watermark。
>>   一直是No Watermark。 暂时找不到排查问题的思路。
>>  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
>> EventTime mode 模式,Blink Planner。
>>|
>>No Watermark |
>>   SQL如下
>>
>>
>>  DDL:
>>   create table test(
>>   user_id varchar,
>>   action varchar,
>>   event_time TIMESTAMP(3),
>>   WATERMARK FOR event_time AS event_time - INTERVAL '10' 
>> SECOND
>>   ) with();
>>
>>
>>  DML:
>>insert into
>>  console
>>select
>>  user_id,
>>  f_get_str(bind_id) as id_list
>>from
>>  (
>>select
>>  action as bind_id,
>>  user_id,
>>  event_time
>>from
>>  (
>>SELECT
>>  user_id,
>>  action,
>>  PROCTIME() as proc_time,
>>  event_time
>>FROM
>>  test
>>  ) T
>>where
>>  user_id is not null
>>  and user_id <> ''
>>  and CHARACTER_LENGTH(user_id) = 24
>>  ) T
>>group by
>>  SESSION(event_time, INTERVAL '10' SECOND),
>>  user_id
>> 
>>Best forideal


Re: 请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-13 文章 Congxian Qiu
Hi
现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
希望能够优化这个问题[1][2]

[1] https://issues.apache.org/jira/browse/FLINK-2491
[2] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian


yulu yang  于2020年8月13日周四 下午1:49写道:

> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>