Re: Re:回复:flink 从mysql读取数据异常

2021-03-30 文章 Robin Zhang
Hi,air23
JDBCTableSource就是batch模式的,不走实时。Flink解析执行计划时内部会去判断。

Best






air23 wrote
> 这边是想离线读取。不是走实时的 
> 看到异常是 Only insert statement is supported now
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2021-03-30 10:31:51,"guoyb" <

> 861277329@

>> 写道:
>>可以读取的,还有内置flink cdc
>>select得用query方法,看看是不是用错了execute。
>>
>>
>>
>>---原始邮件---
>>发件人: "air23"

> wangfei23_job@

> gt;
> 发送时间: 2021年3月30日(周二) 上午10:25
>>收件人: "user-zh"

> user-zh@.apache

> gt;;
> 主题: flink 从mysql读取数据异常
>>
>>
>>你好 参考官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
>>这边读取mysql jdbc数据报错Exception in thread "main"
org.apache.flink.table.api.TableException: Only insert statement is
supported now.
>>
>>
>>String a = "-- register a MySQL table 'users' in Flink SQL\n" +
>>"CREATE TABLE MyUserTable (\n" +
>>" id BIGINT\n" +
>>") WITH (\n" +
>>" 'connector' = 'jdbc',\n" +
>>" 'url' = 'jdbc:mysql://***:3306/monitor',\n" +
>>" 'table-name' = 't1',\n" +
>>" 'username' = 'root',\n" +
>>" 'password' = '***'\n" +
>>") ";
>>
>>String b ="-- scan data from the JDBC table\n" +
>>"SELECT id FROM MyUserTable\n";
>>
>>tEnv.executeSql(a);
>>
>>
>>
>>请问是不可以从mysql读取数据吗?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count distonct 优化

2021-03-30 文章 Robin Zhang
Hi,guomuhua
   `The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled `
,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
 看你提问时只是开启了本地聚合一个参数,不知道是不是没写全。
Best,
Robin




guomuhua wrote
> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
>  set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
> 原SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> 
> 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
> 
> 还是flink会帮我自动改写SQL,我不用关心?
> 
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>  
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count distonct 优化

2021-03-30 文章 Robin Zhang
Hi,Jark
   我理解疑问中的sql是一个普通的agg操作,只不过分组的键是时间字段,不知道您说的 `我看你的作业里面是window agg`
,这个怎么理解

Best,
Robin



Jark wrote
>> 如果不是window agg,开启参数后flink会自动打散是吧
> 是的
> 
>> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
> 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
> 
> Best,
> Jark
> 
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> 
> On Fri, 26 Mar 2021 at 11:00, guomuhua <

> 663021157@

>> wrote:
> 
>> Jark wrote
>> > 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
>> > agg支持这个参数了。可以期待下。
>> >
>> > Best,
>> > Jark
>> >
>> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
>>
>> > vincent2015qdlg@
>>
>> > 
>> > wrote:
>> >
>> >> Hi,guomuhua
>> >>   开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>> >>
>> >> Best,
>> >> Robin
>> >>
>> >>
>> >> guomuhua wrote
>> >> > 在SQL中,如果开启了 local-global 参数:set
>> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
>> >> > 或者开启了Partial-Final 参数:set
>> >> table.optimizer.distinct-agg.split.enabled=true;
>> >> >  set
>> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
>> >> > 还需要对应的将SQL改写为两段式吗?
>> >> > 例如:
>> >> > 原SQL:
>> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
>> >> >
>> >> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
>> >> > SELECT day, SUM(cnt) total
>> >> > FROM (
>> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
>> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
>> >> > GROUP BY day
>> >> >
>> >> > 还是flink会帮我自动改写SQL,我不用关心?
>> >> >
>> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
>> >> > 
>> >>
>> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
>> ;
>> >>
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>> 感谢,如果不是window agg,开启参数后flink会自动打散是吧。那关于window agg,
>> 不能自动打散,这部分的介绍,在文档中可以找到吗?具体在哪里呢?还是需要从源码里找呢?望指教。再次感谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count distonct 优化

2021-03-24 文章 Robin Zhang
Hi,guomuhua
  开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。

Best,
Robin


guomuhua wrote
> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
>  set
> table.optimizer.distinct-agg.split.bucket-num=1024;
> 还需要对应的将SQL改写为两段式吗?
> 例如:
> 原SQL:
> SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> 
> 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> SELECT day, SUM(cnt) total
> FROM (
> SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> FROM T GROUP BY day, MOD(buy_id, 1024))
> GROUP BY day
> 
> 还是flink会帮我自动改写SQL,我不用关心?
> 
> 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>  
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 消费kafka ,写ORC文件

2021-03-23 文章 Robin Zhang
Hi,Jacob
  
官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
`
链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D

  
希望对你有帮助。

Best,
Robin


Jacob wrote
> 【现状如下】
> 
> Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
> 据了解,flink写orc的桶分配策略[1],有两种:
> 
> 一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]
> 
> test/realtime/
> └── 2021-03-23--07
> ├── part-0-0.orc
> ├── part-0-1.orc
> └── 2021-03-23--08
> ├── part-0-0.orc
> ├── part-0-1.orc
> 
> 一种是将所有部分文件放在一个目录下:
> 
> test/realtime/
> ├── part-0-0.orc
> ├── part-0-1.orc
> ├── part-0-2.orc
> ├── part-0-3.orc
> 
> 【问题】
> 
> 最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:
> 
> hive> show partitions table_demo;
> OK
> dt=161645580
> dt=161645760
> dt=161645940
> dt=161646121
> dt=161646301
> Time taken: 0.134 seconds, Fetched: 5 row(s)
> 
> 因此希望每个orc文件的所在目录名都是dt=`时间戳`的格式:
> 
> http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png; 
> 
> 用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。
> 
> 不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了
> 
> [1].https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
> 
> 
> 
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink On Yarn Per Job 作业提交失败问题

2021-02-24 文章 Robin Zhang
Hi,凌战
看看hadoop环境变量是否正确设置,可以参考文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation

Best,
Robin


凌战 wrote
> hi,社区
> 在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如
> -rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02
> /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
> -rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
> -rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09 
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
> -rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar
> 
> 
> 但是报错 Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
> 发现上传文件目录的权限是  -rw-r--r-- ,不知道是不是因为权限问题导致
> 
> 
> 希望有人解惑!
> | |
> 凌战
> |
> |

> m18340872285@

> |
> 签名由网易邮箱大师定制





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11的Streaming File Sink问题

2021-02-23 文章 Robin Zhang
Hi, op
   
flink内部可以实现exactly-once语义,但是写到hdfs是至少一次的语义,如果任务失败重新启动会发生数据重复的问题,所以需要自己增加逻辑处理。

Best,
Robin


op wrote
> 大家好:
>   我想知道flink1.11的Streaming File
> Sink保存流数据到hdfs支持exactly-once语义吗,官网好像没说,谢谢!





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink实时统计 结果波动时大时小

2021-02-17 文章 Robin Zhang
Hi,flink2021
   首先看看业务场景,是否存在订单数据减少的情况,如果没有,就是逻辑或者代码有问题

Best,
Robin


flink2021 wrote
> 我的数据源是kafka
> 统计订单数据结果写入到mysql,发现在数据有积压的过程中,统计结果会忽大忽小?有人遇到过相关的问题没有呢?需要调整那些设置呢?(数据链路又点复杂,state
> 使用rockdb报错,没有设置过期时间)
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkKafka Consumer can't dynamic discover the partition update

2021-02-17 文章 Robin Zhang
Hi,张云云
1. flink.partition-discovery.interval-millis
是kafka的一个配置参数,不知道你是不是通过kafkaProp设置的
2. 通过shell查看topic分区是否顺利增加,并且有数据写入。

Best,
Robin


张云云 wrote
> When start the job, occurs WARN log like below:
> 
> WARN  org.apache.kafka.clients.consumer.ConsumerConfig  - The
> configuration 'flink.partition-discovery.interval-millis' was supplied
> but isn't a known config.
> 
> 
> 
> 
> And I try to change the kafka partion with command, partition number from
> 3
> to 4
> 
> ./kafka-topics.sh --alter --zookeeper 10.0.10.21:15311 --topic
> STRUCTED_LOG --partitions 4
> 
> it dosen't work.
> 
> 
> 
> How can I do with this problem. Thanks a lot





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 提交作业时的缓存可以删除吗

2021-02-02 文章 Robin Zhang
Hi,tison
 感谢提供思路。当前版本flink1.10,测试发现在yarn web ui点击左上角kill,无法触发删除。通过flink web
ui中的cancel按钮以及 官方建议的停止job 的方式(echo "stop" | ./bin/yarn-session.sh -id
application_Id)是可以实现停止任务即可清除文件。 
之前没有清除的文件是因为在yarn web ui直接点击kill。

调用栈: 
org.apache.flink.yarn.Utils.deleteApplicationFiles:214
org.apache.flink.yarn.YarnClusterDescriptor.killCluster:403
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run:569

Best,
Robin.


tison wrote
> org/apache/flink/yarn/YarnResourceManagerDriver.java:236
> org/apache/flink/yarn/YarnClusterDescriptor.java:495
> 
> 应该是会在作业退出或者强杀的时候清理的,你可以看一下对应版本有无这个逻辑
> 
> 可以加一下日志看看实际是否触发,删除的是什么目录
> 
> Best,
> tison.
> 
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2021年2月2日周二 下午2:37写道:
> 
>> Flink 1.12下会将flink的依赖以及作业的jar包缓存在hdfs上,如下图:
>>
>> <
>> http://apache-flink.147419.n8.nabble.com/file/t447/flink_%E6%8F%90%E4%BA%A4%E6%97%B6%E7%BC%93%E5%AD%98.png>
>>
>>
>>
>> 由于flink很早就开始使用了,这种目录越来越多,就算任务不在运行也不会自动清除。经过简单测试,直接删除后,不影响任务的运行以及简单的状态恢复。目前不知道会不会存在其他依赖,希望有清楚的能解释下这个的原理、作用以及能否删除。
>> 删除的目的是为了节省hdfs空间,做自身优化;另一方面是想弄清楚这个的原理
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink 提交作业时的缓存可以删除吗

2021-02-01 文章 Robin Zhang
Flink 1.12下会将flink的依赖以及作业的jar包缓存在hdfs上,如下图:


 

由于flink很早就开始使用了,这种目录越来越多,就算任务不在运行也不会自动清除。经过简单测试,直接删除后,不影响任务的运行以及简单的状态恢复。目前不知道会不会存在其他依赖,希望有清楚的能解释下这个的原理、作用以及能否删除。
删除的目的是为了节省hdfs空间,做自身优化;另一方面是想弄清楚这个的原理



--
Sent from: http://apache-flink.147419.n8.nabble.com/

FlinkSQL 1.10.0 where条件包含关键字列名的过滤条件不能使用=判断

2021-01-04 文章 Robin Zhang


测试代码如下:

create view sink_test as
select
id
,type
,student_id
,kefu_id
,action_time
,action_user
,distribute_status
,unbind_type
,`comment`
,time_created
,pull_from
from distribute_new_log
where `comment` ='娃娃鱼';

print table sink_test;

当使用关键字列做过滤条件时,能过滤出符合的数据,但是关键字列comment的值输出为unicode码(\u5B9A\u5411\u5206\u914DTMK\u5C0F\u7EC4),并不是中文。正常来说SQL中字符串判断用=是支持的吧。测试下来发现使用`comment`
like '娃娃鱼',输出就没问题,写如结果表数据显示正常,不知道这是不是10版本的sql限制?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL 1.10 事件时间声明不能包含系统保留字

2020-12-29 文章 Robin Zhang
Hi,zilong 
确实是bug,跟我的使用方式一样。感谢!





zilong xiao wrote
> 没记错这是一个bug,计算列中含有关键字会异常,可以看下这个issue:
> https://issues.apache.org/jira/browse/FLINK-16068
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年12月29日周二 下午6:56写道:
> 
>> -- 定义时间非系统保留字为事件时间字段,能正常运行
>> create table events (
>> process_time  bigint  comment '事件时间',
>> event   string  comment '事件类型',
>> ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, '-MM-dd
>> HH:mm:ss')),
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> ) with (
>>   ... ...
>> );
>>
>> 但是,定义的字段是系统保留字时,就会报错:
>> create table events (
>> `time`  bigint  comment '事件时间',
>>  eventstring  comment '事件类型',
>> ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, '-MM-dd
>> HH:mm:ss')),
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> ) with (
>>   ... ...
>> );
>>
>> 但现在问题是:神策埋点的事件时间字段是time,如果单独写一个程序转换字段的话,显得有些鸡肋。
>> 不知道是不是bug,目前还没想到较好的解决方案。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkSQL 1.10 事件时间声明不能包含系统保留字

2020-12-29 文章 Robin Zhang
-- 定义时间非系统保留字为事件时间字段,能正常运行
create table events (
process_time  bigint  comment '事件时间',
event   string  comment '事件类型',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(process_time/1000, '-MM-dd
HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) with (
  ... ...
);

但是,定义的字段是系统保留字时,就会报错:
create table events (
`time`  bigint  comment '事件时间',
 eventstring  comment '事件类型',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, '-MM-dd HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) with (
  ... ...
);

但现在问题是:神策埋点的事件时间字段是time,如果单独写一个程序转换字段的话,显得有些鸡肋。
不知道是不是bug,目前还没想到较好的解决方案。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-23 文章 Robin Zhang
Hi,yujianbo

只要任务结束,不管是cancel、failed、killed都会在history sever展示,
可以先去hdfs查看配置的目录是否存在任务相关的文件夹;也可以尝试重启一下history
server试试。麻烦问一下,你的任务使用什么api写的,以及版本、提交方式?





yujianbo wrote
> 大佬,我发现我配置完后就只能看到完成的任务在history sever上面,失败的看不到。现在疑惑的是失败的能不能出现在history server
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink1.10 history server无法监控 FlinkSQL任务

2020-10-23 文章 Robin Zhang
Hi,zhisheng

1.默认的刷新时间10s以及5s都测试过,真实体验是反应时间有点长,达到分钟级别,猜测这个参数的设置意义不大;
2.其实页面提供了Runing job
List,理论上是可以展示的,如果不能展示,web用的同一套前端代码的话,觉得有点鸡肋。使用起来,目前只能查看job的一些统计信息,如
ck相关,dag相关。

使用Filnk on yarn per job提交方式, 已经启动了yarn
JobHistoryServer,应该是不会产生影响的,除了sql其他api的任务监控正常。
对于大佬提出的问题:1.由于目前是测试阶段,没有上生产,依照yarn-session的running job
list展示模式,官方没有对页面进行分页操作,需要自己改源码。
   问题2:1.10版本对日志的展示不是很友好,1.11可以滚动文件展示,至于jm 和 tm
日志怎么获取,受限于官网文档资料的限制,现在还没有解决,我这里现在还是依赖yarn的job history
server以及聚合日志功能进行bug分析。如有进展会在此继续讨论,欢迎分享新成果。

Best,
Robin



zhisheng wrote
> Hi Robin:
> 
> 1、是不是更改了刷新时间?一直不显示吗?
> 
> 2、running 的作业不会显示的,你可以之间在 yarn 查看,history server 应该是只提供展示挂掉的作业
> 
> PS:另外提几个 history server 的问题
> 
> 1、挂掉的作业展示能否支持分页呢?目前直接在一个页面全部展示了历史所有的作业,打开会很卡
> 
> 2、有办法可以查看挂掉作业的 jm 和 tm 日志吗?因为 HDFS
> 其实是有日志,按道理是可以拿到日志信息然后解析展示出来的,Spark history server 也是可以查看挂掉作业的日志
> 
> 
> Best!
> zhisheng
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年10月22日周四 下午6:11写道:
> 
>>
>> 如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table
>> api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。
>> 使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per
>> job方式提交到yarn运行的,只是多了个sql解析动作。不能理解
>>
>> ,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。
>>
>> <
>> http://apache-flink.147419.n8.nabble.com/file/t447/%E5%8E%86%E5%8F%B2%E6%9C%8D%E5%8A%A1%E5%99%A8.png>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.10 history server无法监控 FlinkSQL任务

2020-10-22 文章 Robin Zhang

如下图,Flink 1.10 on yarn per job提交方式,如果是java datastream 以及table
api开发的应用,能够被jm正常拉取统计信息,但是sql化的job没有办法被历史服务器监控。
使用的sql不完全是官网的,但是是经过转化为datastream,以on yarn per
job方式提交到yarn运行的,只是多了个sql解析动作。不能理解
,为什么历史服务器没有加载job信息到hdfs上的目标目录。查看jobmanager日志以及configuration都能确定jm加载到了历史服务器的相关配置。


 





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-20 文章 Robin Zhang
Hi,奔跑的小飞袁
目前没试过flink集成es,所以细节方面没办法深究太多,但是,可以给你提供个思路:
  1. 查看pom中es的dependency是否设置了scope,导致依赖没有成功引入;
  2.
如果依赖成功引入了,但是还不行,相反,在lib下放置相同的jar却可以正常执行,基本可以确定就是依赖冲突,具体什么类导致的,这个目前无法确定,期待更好地思路。


Best,
Robin


奔跑的小飞袁 wrote
> 现在我的lib下没有ElasticSearch相关的connector,在pom中引用,这样会产生冲突吗,还有这种现象有可能是在哪块冲突了
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于flink-sql count/sum 数据如何每天重新计算

2020-10-20 文章 Robin Zhang
Hi, 夜思流年梦
我理解按照日期分组就可以解决你的需求,流数据属于哪一天就只算当天的,不影响其他date的数据;
按天分组的数据都计算出来了,再汇总一下就是一个月的

Best,
Robin



夜思流年梦 wrote
> 现有此场景:
> 计算每天员工的业绩(只计算当天的)
> 
> 
> 现在我用flink-sql 的方式,insert into  select current_date, count(1) ,worker from
> XX  where writeTime>=current_date  group by worker;  
> 把数据按天分区的方式先把数据sink到mysql
> 
> 
> 但是发现落地到mysql的数据把前几天的数据都给算进来了,如何只算今天的数据?
> 另外还有一个疑惑,如何既计算当天数据,又包含了本月的所有数据?





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flinksql报错Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-20 文章 Robin Zhang
Hi, 奔跑的小飞袁
Flink的class加载原则是child
first,所以,尽量避免在pom中自己引入flink相关的依赖,避免跟Flink集群环境造成冲突,建议将安装包放在lib下,由flink去加载。

Best,
Robin



奔跑的小飞袁 wrote
> hello 
> 我在使用flinksql连接器时当我将flink-sql-connector-elasticsearch6_2.11-1.11.1.jar放在lib下,程序正常执行,但是当我在pom中进行配置时会产生如下报错,同样的问题会产生在hbase、jdbc的connector中,请问下这可能是什么造成的
> org.apache.flink.client.program.ProgramInvocationException: The main
> method
> caused an error: Unable to create a sink for writing table
> 'default_catalog.default_database.cloud_behavior_sink'.
> 
> Table options are:
> 
> 'connector'='elasticsearch-6'
> 'document-type'='cdbp'
> 'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
> 'index'='flink_sql_test'
> 'sink.bulk-flush.max-actions'='100'
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create
> a sink for writing table
> 'default_catalog.default_database.cloud_behavior_sink'.
> 
> Table options are:
> 
> 'connector'='elasticsearch-6'
> 'document-type'='cdbp'
> 'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
> 'index'='flink_sql_test'
> 'sink.bulk-flush.max-actions'='100'
>   at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a
> connector using option 

Re: 单任务多条流的逻辑报错

2020-10-20 文章 Robin Zhang
Hi,
   根据报错内容,定位到你的代码在
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
  at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
InstantiationUtil类的处理有问题,应该是反序列化问题。本地测试没问题,是因为本地不涉及到序列化。

Best,
Robin




freeza1...@outlook.com wrote
> Hi all:
> 
> 请问我用flink1.10.2版本,写了1个代码,这个代码本地可以跑起来,但是以任务方式发布到flink中,启动就报错,异常如下, 请问是什么原因?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> 
> (OperatorChain.java:144)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class
> invalid for deserialization
> at
> java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
> at
> java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 
> 大致逻辑如下, 我有2条流:
> 1.通过多个kafkasource,得到多个流后union,然后这个union的单流经过2个算子,最后sink到kafka
> 2.通过单个kafkasource,得到流,经过1个算子,最后sink到kafka
> 代码如下:
>  StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> List
> 
>  kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
> KafkaInfo kafkaSinkConfiguration =
> this.kafkaConfiguration.getSink();
> RecordTransformOperator transformOperator = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
> RecordKeySelector keySelector = new RecordKeySelector();
> RecordComputeOperator computeOperator = new
> RecordComputeOperator();
> Properties sinkProperties = new Properties();
> sinkProperties.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
> FlinkKafkaProducer011 flinkKafkaProducer
> = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties);
> 
> ListSingleOutputStreamOperatorTuple2String,
> String>> dataStreamList = new ArrayList<>();
> for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
> Properties sourceProperties = new Properties();
> sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
> sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
> sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
> sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
> String topicName = kafkaInfo.getTopicName();
> FlinkKafkaConsumer011Tuple2String, String>
> flinkKafkaConsumer
> = new FlinkKafkaConsumer011(topicName,
> new 

Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,zilong
的确是这个问题,感谢帮助。
Best,
Robin


zilong xiao wrote
> Hi Robin Zhang
> 你应该是遇到了这个issue报告的问题:https://issues.apache.org/jira/browse/FLINK-16626
> ,可以看下这个issue描述,祝好~
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年10月19日周一 下午3:42写道:
> 
>> 普通的source -> map -> filter-> sink 测试应用。
>>
>> 触发savepoint的脚本 :
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> 具体报错信息:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
>> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
Hi,Congxian
感谢提供思路,看了一下,JM端没有暴露日志,只能查看到ck正常的日志

Best,
Robin



Congxian Qiu wrote
> Hi
> 你可以看下 JM log 中这个 savepoint 失败是什么原因导致的,如果是 savepoint 超时了,就要看哪个 task
> 完成的慢,(savepoint 可能比 checkpoint 要慢)
> Best,
> Congxian
> 
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年10月19日周一 下午3:42写道:
> 
>> 普通的source -> map -> filter-> sink 测试应用。
>>
>> 触发savepoint的脚本 :
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> 具体报错信息:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> 查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
>> 应用遇到权限问题,但是不知道怎么解决,目前卡在这里。
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.10 stop with a savepoint失败

2020-10-19 文章 Robin Zhang
普通的source -> map -> filter-> sink 测试应用。

触发savepoint的脚本 :
${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
具体报错信息:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"81990282a4686ebda3d04041e3620776".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
... 9 more


查看报错,怀疑是权限问题,我是root用户启动的应用,savepoint目录所在的hdfs路径权限所属也是root,如果不停止应用,直接触发savepoint没问题,继续定位到是root用户去停止hadoop
应用遇到权限问题,但是不知道怎么解决,目前卡在这里。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: flink sql count问题

2020-09-29 文章 Robin Zhang
Hi lemon,
不是很理解你的疑问是什么,flink是事件驱动的,所以,来一条数据,就会被处理,走你的逻辑,就会产生一个结果,如果是第一次出现的key,只有一条数据,如果是状态中已经存在的key,会在控制台输出两条数据,一条true的是最终sink的结果。所以,每次输出一条结果有什么问题吗?


Best,
Robin



lemon wrote
> 感谢各位的回答,各位的方法我都试了一下,但是都会在下游输出一条结果,一条不符合条件的语句count会在下游输出0
> 我理解是flink中的count对于每一条数据都会输出一条结果,所以只能把if中的判断条件再放到最后的where中进行过滤
> 类似于 selectcount(if(name like '南京%',1 , null)) where name
> like'南京%' or name like'杭州%' group by ** 这样
> 
> 
> --原始邮件--
> 发件人:  
>  
> "user-zh" 
>   
> <

> vincent2015qdlg@

> ;
> 发送时间:2020年9月29日(星期二) 下午5:32
> 收件人:"user-zh"<

> user-zh@.apache

> ;
> 
> 主题:Re: flink sql count问题
> 
> 
> 
> Hi lemon,
>  内部判断if函数可以替换为case when
> 
> Best,
> Robin
> 
> 
> lemon wrote
>  请教各位:
>  我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
>  之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
>  sql中count不能为null,有什么别的方法能实现该功能吗?
>  使用的是flink1.10.1 blink
>  nbsp;
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql count问题

2020-09-29 文章 Robin Zhang
Hi lemon,
内部判断if函数可以替换为case when

Best,
Robin


lemon wrote
> 请教各位:
> 我有一个sql任务需要进行count,在count中有一个表达式,只想count符合条件的记录,
> 之前在hive中是这么写的:count(if(name like '南京%',1 , null)),但是flink
> sql中count不能为null,有什么别的方法能实现该功能吗?
> 使用的是flink1.10.1 blink
> 





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Robin Zhang
Hi Benchao,
 
 感谢回复,解决了我最近的疑惑。

Best,
Robin


Benchao Li-2 wrote
> Hi Robin,
> 
> 目前LAG/LEAD函数在流式场景下的实现的确是有bug的,那个实现只能在批式场景下work,
> 是线上其实没有考虑流式的场景。所以你看到的结果应该是它只能返回当前数据。
> 这个问题我也是最近才发现的,刚刚建了一个issue[1] 来跟踪这个问题。
> 当前如果你想实现类似功能,可以先自己写一个udaf来做。
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19449
> 
> Robin Zhang 

> vincent2015qdlg@

>  于2020年9月29日周二 下午2:04写道:
> 
>> 环境: flink 1.10,使用flinkSQL
>>
>> kafka输入数据如:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}
>>
>> sql如下:
>>
>> INSERT INTO topic_sink
>> SELECT
>>   t,
>>   id,
>>   speed,
>>   LAG(speed, 1) OVER w AS speed_1,
>>   LAG(speed, 2) OVER w AS speed_2
>> FROM topic_source
>> WINDOW w AS (
>>   PARTITION BY id
>>   ORDER BY t
>> )
>> 我期望得到的结果数据是
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
>> "speed_2":null}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
>> "speed_2":null}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
>> "speed_2":4.0}
>>
>> 实际得到的结果数据是:
>> {"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
>> "speed_2":1.0}
>> {"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
>> "speed_2":2.0}
>> {"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
>> "speed_2":3.0}
>> {"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
>> "speed_2":4.0}
>> {"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
>> "speed_2":5.0}
>> {"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
>> "speed_2":6.0}
>>
>> 想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
> 
> 
> -- 
> 
> Best,
> Benchao Li





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink配置hdfs状态后端遇到的问题

2020-09-29 文章 Robin Zhang
Hi jester_jim,

配置文件中指定的checkpoints(以后简称ckp)的目录只是一个父目录,flink在首次触发每个job的ckp时会在这个父目录下新建多级文件夹,命名为指定的job名字/job
id.所以,并不是新建父目录就可以,依然会存在权限问题
。

  祝好,Robin Zhang




Flink中文社区的各位大佬你们好:
本人是小白,由于对Flink不太了解,想学,然后搭了个Flink standalone(1.11.2
jdk1.8)集群,集群本身运行没有什么问题,作业运行也没什么问题。但是最近有用到状态后端,在配置hdfs的时候遇到了一个无法理解的问题,我在issues也没找到解决方法。问题大概是,我在Flink配置文件配置了HDFS状态后端,但是呢Hadoop(CDH2.5.0)是生产系统的集群,有kerberos认证,目录权限不全开放。Flink的配置信息如下:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default
bundled
# state backends.
#
state.checkpoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
# Default target directory for savepoints, optional.
#
state.savepoints.dir:
hdfs://namenode58:9000/NS3/user/jester/flink/flink-checkpoints
除此之外还有kerberos认证配置,但是都没有什么问题,如下所示,当注释掉上面三行配置过后,作业正常运行:
2020-09-29 11:27:20,430 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user jester/principle using keytab file
/kafka/flink/flink-1.11.2/conf/jester.keytab
Job has been submitted with JobID 41c01241338f1c7112d48f277701d9c3


但是如果不注释,当我提交作业就会抛出:
(本部分放在异常信息前面:
异常里面说Flink要去/目录创建文件但是没有write权限,但是这个权限肯定是不会给的。但是我明明在Flink配置中我已经指定了目录,我不能理解为什么还会在/下创建什么?除此之外,Hadoop的配置信息我是直接把Hadoop集群的目录拷贝过来了,在启动脚本中指定了HADDOP_CONF_DIR,(本人觉得其实这样好像是不太妥当的,因为Hadoop的hdfs-site.xml和core-site.xml里面有很多只能在Hadoop集群的机器上使用的配置,不知道是不是应该删除无用的配置,还有增加必要的配置,希望能有参考文档)还有Hadoop的jar包我直接`$HADDOP_HOME/bin/hadoop
classpath`
另外还有个问题,就是Hadoop配置了HA,我怎么给checkpoint配置多个namenode?
)
2020-09-29 11:21:20,446 INFO 
org.apache.hadoop.security.UserGroupInformation  [] - Login
successful for user  jester/principle  using keytab file
/kafka/flink/flink-1.11.2/conf/unicom_jiangt37.keytab




--
Sent from: http://apache-flink.147419.n8.nabble.com/

如何在流式数据源上使用分析函数LAG和EAD函数

2020-09-29 文章 Robin Zhang
环境: flink 1.10,使用flinkSQL

kafka输入数据如:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0}

sql如下:

INSERT INTO topic_sink
SELECT
  t,
  id,
  speed,
  LAG(speed, 1) OVER w AS speed_1,
  LAG(speed, 2) OVER w AS speed_2
FROM topic_source
WINDOW w AS (
  PARTITION BY id
  ORDER BY t
)
我期望得到的结果数据是
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":null,
"speed_2":null}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":1.0,
"speed_2":null}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":2.0,
"speed_2":1.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":3.0,
"speed_2":2.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":4.0,
"speed_2":3.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":5.0,
"speed_2":4.0}

实际得到的结果数据是:
{"t":"2020-04-01T05:00:00Z", "id":"1", "speed":1.0, "speed_1":1.0,
"speed_2":1.0}
{"t":"2020-04-01T05:05:00Z", "id":"1", "speed":2.0,"speed_1":2.0,
"speed_2":2.0}
{"t":"2020-04-01T05:10:00Z", "id":"1", "speed":3.0,"speed_1":3.0,
"speed_2":3.0}
{"t":"2020-04-01T05:15:00Z", "id":"1", "speed":4.0,"speed_1":4.0,
"speed_2":4.0}
{"t":"2020-04-01T05:20:00Z", "id":"1", "speed":5.0,"speed_1":5.0,
"speed_2":5.0}
{"t":"2020-04-01T05:25:00Z", "id":"1", "speed":6.0",speed_1":6.0,
"speed_2":6.0}

想问一下flink sql里的LAG函数能完成我期望的计算吗?如果可以sql该如何写?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-09-25 文章 Robin Zhang
   Hi,Tang老师,   抱歉,之前理解有误,感谢唐老师指正。祝好,Robin
Zhang
Yun Tang wrote
> Hi Robin其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.另外 @Peihui
> 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。[1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table[2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html祝好唐云________From:
> Robin Zhang 

> vincent2015qdlg@

> Sent: Wednesday, July 15, 2020 16:23To: 

> user-zh@.apache

>  

> user-zh@.apache

> Subject: Re: flink 1.9.2 升级 1.10.0
> 任务失败不能从checkpoint恢复据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑BestRobin
> ZhangFrom: Peihui He <[hidden email]>Sent:
> Tuesday, July 14, 2020 10:42To: [hidden email] <[hidden email]>Subject:
> flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复hello,当升级到1.10.0
> 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示Caused by:
> java.nio.file.NoSuchFileException:/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst->/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst配置和1.9.2
> 一样:state.backend: rocksdbstate.checkpoints.dir:
> hdfs:///flink/checkpoints/wc/state.savepoints.dir:
> hdfs:///flink/savepoints/wc/state.backend.incremental:
> true代码上都有env.enableCheckpointing(1);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,org.apache.flink.api.common.time.Time.of(10,
> TimeUnit.SECONDS)));  是1.10.0 需要做什么特别配置么?--Sent from:
> http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-15 文章 Robin Zhang
据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑

Best
Robin Zhang

From: Peihui He <[hidden email]>
Sent: Tuesday, July 14, 2020 10:42
To: [hidden email] <[hidden email]>
Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

hello,

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
->
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

配置和1.9.2 一样:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

代码上都有

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  是1.10.0 需要做什么特别配置么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 文章 Robin Zhang
 
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
代码如下:
   tEnv.getConfig()
 .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), 

Time.hours(maxIdleStateRetentionTime));

程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-14 文章 Robin Zhang
没有窗口,就简单的表join,有kafka流表 ,kudu维表,使用了group by

> Jul 14, 2020; 12:36pm — by zhisheng zhisheng
> 有没有窗口啊?

Robin Zhang <[hidden email]> 于2020年7月14日周二 上午11:48写道:

> <http://apache-flink.147419.n8.nabble.com/file/t447/ttl.png>
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
没有使用窗口呢,就多表关联,涉及到流表join流表,流表join维表,group by 、topN等



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
 
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
代码如下:
   tEnv.getConfig()
 .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), 

Time.hours(maxIdleStateRetentionTime));

程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/